diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java index 8052c4e3933e379694c66667e0b7e531286cff86..992ab1ae47ed690a0db2981f2f70ef490cadfee7 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java @@ -1,7 +1,7 @@ package ru.yandex.metrika.clickhouse; import org.apache.http.impl.client.CloseableHttpClient; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.util.CHHttpClientBuilder; import ru.yandex.metrika.clickhouse.util.LogProxy; import ru.yandex.metrika.clickhouse.util.Logger; @@ -22,21 +22,21 @@ public class CHConnectionImpl implements CHConnection { private final CloseableHttpClient httpclient; - private final HttpConnectionProperties properties; + private final CHProperties properties; private CHDataSource dataSource; private boolean closed = false; public CHConnectionImpl(String url){ - this(url, new HttpConnectionProperties()); + this(url, new CHProperties()); } public CHConnectionImpl(String url, Properties info){ - this(url, new HttpConnectionProperties(info)); + this(url, new CHProperties(info)); } - public CHConnectionImpl(String url, HttpConnectionProperties properties) { + public CHConnectionImpl(String url, CHProperties properties) { this.properties = properties; this.dataSource = new CHDataSource(url); CHHttpClientBuilder clientBuilder = new CHHttpClientBuilder(properties); diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java index fa05882e675ea57d7a4f8e4e14999a83d6738550..925a6527d489039f15b42df5c25261dbef1388eb 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java @@ -1,6 +1,6 @@ package ru.yandex.metrika.clickhouse; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import javax.sql.DataSource; import java.io.PrintWriter; @@ -35,17 +35,17 @@ public class CHDataSource implements DataSource { PrintWriter printWriter; protected int loginTimeout = 0; - private HttpConnectionProperties properties; + private CHProperties properties; public CHDataSource(String url) { - this(url, new HttpConnectionProperties()); + this(url, new CHProperties()); } public CHDataSource(String url, Properties info) { - this(url, new HttpConnectionProperties(info)); + this(url, new CHProperties(info)); } - public CHDataSource(String url, HttpConnectionProperties properties) { + public CHDataSource(String url, CHProperties properties) { if (url == null) { throw new IllegalArgumentException("Incorrect clickhouse jdbc url: " + url); } @@ -64,6 +64,7 @@ public class CHDataSource implements DataSource { throw new IllegalArgumentException("Incorrect clickhouse jdbc url: " + url); } this.properties = properties; + this.properties.setDatabase(database); } @Override diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java index bd7ceb0f3ebb2a61593c46bde2c834926cf6be11..271321187664010ee2f0db3e4d222796457a03bb 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java @@ -1,6 +1,6 @@ package ru.yandex.metrika.clickhouse; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.util.LogProxy; import ru.yandex.metrika.clickhouse.util.Logger; @@ -39,7 +39,7 @@ public class CHDriver implements Driver { return LogProxy.wrap(CHConnection.class, new CHConnectionImpl(url, info)); } - public CHConnection connect(String url, HttpConnectionProperties properties) throws SQLException { + public CHConnection connect(String url, CHProperties properties) throws SQLException { logger.info("Creating connection"); return LogProxy.wrap(CHConnection.class, new CHConnectionImpl(url, properties)); } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java index 6e6721a8b70ecf6b445104f263eb8f759abb8c27..1d9a46bf98e50fab2bca60b19126f456a0b32b1b 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java @@ -1,7 +1,7 @@ package ru.yandex.metrika.clickhouse; import org.apache.http.impl.client.CloseableHttpClient; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.util.Logger; import java.io.InputStream; @@ -29,7 +29,7 @@ public class CHPreparedStatementImpl extends CHStatementImpl implements CHPrepar List<String> binds; public CHPreparedStatementImpl(CloseableHttpClient client, CHDataSource source, - HttpConnectionProperties properties, String sql) throws SQLException { + CHProperties properties, String sql) throws SQLException { super(client, source, properties); this.sql = sql; this.sqlParts = parseSql(sql); diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java index 6f36698402bcec0728bed2d0fd3c2abf6f114b23..94d8cbf4718ddafb44849f9481248a4a3e32414a 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java @@ -9,8 +9,6 @@ import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; - - import ru.yandex.metrika.clickhouse.copypaste.*; import ru.yandex.metrika.clickhouse.except.ClickhouseExceptionSpecifier; import ru.yandex.metrika.clickhouse.util.CopypasteUtils; @@ -21,13 +19,13 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; -import java.sql.*; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; - -import static ru.yandex.metrika.clickhouse.copypaste.CHQueryParam.*; /** * Created by jkee on 14.03.15. */ @@ -37,7 +35,7 @@ public class CHStatementImpl implements CHStatement { private final CloseableHttpClient client; - private HttpConnectionProperties properties = new HttpConnectionProperties(); + private CHProperties properties = new CHProperties(); private CHDataSource source; @@ -52,7 +50,7 @@ public class CHStatementImpl implements CHStatement { private ObjectMapper objectMapper; public CHStatementImpl(CloseableHttpClient client, CHDataSource source, - HttpConnectionProperties properties) { + CHProperties properties) { this.client = client; this.source = source; this.properties = properties; @@ -389,7 +387,7 @@ public class CHStatementImpl implements CHStatement { log.debug("Executing SQL: " + sql); URI uri = null; try { - Map<CHQueryParam, String> params = getParams(ignoreDatabase); + Map<CHQueryParam, String> params = properties.buildParams(ignoreDatabase); if (additionalClickHouseDBParams != null && !additionalClickHouseDBParams.isEmpty()) { params.putAll(additionalClickHouseDBParams); } @@ -472,34 +470,7 @@ public class CHStatementImpl implements CHStatement { } } - public Map<CHQueryParam, String> getParams(boolean ignoreDatabase) { - Map<CHQueryParam, String> params = new HashMap<CHQueryParam, String>(); - //в clickhouse бывают таблички без базы (Ñ‚.е. в базе default) - if (!CopypasteUtils.isBlank(source.getDatabase()) && !ignoreDatabase) { - params.put(DATABASE, source.getDatabase()); - } - if (properties.isCompress()) { - params.put(COMPRESS, "1"); - } - // нам вÑегда нужны min и max в ответе - params.put(EXTREMES, "1"); - if (CopypasteUtils.isBlank(properties.getProfile())) { - if (properties.getMaxThreads() != null) - params.put(MAX_THREADS, String.valueOf(properties.getMaxThreads())); - // да, там в Ñекундах - params.put(MAX_EXECUTION_TIME, String.valueOf((properties.getSocketTimeout() + properties.getDataTransferTimeout()) / 1000)); - if (properties.getMaxBlockSize() != null) { - params.put(MAX_BLOCK_SIZE, String.valueOf(properties.getMaxBlockSize())); - } - } else { - params.put(PROFILE, properties.getProfile()); - } - //в ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ð¸Ð½Ð¾Ð³Ð´Ð° бывает user - if (properties.getUser() != null) { - params.put(USER, properties.getUser()); - } - return params; - } + public void closeOnCompletion() throws SQLException { closeOnCompletion = true; diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHProperties.java similarity index 60% rename from src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java rename to src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHProperties.java index c1491ebf55c6fbbacb9ee4862aa0a770e5b8874a..e756b136ed5b0bb9057e1a4d8052591c2735bd85 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHProperties.java @@ -1,5 +1,9 @@ package ru.yandex.metrika.clickhouse.copypaste; +import ru.yandex.metrika.clickhouse.util.CopypasteUtils; + +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static ru.yandex.metrika.clickhouse.copypaste.CHConnectionSettings.*; @@ -9,72 +13,95 @@ import static ru.yandex.metrika.clickhouse.copypaste.CHQueryParam.*; * Date: 10/17/13 * Time: 2:48 PM */ -public class HttpConnectionProperties { - - // ÐаÑтройки кликхауÑа +public class CHProperties { - /** - * profile=web&sign_rewrite=0 - * Ðа Ñтороне clickhouse Ñделаны Ð¾Ð³Ñ€Ð°Ð½Ð¸Ñ‡ÐµÐ½Ð¸Ñ Ð½Ð° запроÑÑ‹. - * https://svn.yandex.ru/websvn/wsvn/conv/trunk/metrica/src/dbms/src/Server/config.conf - */ - private String profile; - private boolean compress; - // asynchronous=0&max_threads=1 + // поÑтоÑнные наÑтройки ÑÐ¾ÐµÐ´Ð¸Ð½ÐµÐ½Ð¸Ñ private boolean async; - private Integer maxThreads; - private Integer maxBlockSize; - private int bufferSize; private int apacheBufferSize; - - //наÑтройки Ð´Ð»Ñ Ð´ÐµÐ¼Ð¾Ð½Ð¾Ð² private int socketTimeout; private int connectionTimeout; - - //METR-9568: параметр user Ð´Ð»Ñ Ð¾Ð¿Ñ€ÐµÐ´ÐµÐ»ÐµÐ½Ð¸Ñ Ð¿Ñ€Ð¾Ñ„Ð¸Ð»Ñ Ð½Ð°Ñтроек(?). - private String user; - - /* - * Ñто таймаут на передачу данных. - * ЧиÑло socketTimeout + dataTransferTimeout отправлÑетÑÑ Ð² clickhouse в параметре max_execution_time - * ПоÑле чего ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ñам оÑтанавливает Ð·Ð°Ð¿Ñ€Ð¾Ñ ÐµÑли Ð²Ñ€ÐµÐ¼Ñ ÐµÐ³Ð¾ Ð²Ñ‹Ð¿Ð¾Ð»Ð½ÐµÐ½Ð¸Ñ Ð¿Ñ€ÐµÐ²Ñ‹ÑˆÐ°ÐµÑ‚ max_execution_time - * */ private int dataTransferTimeout; private int keepAliveTimeout; - - /** - * Ð”Ð»Ñ ConnectionManager'а - */ private int timeToLiveMillis; private int defaultMaxPerRoute; private int maxTotal; - public HttpConnectionProperties() { + + // наÑтройки в запроÑÑ‹ + private Integer maxParallelReplicas; + private String totalsMode; + private String quotaKey; + private Integer priority; + private String database; + private boolean compress; + private boolean extremes; + private Integer maxThreads; + private Integer maxExecutionTime; + private Integer maxBlockSize; + private String profile; + private String user; + + + public CHProperties() { this(new Properties()); } - public HttpConnectionProperties(Properties info) { - this.profile = getSetting(info, PROFILE); - this.compress = getSetting(info, COMPRESS); + public CHProperties(Properties info) { this.async = getSetting(info, ASYNC); - this.maxThreads = getSetting(info, MAX_THREADS); - this.maxBlockSize = getSetting(info, MAX_BLOCK_SIZE); - this.bufferSize = getSetting(info, BUFFER_SIZE); this.apacheBufferSize = getSetting(info, APACHE_BUFFER_SIZE); - this.socketTimeout = getSetting(info, SOCKET_TIMEOUT); this.connectionTimeout = getSetting(info, CONNECTION_TIMEOUT); - - this.user = getSetting(info, USER); - this.dataTransferTimeout = getSetting(info, DATA_TRANSFER_TIMEOUT); this.keepAliveTimeout = getSetting(info, KEEP_ALIVE_TIMEOUT); - this.timeToLiveMillis = getSetting(info, TIME_TO_LIVE_MILLIS); this.defaultMaxPerRoute = getSetting(info, DEFAULT_MAX_PER_ROUTE); this.maxTotal = getSetting(info, MAX_TOTAL); + + this.maxParallelReplicas = getSetting(info, MAX_PARALLEL_REPLICAS); + this.totalsMode = getSetting(info, TOTALS_MODE); + this.quotaKey = getSetting(info, QUOTA_KEY); + this.priority = getSetting(info, PRIORITY); + this.database = getSetting(info, DATABASE); + this.compress = getSetting(info, COMPRESS); + this.extremes = getSetting(info, EXTREMES); + this.maxThreads = getSetting(info, MAX_THREADS); + this.maxExecutionTime = getSetting(info, MAX_EXECUTION_TIME); + this.maxBlockSize = getSetting(info, MAX_BLOCK_SIZE); + this.profile = getSetting(info, PROFILE); + this.user = getSetting(info, USER); + } + + public Map<CHQueryParam, String> buildParams(boolean ignoreDatabase){ + Map<CHQueryParam, String> params = new HashMap<CHQueryParam, String>(); + + if (maxParallelReplicas != null) params.put(MAX_PARALLEL_REPLICAS, String.valueOf(maxParallelReplicas)); + if (totalsMode != null) params.put(TOTALS_MODE, totalsMode); + if (quotaKey != null) params.put(QUOTA_KEY, quotaKey); + if (priority != null) params.put(PRIORITY, String.valueOf(priority)); + + if (!CopypasteUtils.isBlank(database) && !ignoreDatabase) params.put(DATABASE, getDatabase()); + + if (compress) params.put(COMPRESS, "1"); + + if (extremes) params.put(EXTREMES, "1"); + + if (CopypasteUtils.isBlank(profile)) { + if (getMaxThreads() != null) + params.put(MAX_THREADS, String.valueOf(maxThreads)); + // да, там в Ñекундах + params.put(MAX_EXECUTION_TIME, String.valueOf((maxExecutionTime != null? maxExecutionTime:(socketTimeout + dataTransferTimeout)) / 1000)); + if (getMaxBlockSize() != null) { + params.put(MAX_BLOCK_SIZE, String.valueOf(getMaxBlockSize())); + } + } else { + params.put(PROFILE, profile); + } + //в ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ð¸Ð½Ð¾Ð³Ð´Ð° бывает user + if (user != null) params.put(USER, user); + + return params; } @@ -224,4 +251,59 @@ public class HttpConnectionProperties { this.maxTotal = maxTotal; } + public Integer getMaxParallelReplicas() { + return maxParallelReplicas; + } + + public void setMaxParallelReplicas(Integer maxParallelReplicas) { + this.maxParallelReplicas = maxParallelReplicas; + } + + public String getTotalsMode() { + return totalsMode; + } + + public void setTotalsMode(String totalsMode) { + this.totalsMode = totalsMode; + } + + public String getQuotaKey() { + return quotaKey; + } + + public void setQuotaKey(String quotaKey) { + this.quotaKey = quotaKey; + } + + public Integer getPriority() { + return priority; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public boolean isExtremes() { + return extremes; + } + + public void setExtremes(boolean extremes) { + this.extremes = extremes; + } + + public Integer getMaxExecutionTime() { + return maxExecutionTime; + } + + public void setMaxExecutionTime(Integer maxExecutionTime) { + this.maxExecutionTime = maxExecutionTime; + } } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java index 5240afb55f531cff6f72c1eba638fd66c1461fc5..a2956c125127d9eb92fda0187a466d3a29037f36 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java @@ -5,7 +5,7 @@ package ru.yandex.metrika.clickhouse.copypaste; * @since 25.03.16 */ public enum CHQueryParam { - MAX_PARALLEL_REPLICAS("max_parallel_replicas", null, String.class), + MAX_PARALLEL_REPLICAS("max_parallel_replicas", null, Integer.class), /** * Каким образом вычиÑлÑÑ‚ÑŒ TOTALS при наличии HAVING, а также при наличии max_rows_to_group_by и group_by_overflow_mode = 'any' * https://clickhouse.yandex-team.ru/#%D0%9C%D0%BE%D0%B4%D0%B8%D1%84%D0%B8%D0%BA%D0%B0%D1%82%D0%BE%D1%80%20WITH%20TOTALS @@ -36,7 +36,7 @@ public enum CHQueryParam { * Ð”Ð»Ñ Ñтого, выÑтавите наÑтройку extremes в 1. Минимумы и макÑимумы ÑчитаютÑÑ Ð´Ð»Ñ Ñ‡Ð¸Ñловых типов, дат, дат-Ñ-временем. * Ð”Ð»Ñ Ð¾Ñтальных Ñтолбцов, будут выведены Ð·Ð½Ð°Ñ‡ÐµÐ½Ð¸Ñ Ð¿Ð¾ умолчанию. */ - EXTREMES("extremes", "0", String.class), + EXTREMES("extremes", false, String.class), /** * МакÑимальное количеÑтво потоков обработки запроÑа * https://clickhouse.yandex-team.ru/#max_threads diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java index d5e96b0e426ac57ae2178c440d69d12dde32b192..0e7b0c9adb4d48b13f71d3c3c3fafaca645f77f3 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java @@ -16,7 +16,7 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeaderElementIterator; import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.copypaste.IpVersionPriorityResolver; import java.net.HttpURLConnection; @@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit; */ public class CHHttpClientBuilder { - private final HttpConnectionProperties properties; + private final CHProperties properties; - public CHHttpClientBuilder(HttpConnectionProperties properties) { + public CHHttpClientBuilder(CHProperties properties) { this.properties = properties; }