diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java index 76ceb4941e88f3c455c6150be7fe029695cc5605..d0d1875462f8eacf8ef61f45f5ccada16af93e4b 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHConnectionImpl.java @@ -1,7 +1,8 @@ package ru.yandex.metrika.clickhouse; import org.apache.http.impl.client.CloseableHttpClient; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; +import ru.yandex.metrika.clickhouse.except.CHUnknownException; import ru.yandex.metrika.clickhouse.util.CHHttpClientBuilder; import ru.yandex.metrika.clickhouse.util.LogProxy; import ru.yandex.metrika.clickhouse.util.Logger; @@ -22,13 +23,22 @@ public class CHConnectionImpl implements CHConnection { private final CloseableHttpClient httpclient; - private final HttpConnectionProperties properties = new HttpConnectionProperties(); + private final CHProperties properties; private CHDataSource dataSource; private boolean closed = false; - public CHConnectionImpl(String url) { + public CHConnectionImpl(String url){ + this(url, new CHProperties()); + } + + public CHConnectionImpl(String url, Properties info){ + this(url, new CHProperties(info)); + } + + public CHConnectionImpl(String url, CHProperties properties) { + this.properties = properties; this.dataSource = new CHDataSource(url); CHHttpClientBuilder clientBuilder = new CHHttpClientBuilder(properties); log.debug("new connection"); @@ -108,7 +118,7 @@ public class CHConnectionImpl implements CHConnection { httpclient.close(); closed = true; } catch (IOException e) { - throw new CHException("HTTP client close exception", e); + throw new CHUnknownException("HTTP client close exception", e, dataSource.getHost(), dataSource.getPort()); } } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java index 007d2a16a12ba4f4ef3a6ec8f73d74b59b677ebf..925a6527d489039f15b42df5c25261dbef1388eb 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDataSource.java @@ -1,5 +1,7 @@ package ru.yandex.metrika.clickhouse; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; + import javax.sql.DataSource; import java.io.PrintWriter; import java.sql.Connection; @@ -33,7 +35,17 @@ public class CHDataSource implements DataSource { PrintWriter printWriter; protected int loginTimeout = 0; + private CHProperties properties; + public CHDataSource(String url) { + this(url, new CHProperties()); + } + + public CHDataSource(String url, Properties info) { + this(url, new CHProperties(info)); + } + + public CHDataSource(String url, CHProperties properties) { if (url == null) { throw new IllegalArgumentException("Incorrect clickhouse jdbc url: " + url); } @@ -51,16 +63,18 @@ public class CHDataSource implements DataSource { } else { throw new IllegalArgumentException("Incorrect clickhouse jdbc url: " + url); } + this.properties = properties; + this.properties.setDatabase(database); } @Override public Connection getConnection() throws SQLException { - return driver.connect(url, new Properties()); + return driver.connect(url, properties); } @Override public Connection getConnection(String username, String password) throws SQLException { - return driver.connect(url, new Properties()); + return driver.connect(url, properties); } public String getHost() { diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java index 48170a8fca881a041f2d22fedd5c11e6a3da8c88..271321187664010ee2f0db3e4d222796457a03bb 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java @@ -1,5 +1,6 @@ package ru.yandex.metrika.clickhouse; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.util.LogProxy; import ru.yandex.metrika.clickhouse.util.Logger; @@ -35,7 +36,12 @@ public class CHDriver implements Driver { @Override public CHConnection connect(String url, Properties info) throws SQLException { logger.info("Creating connection"); - return LogProxy.wrap(CHConnection.class, new CHConnectionImpl(url)); + return LogProxy.wrap(CHConnection.class, new CHConnectionImpl(url, info)); + } + + public CHConnection connect(String url, CHProperties properties) throws SQLException { + logger.info("Creating connection"); + return LogProxy.wrap(CHConnection.class, new CHConnectionImpl(url, properties)); } @Override diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHException.java b/src/main/java/ru/yandex/metrika/clickhouse/CHException.java deleted file mode 100644 index de3e900bc1da614c83a1c63ddc492ac6294c7b59..0000000000000000000000000000000000000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHException.java +++ /dev/null @@ -1,34 +0,0 @@ -package ru.yandex.metrika.clickhouse; - -import java.sql.SQLException; - -/** - * Created by jkee on 16.03.15. - */ -public class CHException extends SQLException { - - - public CHException(String reason) { - super(reason); - } - - public CHException(String reason, int vendorCode) { - super(reason, null, vendorCode); - } - - public CHException(String reason, Throwable cause) { - super(reason, cause); - } - - public CHException(Throwable cause, String host, int port) { - super("ClickHouse exception, host: " + host + ", port: " + port, cause); - } - - public CHException(int code, Throwable cause, String host, int port) { - super("ClickHouse exception, code: " + code + ", host: " + host + ", port: " + port, cause); - } - - public CHException(String message, Throwable cause, String host, int port) { - super("ClickHouse exception, message: " + message + ", host: " + host + ", port: " + port, cause); - } -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java index 6e6721a8b70ecf6b445104f263eb8f759abb8c27..1d9a46bf98e50fab2bca60b19126f456a0b32b1b 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHPreparedStatementImpl.java @@ -1,7 +1,7 @@ package ru.yandex.metrika.clickhouse; import org.apache.http.impl.client.CloseableHttpClient; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.util.Logger; import java.io.InputStream; @@ -29,7 +29,7 @@ public class CHPreparedStatementImpl extends CHStatementImpl implements CHPrepar List<String> binds; public CHPreparedStatementImpl(CloseableHttpClient client, CHDataSource source, - HttpConnectionProperties properties, String sql) throws SQLException { + CHProperties properties, String sql) throws SQLException { super(client, source, properties); this.sql = sql; this.sqlParts = parseSql(sql); diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java index bd41cf461d0f3c628e3a0caffa1095274d70cd60..42294bfa81bd82eecc8f1a5362de9ffcf8d77861 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java @@ -1,7 +1,9 @@ package ru.yandex.metrika.clickhouse; -import ru.yandex.metrika.clickhouse.copypaste.ClickhouseResponse; +import ru.yandex.metrika.clickhouse.copypaste.CHQueryParam; +import ru.yandex.metrika.clickhouse.copypaste.CHResponse; +import java.io.InputStream; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -12,8 +14,9 @@ import java.util.Map; * @since 22.03.16 */ public interface CHStatement extends Statement { - ClickhouseResponse executeQueryClickhouseResponse(String sql) throws SQLException; - ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams) throws SQLException; - ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams, boolean ignoreDatabase) throws SQLException; - ResultSet executeQuery(String sql, Map<String, String> additionalDBParams) throws SQLException; + CHResponse executeQueryClickhouseResponse(String sql) throws SQLException; + CHResponse executeQueryClickhouseResponse(String sql, Map<CHQueryParam, String> additionalDBParams) throws SQLException; + CHResponse executeQueryClickhouseResponse(String sql, Map<CHQueryParam, String> additionalDBParams, boolean ignoreDatabase) throws SQLException; + ResultSet executeQuery(String sql, Map<CHQueryParam, String> additionalDBParams) throws SQLException; + void sendStream(InputStream content, String table) throws SQLException; } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java index 707e46d2b3f94025b7927be4db74b53ed0eb5202..7ea82d47dbad712be1644f756d1680b605967423 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java @@ -5,13 +5,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; - - import ru.yandex.metrika.clickhouse.copypaste.*; -import ru.yandex.metrika.clickhouse.except.ClickhouseExceptionSpecifier; +import ru.yandex.metrika.clickhouse.except.CHException; +import ru.yandex.metrika.clickhouse.except.CHExceptionSpecifier; import ru.yandex.metrika.clickhouse.util.CopypasteUtils; import ru.yandex.metrika.clickhouse.util.Logger; @@ -20,12 +20,13 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URI; import java.net.URISyntaxException; -import java.sql.*; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLWarning; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; - /** * Created by jkee on 14.03.15. */ @@ -35,7 +36,7 @@ public class CHStatementImpl implements CHStatement { private final CloseableHttpClient client; - private HttpConnectionProperties properties = new HttpConnectionProperties(); + private CHProperties properties = new CHProperties(); private CHDataSource source; @@ -50,7 +51,7 @@ public class CHStatementImpl implements CHStatement { private ObjectMapper objectMapper; public CHStatementImpl(CloseableHttpClient client, CHDataSource source, - HttpConnectionProperties properties) { + CHProperties properties) { this.client = client; this.source = source; this.properties = properties; @@ -64,40 +65,41 @@ public class CHStatementImpl implements CHStatement { return executeQuery(sql, null); } - public ResultSet executeQuery(String sql, Map<String, String> additionalDBParams) throws SQLException { + public ResultSet executeQuery(String sql, Map<CHQueryParam, String> additionalDBParams) throws SQLException { InputStream is = getInputStream(sql, additionalDBParams, false); try { currentResult = new CHResultSet(properties.isCompress() - ? new ClickhouseLZ4Stream(is) : is, properties.getBufferSize(), + ? new CHLZ4Stream(is) : is, properties.getBufferSize(), extractDBName(sql), extractTableName(sql) ); currentResult.setMaxRows(maxRows); return currentResult; - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Exception e){ + CopypasteUtils.close(is); + throw CHExceptionSpecifier.specify(e, source.getHost(), source.getPort()); } } - public ClickhouseResponse executeQueryClickhouseResponse(String sql) throws SQLException { + public CHResponse executeQueryClickhouseResponse(String sql) throws SQLException { return executeQueryClickhouseResponse(sql, null); } - public ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams) throws SQLException { + public CHResponse executeQueryClickhouseResponse(String sql, Map<CHQueryParam, String> additionalDBParams) throws SQLException { return executeQueryClickhouseResponse(sql, additionalDBParams, false); } - public ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams, boolean ignoreDatabase) throws SQLException { + public CHResponse executeQueryClickhouseResponse(String sql, Map<CHQueryParam, String> additionalDBParams, boolean ignoreDatabase) throws SQLException { InputStream is = getInputStream(clickhousifySql(sql, "JSONCompact"), additionalDBParams, ignoreDatabase); try { byte[] bytes = null; try { if (properties.isCompress()){ - bytes = CopypasteUtils.toByteArray(new ClickhouseLZ4Stream(is)); + bytes = CopypasteUtils.toByteArray(new CHLZ4Stream(is)); } else { bytes = CopypasteUtils.toByteArray(is); } - return objectMapper.readValue(bytes, ClickhouseResponse.class); + return objectMapper.readValue(bytes, CHResponse.class); } catch (IOException e) { if (bytes != null) log.warn("Wrong json: "+new String(bytes)); throw e; @@ -105,7 +107,7 @@ public class CHStatementImpl implements CHStatement { } catch (IOException e) { throw new RuntimeException(e); } finally { - if (is != null) try {is.close();} catch (IOException ignored) { } + CopypasteUtils.close(is); } } @@ -117,19 +119,22 @@ public class CHStatementImpl implements CHStatement { //noinspection StatementWithEmptyBody while (rs.next()) {} } finally { - try { if (rs != null) rs.close(); } catch (Exception ignored) {} + CopypasteUtils.close(rs); } return 1; } @Override public boolean execute(String sql) throws SQLException { - executeQuery(sql); + ResultSet rs = null; + try { + rs = executeQuery(sql); + } finally { + CopypasteUtils.close(rs); + } return true; } - - @Override public void close() throws SQLException { if (currentResult != null) { @@ -380,20 +385,20 @@ public class CHStatementImpl implements CHStatement { } private InputStream getInputStream(String sql, - Map<String, String> additionalClickHouseDBParams, + Map<CHQueryParam, String> additionalClickHouseDBParams, boolean ignoreDatabase ) throws CHException { sql = clickhousifySql(sql); log.debug("Executing SQL: " + sql); URI uri = null; try { - Map<String, String> params = getParams(ignoreDatabase); + Map<CHQueryParam, String> params = properties.buildParams(ignoreDatabase); if (additionalClickHouseDBParams != null && !additionalClickHouseDBParams.isEmpty()) { params.putAll(additionalClickHouseDBParams); } List<String> paramPairs = new ArrayList<String>(); - for (Map.Entry<String, String> entry : params.entrySet()) { - paramPairs.add(entry.getKey() + '=' + entry.getValue()); + for (Map.Entry<CHQueryParam, String> entry : params.entrySet()) { + paramPairs.add(entry.getKey().toString() + '=' + entry.getValue()); } String query = CopypasteUtils.join(paramPairs, '&'); uri = new URI("http", null, source.getHost(), source.getPort(), @@ -415,14 +420,14 @@ public class CHStatementImpl implements CHStatement { try { InputStream messageStream = entity.getContent(); if (properties.isCompress()) { - messageStream = new ClickhouseLZ4Stream(messageStream); + messageStream = new CHLZ4Stream(messageStream); } chMessage = CopypasteUtils.toString(messageStream); } catch (IOException e) { - chMessage = "error while read response "+ e.getMessage(); + chMessage = "error while read response " + e.getMessage(); } EntityUtils.consumeQuietly(entity); - throw ClickhouseExceptionSpecifier.specify(chMessage, source.getHost(), source.getPort()); + throw CHExceptionSpecifier.specify(chMessage, source.getHost(), source.getPort()); } if (entity.isStreaming()) { is = entity.getContent(); @@ -432,44 +437,48 @@ public class CHStatementImpl implements CHStatement { is = baos.convertToInputStream(); } return is; - } catch (IOException e) { + } catch (CHException e){ + throw e; + } catch (Exception e) { log.info("Error during connection to " + source + ", reporting failure to data source, message: " + e.getMessage()); EntityUtils.consumeQuietly(entity); - try { if (is != null) is.close(); } catch (IOException ignored) { } + CopypasteUtils.close(is); log.info("Error sql: " + sql); - throw new CHException("Unknown IO exception", e); + throw CHExceptionSpecifier.specify(e, source.getHost(), source.getPort()); } } - public Map<String, String> getParams(boolean ignoreDatabase) { - Map<String, String> params = new HashMap<String, String>(); - //в clickhouse бывают таблички без базы (Ñ‚.е. в базе default) - if (!CopypasteUtils.isBlank(source.getDatabase()) && !ignoreDatabase) { - params.put("database", source.getDatabase()); - } - if (properties.isCompress()) { - params.put("compress", "1"); - } - // нам вÑегда нужны min и max в ответе - params.put("extremes", "1"); - if (CopypasteUtils.isBlank(properties.getProfile())) { - if (properties.getMaxThreads() != null) - params.put("max_threads", String.valueOf(properties.getMaxThreads())); - // да, там в Ñекундах - params.put("max_execution_time", String.valueOf((properties.getSocketTimeout() + properties.getDataTransferTimeout()) / 1000)); - if (properties.getMaxBlockSize() != null) { - params.put("max_block_size", String.valueOf(properties.getMaxBlockSize())); + public void sendStream(InputStream content, String table) throws CHException { + // echo -ne '10\n11\n12\n' | POST 'http://localhost:8123/?query=INSERT INTO t FORMAT TabSeparated' + HttpEntity entity = null; + try { + URI uri = new URI("http", null, source.getHost(), source.getPort(), + "/", (CopypasteUtils.isEmpty(source.getDatabase()) ? "" : "database=" + source.getDatabase() + '&') + + "query=INSERT INTO " + table + " FORMAT TabSeparated", null); + HttpPost httpPost = new HttpPost(uri); + httpPost.setEntity(new InputStreamEntity(content, -1)); + HttpResponse response = client.execute(httpPost); + entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { + String chMessage; + try { + chMessage = EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + chMessage = "error while read response "+ e.getMessage(); + } + throw CHExceptionSpecifier.specify(chMessage, source.getHost(), source.getPort()); } - } else { - params.put("profile", properties.getProfile()); - } - //в ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ð¸Ð½Ð¾Ð³Ð´Ð° бывает user - if (properties.getUser() != null) { - params.put("user", properties.getUser()); + } catch (CHException e) { + throw e; + } catch (Exception e) { + throw CHExceptionSpecifier.specify(e, source.getHost(), source.getPort()); + } finally { + EntityUtils.consumeQuietly(entity); } - return params; } + + public void closeOnCompletion() throws SQLException { closeOnCompletion = true; } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHConnectionSettings.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHConnectionSettings.java new file mode 100644 index 0000000000000000000000000000000000000000..33c5e0e4d7ea83fc193c1cf2a4f07e1893cbd377 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHConnectionSettings.java @@ -0,0 +1,53 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +/** + * @author serebrserg + * @since 24.03.16 + */ +public enum CHConnectionSettings { + + ASYNC("async", false), + BUFFER_SIZE("buffer_size", 65536), + APACHE_BUFFER_SIZE("apache_buffer_size", 65536), + SOCKET_TIMEOUT("socket_timeout", 30000), + CONNECTION_TIMEOUT("connection_timeout", 50), + + /* + * Ñто таймаут на передачу данных. + * ЧиÑло socketTimeout + dataTransferTimeout отправлÑетÑÑ Ð² clickhouse в параметре max_execution_time + * ПоÑле чего ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ñам оÑтанавливает Ð·Ð°Ð¿Ñ€Ð¾Ñ ÐµÑли Ð²Ñ€ÐµÐ¼Ñ ÐµÐ³Ð¾ Ð²Ñ‹Ð¿Ð¾Ð»Ð½ÐµÐ½Ð¸Ñ Ð¿Ñ€ÐµÐ²Ñ‹ÑˆÐ°ÐµÑ‚ max_execution_time + * */ + DATA_TRANSFER_TIMEOUT( "dataTransferTimeout", 10000), + + + KEEP_ALIVE_TIMEOUT("keepAliveTimeout", 30 * 100), + + /** + * Ð”Ð»Ñ ConnectionManager'а + */ + TIME_TO_LIVE_MILLIS("timeToLiveMillis", 60*1000), + DEFAULT_MAX_PER_ROUTE("defaultMaxPerRoute", 500), + MAX_TOTAL("maxTotal", 10000); + + private final String key; + private final Object defaultValue; + private final Class clazz; + + CHConnectionSettings(String key, Object defaultValue) { + this.key = key; + this.defaultValue = defaultValue; + this.clazz = defaultValue.getClass(); + } + + public String getKey() { + return key; + } + + public Object getDefaultValue() { + return defaultValue; + } + + public Class getClazz() { + return clazz; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHLZ4Stream.java similarity index 96% rename from src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java rename to src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHLZ4Stream.java index ee2cae2d810331679ca542bd869bac603b04d660..f52af35bb6a564c2a898985b3b03bfaefcad7d61 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHLZ4Stream.java @@ -12,7 +12,7 @@ import java.io.InputStream; * @author jkee */ -public class ClickhouseLZ4Stream extends InputStream { +public class CHLZ4Stream extends InputStream { private static final LZ4Factory factory = LZ4Factory.safeInstance(); @@ -24,7 +24,7 @@ public class ClickhouseLZ4Stream extends InputStream { private byte[] currentBlock; private int pointer; - public ClickhouseLZ4Stream(InputStream stream) { + public CHLZ4Stream(InputStream stream) { this.stream = stream; dataWrapper = new LittleEndianDataInputStream(stream); } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHProperties.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHProperties.java new file mode 100644 index 0000000000000000000000000000000000000000..e756b136ed5b0bb9057e1a4d8052591c2735bd85 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHProperties.java @@ -0,0 +1,309 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import ru.yandex.metrika.clickhouse.util.CopypasteUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static ru.yandex.metrika.clickhouse.copypaste.CHConnectionSettings.*; +import static ru.yandex.metrika.clickhouse.copypaste.CHQueryParam.*; +/** + * User: hamilkar + * Date: 10/17/13 + * Time: 2:48 PM + */ +public class CHProperties { + + // поÑтоÑнные наÑтройки ÑÐ¾ÐµÐ´Ð¸Ð½ÐµÐ½Ð¸Ñ + private boolean async; + private int bufferSize; + private int apacheBufferSize; + private int socketTimeout; + private int connectionTimeout; + private int dataTransferTimeout; + private int keepAliveTimeout; + private int timeToLiveMillis; + private int defaultMaxPerRoute; + private int maxTotal; + + + // наÑтройки в запроÑÑ‹ + private Integer maxParallelReplicas; + private String totalsMode; + private String quotaKey; + private Integer priority; + private String database; + private boolean compress; + private boolean extremes; + private Integer maxThreads; + private Integer maxExecutionTime; + private Integer maxBlockSize; + private String profile; + private String user; + + + public CHProperties() { + this(new Properties()); + } + + public CHProperties(Properties info) { + this.async = getSetting(info, ASYNC); + this.bufferSize = getSetting(info, BUFFER_SIZE); + this.apacheBufferSize = getSetting(info, APACHE_BUFFER_SIZE); + this.socketTimeout = getSetting(info, SOCKET_TIMEOUT); + this.connectionTimeout = getSetting(info, CONNECTION_TIMEOUT); + this.dataTransferTimeout = getSetting(info, DATA_TRANSFER_TIMEOUT); + this.keepAliveTimeout = getSetting(info, KEEP_ALIVE_TIMEOUT); + this.timeToLiveMillis = getSetting(info, TIME_TO_LIVE_MILLIS); + this.defaultMaxPerRoute = getSetting(info, DEFAULT_MAX_PER_ROUTE); + this.maxTotal = getSetting(info, MAX_TOTAL); + + this.maxParallelReplicas = getSetting(info, MAX_PARALLEL_REPLICAS); + this.totalsMode = getSetting(info, TOTALS_MODE); + this.quotaKey = getSetting(info, QUOTA_KEY); + this.priority = getSetting(info, PRIORITY); + this.database = getSetting(info, DATABASE); + this.compress = getSetting(info, COMPRESS); + this.extremes = getSetting(info, EXTREMES); + this.maxThreads = getSetting(info, MAX_THREADS); + this.maxExecutionTime = getSetting(info, MAX_EXECUTION_TIME); + this.maxBlockSize = getSetting(info, MAX_BLOCK_SIZE); + this.profile = getSetting(info, PROFILE); + this.user = getSetting(info, USER); + } + + public Map<CHQueryParam, String> buildParams(boolean ignoreDatabase){ + Map<CHQueryParam, String> params = new HashMap<CHQueryParam, String>(); + + if (maxParallelReplicas != null) params.put(MAX_PARALLEL_REPLICAS, String.valueOf(maxParallelReplicas)); + if (totalsMode != null) params.put(TOTALS_MODE, totalsMode); + if (quotaKey != null) params.put(QUOTA_KEY, quotaKey); + if (priority != null) params.put(PRIORITY, String.valueOf(priority)); + + if (!CopypasteUtils.isBlank(database) && !ignoreDatabase) params.put(DATABASE, getDatabase()); + + if (compress) params.put(COMPRESS, "1"); + + if (extremes) params.put(EXTREMES, "1"); + + if (CopypasteUtils.isBlank(profile)) { + if (getMaxThreads() != null) + params.put(MAX_THREADS, String.valueOf(maxThreads)); + // да, там в Ñекундах + params.put(MAX_EXECUTION_TIME, String.valueOf((maxExecutionTime != null? maxExecutionTime:(socketTimeout + dataTransferTimeout)) / 1000)); + if (getMaxBlockSize() != null) { + params.put(MAX_BLOCK_SIZE, String.valueOf(getMaxBlockSize())); + } + } else { + params.put(PROFILE, profile); + } + //в ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ð¸Ð½Ð¾Ð³Ð´Ð° бывает user + if (user != null) params.put(USER, user); + + return params; + } + + + private <T> T getSetting(Properties info, CHQueryParam param){ + return getSetting(info, param.getKey(), param.getDefaultValue(), param.getClazz()); + } + + private <T> T getSetting(Properties info, CHConnectionSettings settings){ + return getSetting(info, settings.getKey(), settings.getDefaultValue(), settings.getClazz()); + } + + @SuppressWarnings("unchecked") + private <T> T getSetting(Properties info, String key, Object defaultValue, Class clazz){ + Object val = info.get(key); + if (val == null) + return (T)defaultValue; + if ((clazz == int.class || clazz == Integer.class) && val instanceof String) { + return (T) clazz.cast(Integer.valueOf((String) val)); + } + if ((clazz == long.class || clazz == Long.class) && val instanceof String) { + return (T) clazz.cast(Long.valueOf((String) val)); + } + if ((clazz == boolean.class || clazz == Boolean.class) && val instanceof String) { + return (T) clazz.cast(Boolean.valueOf((String) val)); + } + return (T) clazz.cast(val); + } + + + public String getProfile() { + return profile; + } + + public void setProfile(String profile) { + this.profile = profile; + } + + public boolean isCompress() { + return compress; + } + + public void setCompress(boolean compress) { + this.compress = compress; + } + + public boolean isAsync() { + return async; + } + + public void setAsync(boolean async) { + this.async = async; + } + + public Integer getMaxThreads() { + return maxThreads; + } + + public void setMaxThreads(Integer maxThreads) { + this.maxThreads = maxThreads; + } + + public Integer getMaxBlockSize() { + return maxBlockSize; + } + + public void setMaxBlockSize(Integer maxBlockSize) { + this.maxBlockSize = maxBlockSize; + } + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } + + public int getApacheBufferSize() { + return apacheBufferSize; + } + + public void setApacheBufferSize(int apacheBufferSize) { + this.apacheBufferSize = apacheBufferSize; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public int getDataTransferTimeout() { + return dataTransferTimeout; + } + + public void setDataTransferTimeout(int dataTransferTimeout) { + this.dataTransferTimeout = dataTransferTimeout; + } + + public int getKeepAliveTimeout() { + return keepAliveTimeout; + } + + public void setKeepAliveTimeout(int keepAliveTimeout) { + this.keepAliveTimeout = keepAliveTimeout; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public int getTimeToLiveMillis() { + return timeToLiveMillis; + } + + public void setTimeToLiveMillis(int timeToLiveMillis) { + this.timeToLiveMillis = timeToLiveMillis; + } + + public int getDefaultMaxPerRoute() { + return defaultMaxPerRoute; + } + + public void setDefaultMaxPerRoute(int defaultMaxPerRoute) { + this.defaultMaxPerRoute = defaultMaxPerRoute; + } + + public int getMaxTotal() { + return maxTotal; + } + + public void setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + } + + public Integer getMaxParallelReplicas() { + return maxParallelReplicas; + } + + public void setMaxParallelReplicas(Integer maxParallelReplicas) { + this.maxParallelReplicas = maxParallelReplicas; + } + + public String getTotalsMode() { + return totalsMode; + } + + public void setTotalsMode(String totalsMode) { + this.totalsMode = totalsMode; + } + + public String getQuotaKey() { + return quotaKey; + } + + public void setQuotaKey(String quotaKey) { + this.quotaKey = quotaKey; + } + + public Integer getPriority() { + return priority; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + + public String getDatabase() { + return database; + } + + public void setDatabase(String database) { + this.database = database; + } + + public boolean isExtremes() { + return extremes; + } + + public void setExtremes(boolean extremes) { + this.extremes = extremes; + } + + public Integer getMaxExecutionTime() { + return maxExecutionTime; + } + + public void setMaxExecutionTime(Integer maxExecutionTime) { + this.maxExecutionTime = maxExecutionTime; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java new file mode 100644 index 0000000000000000000000000000000000000000..a2956c125127d9eb92fda0187a466d3a29037f36 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHQueryParam.java @@ -0,0 +1,91 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +/** + * @author serebrserg + * @since 25.03.16 + */ +public enum CHQueryParam { + MAX_PARALLEL_REPLICAS("max_parallel_replicas", null, Integer.class), + /** + * Каким образом вычиÑлÑÑ‚ÑŒ TOTALS при наличии HAVING, а также при наличии max_rows_to_group_by и group_by_overflow_mode = 'any' + * https://clickhouse.yandex-team.ru/#%D0%9C%D0%BE%D0%B4%D0%B8%D1%84%D0%B8%D0%BA%D0%B0%D1%82%D0%BE%D1%80%20WITH%20TOTALS + */ + TOTALS_MODE("totals_mode", null, String.class), + /** + * keyed - значит в параметре запроÑа передаётÑÑ "ключ" quota_key, + и квота ÑчитаетÑÑ Ð¿Ð¾ отдельноÑти Ð´Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð³Ð¾ Ð·Ð½Ð°Ñ‡ÐµÐ½Ð¸Ñ ÐºÐ»ÑŽÑ‡Ð°. + Ðапример, в качеÑтве ключа может передаватьÑÑ Ð»Ð¾Ð³Ð¸Ð½ Ð¿Ð¾Ð»ÑŒÐ·Ð¾Ð²Ð°Ñ‚ÐµÐ»Ñ Ð² Метрике, + и тогда квота будет ÑчитатьÑÑ Ð´Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð³Ð¾ логина по отдельноÑти. + Имеет ÑмыÑл иÑпользовать только еÑли quota_key передаётÑÑ Ð½Ðµ пользователем, а программой. + */ + QUOTA_KEY("quota_key", null, String.class), + /** + * Меньше значение - больше приоритет + */ + PRIORITY("priority", null, Integer.class), + /** + * БД по умолчанию. + */ + DATABASE("database", "default", String.class), + /** + * Ñервер будет Ñжимать отправлÑемые вам данные + */ + COMPRESS("compress", true, Boolean.class), + /** + * Ð’Ñ‹ можете получить в дополнение к результату также минимальные и макÑимальные Ð·Ð½Ð°Ñ‡ÐµÐ½Ð¸Ñ Ð¿Ð¾ Ñтолбцам результата. + * Ð”Ð»Ñ Ñтого, выÑтавите наÑтройку extremes в 1. Минимумы и макÑимумы ÑчитаютÑÑ Ð´Ð»Ñ Ñ‡Ð¸Ñловых типов, дат, дат-Ñ-временем. + * Ð”Ð»Ñ Ð¾Ñтальных Ñтолбцов, будут выведены Ð·Ð½Ð°Ñ‡ÐµÐ½Ð¸Ñ Ð¿Ð¾ умолчанию. + */ + EXTREMES("extremes", false, String.class), + /** + * МакÑимальное количеÑтво потоков обработки запроÑа + * https://clickhouse.yandex-team.ru/#max_threads + */ + MAX_THREADS("max_threads", null, Integer.class), + /** + * МакÑимальное Ð²Ñ€ÐµÐ¼Ñ Ð²Ñ‹Ð¿Ð¾Ð»Ð½ÐµÐ½Ð¸Ñ Ð·Ð°Ð¿Ñ€Ð¾Ñа в Ñекундах. + * https://clickhouse.yandex-team.ru/#max_execution_time + */ + MAX_EXECUTION_TIME("max_execution_time", null, Integer.class), + /** + * Ñто рекомендациÑ, какого размера блоки (в количеÑтве Ñтрок) загружать из таблицы. + * https://clickhouse.yandex-team.ru/#max_block_size + */ + MAX_BLOCK_SIZE("max_block_size", null, Integer.class), + /** + * Профили наÑтроек - Ñто множеÑтво наÑтроек, Ñгруппированных под одним именем. + * Ð”Ð»Ñ ÐºÐ°Ð¶Ð´Ð¾Ð³Ð¾ Ð¿Ð¾Ð»ÑŒÐ·Ð¾Ð²Ð°Ñ‚ÐµÐ»Ñ ClickHouse указываетÑÑ Ð½ÐµÐºÐ¾Ñ‚Ð¾Ñ€Ñ‹Ð¹ профиль. + */ + PROFILE("profile", null, String.class), + /** + * Ð¸Ð¼Ñ Ð¿Ð¾Ð»ÑŒÐ·Ð¾Ð²Ð°Ñ‚ÐµÐ»Ñ, по умолчанию - default. + */ + USER("user", null, String.class); + + private final String key; + private final Object defaultValue; + private final Class clazz; + + CHQueryParam(String key, Object defaultValue, Class clazz) { + this.key = key; + this.defaultValue = defaultValue; + this.clazz = clazz; + } + + public String getKey() { + return key; + } + + public Object getDefaultValue() { + return defaultValue; + } + + public Class getClazz() { + return clazz; + } + + @Override + public String toString() { + return name().toLowerCase(); + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseResponse.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResponse.java similarity index 98% rename from src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseResponse.java rename to src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResponse.java index 410b76f844060ea32052fee457663e65782a7c0c..a3340d8ebf6eaaf716d53db414915e4bddfa4428 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseResponse.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResponse.java @@ -8,7 +8,7 @@ import java.util.List; * ÐœÑп-объект Ð´Ð»Ñ Ð´Ð¶ÐµÐºÑона Ð´Ð»Ñ Ð¾Ñ‚Ð²ÐµÑ‚Ð° кликхауÑа * @author jkee */ -public class ClickhouseResponse { +public class CHResponse { private List<Meta> meta; @JsonDeserialize(contentUsing = ArrayToStringDeserializer.class) private List<List<String>> data; diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java index 6aeff1918877f5120a4b0370bfec9e4ca87f43ed..ea899a59bf863db804f729e65542eed117f8210f 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java @@ -127,7 +127,7 @@ public class CHResultSet extends AbstractResultSet { return columns; } - Map<String, Integer> getCol() { + public Map<String, Integer> getCol() { return col; } @@ -440,4 +440,8 @@ public class CHResultSet extends AbstractResultSet { public <T> T getObject(String columnLabel, Class<T> type) throws SQLException { return getObject(asColNum(columnLabel), type); } + + public ByteFragment[] getValues() { + return values; + } } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java deleted file mode 100644 index 7f73092dbe90135f961356a67a25257b6b3244e9..0000000000000000000000000000000000000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java +++ /dev/null @@ -1,137 +0,0 @@ -package ru.yandex.metrika.clickhouse.copypaste; - -/** - * User: hamilkar - * Date: 10/17/13 - * Time: 2:48 PM - */ -public class HttpConnectionProperties { - - // ÐаÑтройки кликхауÑа - - /** - * profile=web&sign_rewrite=0 - * Ðа Ñтороне clickhouse Ñделаны Ð¾Ð³Ñ€Ð°Ð½Ð¸Ñ‡ÐµÐ½Ð¸Ñ Ð½Ð° запроÑÑ‹. - * https://svn.yandex.ru/websvn/wsvn/conv/trunk/metrica/src/dbms/src/Server/config.conf - */ - private String profile; - private boolean compress = true; - // asynchronous=0&max_threads=1 - private boolean async; - private Integer maxThreads; - private Integer maxBlockSize; - - private int bufferSize = 65536; - private int apacheBufferSize = 65536; - - //наÑтройки Ð´Ð»Ñ Ð´ÐµÐ¼Ð¾Ð½Ð¾Ð² - private int socketTimeout = 30000; - private int connectionTimeout = 50; - - //METR-9568: параметр user Ð´Ð»Ñ Ð¾Ð¿Ñ€ÐµÐ´ÐµÐ»ÐµÐ½Ð¸Ñ Ð¿Ñ€Ð¾Ñ„Ð¸Ð»Ñ Ð½Ð°Ñтроек(?). - private String user = null; - - /* - * Ñто таймаут на передачу данных. - * ЧиÑло socketTimeout + dataTransferTimeout отправлÑетÑÑ Ð² clickhouse в параметре max_execution_time - * ПоÑле чего ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ñам оÑтанавливает Ð·Ð°Ð¿Ñ€Ð¾Ñ ÐµÑли Ð²Ñ€ÐµÐ¼Ñ ÐµÐ³Ð¾ Ð²Ñ‹Ð¿Ð¾Ð»Ð½ÐµÐ½Ð¸Ñ Ð¿Ñ€ÐµÐ²Ñ‹ÑˆÐ°ÐµÑ‚ max_execution_time - * */ - private int dataTransferTimeout = 10000; - private int keepAliveTimeout = 30 * 1000; - - public String getProfile() { - return profile; - } - - public void setProfile(String profile) { - this.profile = profile; - } - - public boolean isCompress() { - return compress; - } - - public void setCompress(boolean compress) { - this.compress = compress; - } - - public boolean isAsync() { - return async; - } - - public void setAsync(boolean async) { - this.async = async; - } - - public Integer getMaxThreads() { - return maxThreads; - } - - public void setMaxThreads(Integer maxThreads) { - this.maxThreads = maxThreads; - } - - public Integer getMaxBlockSize() { - return maxBlockSize; - } - - public void setMaxBlockSize(Integer maxBlockSize) { - this.maxBlockSize = maxBlockSize; - } - - public int getBufferSize() { - return bufferSize; - } - - public void setBufferSize(int bufferSize) { - this.bufferSize = bufferSize; - } - - public int getApacheBufferSize() { - return apacheBufferSize; - } - - public void setApacheBufferSize(int apacheBufferSize) { - this.apacheBufferSize = apacheBufferSize; - } - - public int getSocketTimeout() { - return socketTimeout; - } - - public void setSocketTimeout(int socketTimeout) { - this.socketTimeout = socketTimeout; - } - - public int getConnectionTimeout() { - return connectionTimeout; - } - - public void setConnectionTimeout(int connectionTimeout) { - this.connectionTimeout = connectionTimeout; - } - - public int getDataTransferTimeout() { - return dataTransferTimeout; - } - - public void setDataTransferTimeout(int dataTransferTimeout) { - this.dataTransferTimeout = dataTransferTimeout; - } - - public int getKeepAliveTimeout() { - return keepAliveTimeout; - } - - public void setKeepAliveTimeout(int keepAliveTimeout) { - this.keepAliveTimeout = keepAliveTimeout; - } - - public String getUser() { - return user; - } - - public void setUser(String user) { - this.user = user; - } -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java b/src/main/java/ru/yandex/metrika/clickhouse/except/CHErrorCode.java similarity index 50% rename from src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java rename to src/main/java/ru/yandex/metrika/clickhouse/except/CHErrorCode.java index 5c2a7557e797005b920cf54f7e48fbc08d86bef8..d9b8b7072b9633d63a3ae4abe3822a3de7c7f56d 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/CHErrorCode.java @@ -6,7 +6,7 @@ import java.util.*; * @author lopashev * @since 18.02.15 */ -public enum ClickhouseErrorCode { +public enum CHErrorCode { OK (0), NOT_FOUND_COLUMN_IN_BLOCK (10), ATTEMPT_TO_READ_AFTER_EOF (32), @@ -16,6 +16,7 @@ public enum ClickhouseErrorCode { NOT_IMPLEMENTED (48), LOGICAL_ERROR (49), TYPE_MISMATCH (53), + TABLE_ALREADY_EXISTS (57), UNKNOWN_TABLE (60), SYNTAX_ERROR (62), TOO_MUCH_ROWS (158), @@ -23,62 +24,34 @@ public enum ClickhouseErrorCode { TOO_SLOW (160), TOO_MUCH_TEMPORARY_NON_CONST_COLUMNS (166), TOO_BIG_AST (168), + CYCLIC_ALIASES (174), MULTIPLE_EXPRESSIONS_FOR_ALIAS (179), SET_SIZE_LIMIT_EXCEEDED (191), SOCKET_TIMEOUT (209), NETWORK_ERROR (210), EMPTY_QUERY (211), MEMORY_LIMIT_EXCEEDED (241), - POCO_EXCEPTION (1000); + TOO_MUCH_PARTS (252), + DOUBLE_DISTRIBUTED (288), + POCO_EXCEPTION (1000), + UNKNOWN_EXCEPTION (1002); public final Integer code; - private static final Map<Integer, ClickhouseErrorCode> byCodes; + private static final Map<Integer, CHErrorCode> byCodes; static { - Map<Integer, ClickhouseErrorCode> map = new HashMap<Integer, ClickhouseErrorCode>(); - for (ClickhouseErrorCode errorCode : values()) + Map<Integer, CHErrorCode> map = new HashMap<Integer, CHErrorCode>(); + for (CHErrorCode errorCode : values()) map.put(errorCode.code, errorCode); byCodes = Collections.unmodifiableMap(map); } - ClickhouseErrorCode(Integer code) { + CHErrorCode(Integer code) { this.code = code; } - public static final Set<ClickhouseErrorCode> ALL = Collections.unmodifiableSet(EnumSet.allOf(ClickhouseErrorCode.class)); - public static final Set<ClickhouseErrorCode> API = Collections.unmodifiableSet(EnumSet.of( - EMPTY_QUERY, - NOT_FOUND_COLUMN_IN_BLOCK, - ILLEGAL_TYPE_OF_ARGUMENT, - ILLEGAL_COLUMN, - UNKNOWN_IDENTIFIER, - NOT_IMPLEMENTED, - LOGICAL_ERROR, - TYPE_MISMATCH, - UNKNOWN_TABLE, - SYNTAX_ERROR, - TOO_MUCH_TEMPORARY_NON_CONST_COLUMNS, - TOO_BIG_AST, - MULTIPLE_EXPRESSIONS_FOR_ALIAS, - SET_SIZE_LIMIT_EXCEEDED, - MEMORY_LIMIT_EXCEEDED - )); - - public static final Set<ClickhouseErrorCode> DB = Collections.unmodifiableSet(EnumSet.of( - ATTEMPT_TO_READ_AFTER_EOF, - SOCKET_TIMEOUT, - NETWORK_ERROR, - POCO_EXCEPTION - )); - - public static final Set<ClickhouseErrorCode> QUERY = Collections.unmodifiableSet(EnumSet.of( - TOO_MUCH_ROWS, - TIMEOUT_EXCEEDED, - TOO_SLOW - )); - - public static ClickhouseErrorCode fromCode(Integer code) { + public static CHErrorCode fromCode(Integer code) { return byCodes.get(code); } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/CHException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/CHException.java new file mode 100644 index 0000000000000000000000000000000000000000..0c4c651801785ef6b2114d37f9bf69479810cffd --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/CHException.java @@ -0,0 +1,17 @@ +package ru.yandex.metrika.clickhouse.except; + +import java.sql.SQLException; + +/** + * Created by jkee on 16.03.15. + */ +public class CHException extends SQLException { + + public CHException(int code, Throwable cause, String host, int port) { + super("ClickHouse exception, code: " + code + ", host: " + host + ", port: " + port, null, code, cause); + } + + public CHException(int code, String message, Throwable cause, String host, int port) { + super("ClickHouse exception, message: " + message + ", host: " + host + ", port: " + port, null, code, cause); + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java b/src/main/java/ru/yandex/metrika/clickhouse/except/CHExceptionSpecifier.java similarity index 58% rename from src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java rename to src/main/java/ru/yandex/metrika/clickhouse/except/CHExceptionSpecifier.java index 43b51ff57d230615406bda832e562936492f1176..36b7e63e8e3d716f045fb44691558611ae487a3e 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/CHExceptionSpecifier.java @@ -1,14 +1,12 @@ package ru.yandex.metrika.clickhouse.except; import org.apache.http.conn.ConnectTimeoutException; -import ru.yandex.metrika.clickhouse.CHException; import ru.yandex.metrika.clickhouse.util.CopypasteUtils; import ru.yandex.metrika.clickhouse.util.Logger; +import java.net.ConnectException; import java.net.SocketTimeoutException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; + /** * @author lemmsh @@ -30,46 +28,11 @@ import java.util.Map; * https://github.yandex-team.ru/Metrika/metrika-core/blob/master/metrica/src/dbms/include/DB/Core/ErrorCodes.h */ -public final class ClickhouseExceptionSpecifier { - - private static final Logger log = Logger.of(ClickhouseExceptionSpecifier.class); - - private static final Map<Integer, ClickhouseExceptionFactory> FACTORIES; +public final class CHExceptionSpecifier { - static { - Map<Integer, ClickhouseExceptionFactory> map = new HashMap<Integer, ClickhouseExceptionFactory>(); - for (ClickhouseErrorCode errorCode : ClickhouseErrorCode.API) - map.put(errorCode.code, new ClickhouseExceptionFactory() { - @Override - public CHException create(Integer code, Throwable cause, String host, int port) { - return new ClickhouseApiException(code, cause, host, port); - } - }); - for (ClickhouseErrorCode errorCode : ClickhouseErrorCode.DB) - map.put(errorCode.code, new ClickhouseExceptionFactory() { - @Override - public CHException create(Integer code, Throwable cause, String host, int port) { - return new ClickhouseDbException(code, cause, host, port); - } - }); - for (ClickhouseErrorCode errorCode : ClickhouseErrorCode.QUERY) - map.put(errorCode.code, new ClickhouseExceptionFactory() { - @Override - public CHException create(Integer code, Throwable cause, String host, int port) { - return new ClickhouseQueryException(code, cause, host, port); - } - }); - FACTORIES = Collections.unmodifiableMap(map); - } - - private static final ClickhouseExceptionFactory DEFAULT_FACTORY = new ClickhouseExceptionFactory() { - @Override - public CHException create(Integer code, Throwable cause, String host, int port) { - return new ClickhouseUnhandledException(code, cause, host, port); - } - }; + private static final Logger log = Logger.of(CHExceptionSpecifier.class); - private ClickhouseExceptionSpecifier() { + private CHExceptionSpecifier() { } public static CHException specify(Throwable cause, String host, int port) { @@ -86,26 +49,33 @@ public final class ClickhouseExceptionSpecifier { * Очень надеемÑÑ, что кто-то будет в Ñти теÑÑ‚Ñ‹ Ñмотреть и актуализировать парÑинг. */ public static CHException specify(String clickhouseMessage, Throwable cause, String host, int port) { - if (CopypasteUtils.isBlank(clickhouseMessage) && cause != null) { + if (CopypasteUtils.isEmpty(clickhouseMessage) && cause != null) { if (cause instanceof SocketTimeoutException) - return new ClickhouseQueryException(ClickhouseErrorCode.SOCKET_TIMEOUT.code, cause, host, port); - else if (cause instanceof ConnectTimeoutException) - return new ClickhouseQueryException(ClickhouseErrorCode.NETWORK_ERROR.code, cause, host, port); + // еÑли приехал STE, то Ñкажем, что Ñто Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð»Ð¾Ñ…Ð¾Ð¹, Ñто не то же Ñамое, что SOCKET_TIMEOUT от кликхауÑа + // Ñ…Ð¾Ñ‚Ñ Ñто также может значить падающий кликхауÑ, поÑмотрим что выглÑдит правдоподобнее + return new CHException(CHErrorCode.TIMEOUT_EXCEEDED.code, cause, host, port); + else if (cause instanceof ConnectTimeoutException || cause instanceof ConnectException) + // не Ñмогли ÑоединитьÑÑ Ñ ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑом за connectTimeout - в принципе, может быть никто не виноват + // Ñреди наших ÑущноÑтей (query/api/db), но обвинить кого-то надо, и Ñто будет db + return new CHException(CHErrorCode.NETWORK_ERROR.code, cause, host, port); else - return new ClickhouseUnhandledException(cause, host, port); + return new CHUnknownException(cause, host, port); } try { - // быÑтро и опаÑно - int code = Integer.parseInt(clickhouseMessage.substring(clickhouseMessage.indexOf(' ') + 1, clickhouseMessage.indexOf(','))); + int code; + if(clickhouseMessage.startsWith("Poco::Exception. Code: 1000, ")) { + code = 1000; + } else { + // быÑтро и опаÑно Code: 175, e.displayText() = DB::Exception: + code = Integer.parseInt(clickhouseMessage.substring(clickhouseMessage.indexOf(' ') + 1, clickhouseMessage.indexOf(','))); + } // ошибку в изначальном виде вÑе-таки укажем Throwable messageHolder = cause != null ? cause : new Throwable(clickhouseMessage); - ClickhouseExceptionFactory clickhouseExceptionFactory = FACTORIES.get(code); - if (clickhouseExceptionFactory == null) clickhouseExceptionFactory = DEFAULT_FACTORY; - return clickhouseExceptionFactory.create(code, messageHolder, host, port); + return new CHException(code, messageHolder, host, port); } catch (Exception e) { log.error("Unsupported clickhouse error format, please fix ClickhouseExceptionSpecifier, message: " + clickhouseMessage + ", error: " + e.getMessage()); - return new ClickhouseUnhandledException(clickhouseMessage, cause, host, port); + return new CHUnknownException(clickhouseMessage, cause, host, port); } } @@ -113,4 +83,4 @@ public final class ClickhouseExceptionSpecifier { CHException create(Integer code, Throwable cause, String host, int port); } -} +} \ No newline at end of file diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/CHUnknownException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/CHUnknownException.java new file mode 100644 index 0000000000000000000000000000000000000000..4b31083522875893ff88724953ec7fb3eaf3fd67 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/CHUnknownException.java @@ -0,0 +1,22 @@ +package ru.yandex.metrika.clickhouse.except; + +/** + * @author lopashev + * @since 16.02.15 + */ +public class CHUnknownException extends CHException { + + public CHUnknownException(Throwable cause, String host, int port) { + super(CHErrorCode.UNKNOWN_EXCEPTION.code, cause, host, port); + } + + + public CHUnknownException(String message, Throwable cause, String host, int port) { + super(CHErrorCode.UNKNOWN_EXCEPTION.code, message, cause, host, port); + } + + public CHUnknownException(Integer code, Throwable cause, String host, int port) { + super(code, cause, host, port); + } + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java deleted file mode 100644 index 67e36dfb173e65b3d012814f85598e5871d80ad4..0000000000000000000000000000000000000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java +++ /dev/null @@ -1,16 +0,0 @@ -package ru.yandex.metrika.clickhouse.except; - -import ru.yandex.metrika.clickhouse.CHException; - -/** - * @author lopashev - * @since 16.02.15 - */ -public class ClickhouseApiException extends CHException { - public static final String MESSAGE = - "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ð½Ðµ может быть обработан из-за внутренней ошибки. Мы проанализируем и поÑтараемÑÑ ÑƒÑтранить причину как можно быÑтрее. ПожалуйÑта, не отправлÑйте Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð²Ñ‚Ð¾Ñ€Ð½Ð¾."; - - public ClickhouseApiException(Integer code, Throwable cause, String host, int port) { - super(code, cause, host, port); - } -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java deleted file mode 100644 index 03fa2a2b46450c95c409ee0ac22fa8a18a2cef46..0000000000000000000000000000000000000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java +++ /dev/null @@ -1,16 +0,0 @@ -package ru.yandex.metrika.clickhouse.except; - -import ru.yandex.metrika.clickhouse.CHException; - -/** - * @author lopashev - * @since 16.02.15 - */ -public class ClickhouseDbException extends CHException { - public static final String MESSAGE = - "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ð½Ðµ может быть обработан в данный момент из-за возроÑшей нагрузки. ПожалуйÑта, отправьте Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð²Ñ‚Ð¾Ñ€Ð½Ð¾ через неÑколько минут."; - - public ClickhouseDbException(Integer code, Throwable cause, String host, int port) { - super(code, cause, host, port); - } -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java deleted file mode 100644 index c2625935359cfd45b69746de93b1aa878042641a..0000000000000000000000000000000000000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java +++ /dev/null @@ -1,17 +0,0 @@ -package ru.yandex.metrika.clickhouse.except; - - -import ru.yandex.metrika.clickhouse.CHException; - -/** - * @author lopashev - * @since 16.02.15 - */ -public class ClickhouseQueryException extends CHException { - public static final String MESSAGE = - "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ñлишком Ñложный и не может быть обработан. ПожалуйÑта, уменьшите интервал дат запроÑа либо включите/уменьшите Ñемплирование и повторите запроÑ."; - - public ClickhouseQueryException(Integer code, Throwable cause, String host, int port) { - super(code, cause, host, port); - } -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java deleted file mode 100644 index e47d500970e180bf1cc1f01dad04f4e0c279ea76..0000000000000000000000000000000000000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java +++ /dev/null @@ -1,26 +0,0 @@ -package ru.yandex.metrika.clickhouse.except; - -import ru.yandex.metrika.clickhouse.CHException; - -/** - * @author lopashev - * @since 16.02.15 - */ -public class ClickhouseUnhandledException extends CHException { - public static final String MESSAGE = - "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ð½Ðµ может быть обработан по неизвеÑтной причине. Мы проанализируем и поÑтараемÑÑ ÑƒÑтранить причину как можно быÑтрее. ПожалуйÑта, отправьте Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð²Ñ‚Ð¾Ñ€Ð½Ð¾ через неÑколько минут."; - - public ClickhouseUnhandledException(Throwable cause, String host, int port) { - super(cause, host, port); - } - - - public ClickhouseUnhandledException(String message, Throwable cause, String host, int port) { - super(message, cause, host, port); - } - - public ClickhouseUnhandledException(Integer code, Throwable cause, String host, int port) { - super(code, cause, host, port); - } - -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java index 1ca485d56c961eca17c415ed178e0770114c95a8..0e7b0c9adb4d48b13f71d3c3c3fafaca645f77f3 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java @@ -16,7 +16,7 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.message.BasicHeaderElementIterator; import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HttpContext; -import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.CHProperties; import ru.yandex.metrika.clickhouse.copypaste.IpVersionPriorityResolver; import java.net.HttpURLConnection; @@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit; */ public class CHHttpClientBuilder { - private final HttpConnectionProperties properties; + private final CHProperties properties; - public CHHttpClientBuilder(HttpConnectionProperties properties) { + public CHHttpClientBuilder(CHProperties properties) { this.properties = properties; } @@ -43,16 +43,16 @@ public class CHHttpClientBuilder { .build(); } - private static PoolingHttpClientConnectionManager getConnectionManager() { + private PoolingHttpClientConnectionManager getConnectionManager() { //noinspection resource PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager( RegistryBuilder.<ConnectionSocketFactory>create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", SSLConnectionSocketFactory.getSocketFactory()) .build(), - null, null, new IpVersionPriorityResolver(), 1, TimeUnit.MINUTES); - connectionManager.setDefaultMaxPerRoute(500); - connectionManager.setMaxTotal(1000); + null, null, new IpVersionPriorityResolver(), properties.getTimeToLiveMillis(), TimeUnit.MILLISECONDS); + connectionManager.setDefaultMaxPerRoute(properties.getDefaultMaxPerRoute()); + connectionManager.setMaxTotal(properties.getMaxTotal()); return connectionManager; } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java index 69d1753c7cb1878bff86cf0378e900c585db9af5..0059b0c1431f2168ff14573552fb4a242536040b 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java @@ -1,10 +1,9 @@ package ru.yandex.metrika.clickhouse.util; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.nio.charset.Charset; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -14,6 +13,7 @@ import java.util.List; */ public class CopypasteUtils { + private static final Logger log = Logger.of(CopypasteUtils.class); public static final Charset UTF_8 = Charset.forName("UTF-8"); /////// Metrika code ////// @@ -125,6 +125,10 @@ public class CopypasteUtils { return true; } + public static boolean isEmpty(String str) { + return str == null || str.isEmpty(); + } + public static String join(final Iterable<?> iterable, final char separator) { Iterator<?> iterator = iterable.iterator(); @@ -187,4 +191,22 @@ public class CopypasteUtils { return total; } + public static void close(Closeable closeable){ + if (closeable == null) return; + try{ + closeable.close(); + } catch (IOException e){ + log.error("can not close stream: " + e.getMessage()); + } + } + + public static void close(ResultSet rs){ + if (rs == null) return; + try{ + rs.close(); + } catch (SQLException e){ + log.error("can not close resultset: " + e.getMessage()); + } + } + }