From e3c7c0df1ec4e3e5e1f93fd794ddc6fe2df8e7c9 Mon Sep 17 00:00:00 2001 From: jkee <jkee@yandex-team.ru> Date: Mon, 16 Mar 2015 22:44:38 +0300 Subject: [PATCH] METR-15511: exception handling, refactor 4 --- .../metrika/clickhouse/CHConnection.java | 11 +- .../metrika/clickhouse/CHException.java | 12 ++ .../metrika/clickhouse/CHStatement.java | 181 ++++++++++++------ .../clickhouse/config/ClickHouseSource.java | 95 +++++++++ .../clickhouse/copypaste/ByteFragment.java | 2 + .../clickhouse/copypaste/CopypasteUtils.java | 12 -- .../copypaste/CountingInputStream.java | 87 --------- .../except/ClickhouseApiException.java | 16 ++ .../except/ClickhouseDbException.java | 16 ++ .../except/ClickhouseErrorCode.java | 89 +++++++++ .../except/ClickhouseExceptionSpecifier.java | 116 +++++++++++ .../except/ClickhouseQueryException.java | 17 ++ .../except/ClickhouseUnhandledException.java | 26 +++ .../clickhouse/util/CopypasteUtils.java | 91 +++++++++ 14 files changed, 617 insertions(+), 154 deletions(-) create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/config/ClickHouseSource.java delete mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/CopypasteUtils.java delete mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java b/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java index dd1ca464..f78e89ce 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java @@ -2,7 +2,9 @@ package ru.yandex.metrika.clickhouse; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import ru.yandex.metrika.clickhouse.config.ClickHouseSource; +import java.net.URI; import java.sql.*; import java.util.Map; import java.util.Properties; @@ -22,7 +24,14 @@ public class CHConnection implements Connection { @Override public Statement createStatement() throws SQLException { - return new CHStatement(httpclient, url); + + URI uri = URI.create(url); + String host = uri.getHost(); + int port = uri.getPort(); + + ClickHouseSource source = new ClickHouseSource(host, port); + + return new CHStatement(httpclient, source); } @Override diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHException.java b/src/main/java/ru/yandex/metrika/clickhouse/CHException.java index c5fc9730..de3e900b 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHException.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHException.java @@ -19,4 +19,16 @@ public class CHException extends SQLException { public CHException(String reason, Throwable cause) { super(reason, cause); } + + public CHException(Throwable cause, String host, int port) { + super("ClickHouse exception, host: " + host + ", port: " + port, cause); + } + + public CHException(int code, Throwable cause, String host, int port) { + super("ClickHouse exception, code: " + code + ", host: " + host + ", port: " + port, cause); + } + + public CHException(String message, Throwable cause, String host, int port) { + super("ClickHouse exception, message: " + message + ", host: " + host + ", port: " + port, cause); + } } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java index 57a5d978..b0d6be90 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java @@ -6,48 +6,44 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; +import ru.yandex.metrika.clickhouse.config.ClickHouseSource; import ru.yandex.metrika.clickhouse.copypaste.*; +import ru.yandex.metrika.clickhouse.except.ClickhouseExceptionSpecifier; +import ru.yandex.metrika.clickhouse.util.CopypasteUtils; +import ru.yandex.metrika.clickhouse.util.Logger; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URISyntaxException; import java.sql.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * Created by jkee on 14.03.15. */ public class CHStatement implements Statement { - private final CloseableHttpClient client; + private static final Logger log = Logger.of(CHStatement.class); - private final String url; + private final CloseableHttpClient client; private HttpConnectionProperties properties = new HttpConnectionProperties(); - public CHStatement(CloseableHttpClient client, String url) { - this.client = client; - this.url = url; - } + private ClickHouseSource source; - public static String clickhousifySql(String sql) { - return clickhousifySql(sql, "TabSeparatedWithNamesAndTypes"); - } - - public static String clickhousifySql(String sql, String format) { - sql = sql.trim(); - if (!sql.replace(";", "").trim().endsWith(" TabSeparatedWithNamesAndTypes") - && !sql.replace(";", "").trim().endsWith(" TabSeparated") - && !sql.replace(";", "").trim().endsWith(" JSONCompact")) { - if (sql.endsWith(";")) sql = sql.substring(0, sql.length() - 1); - sql += " FORMAT " + format + ';'; - } - return sql; + public CHStatement(CloseableHttpClient client, ClickHouseSource source) { + this.client = client; + this.source = source; } @Override public ResultSet executeQuery(String sql) throws SQLException { - String csql = clickhousifySql(sql); - CountingInputStream is = getInputStream(csql); + InputStream is = getInputStream(sql, null, false); try { return new CHResultSet(properties.isCompress() ? new ClickhouseLZ4Stream(is) : is, properties.getBufferSize()); @@ -56,41 +52,16 @@ public class CHStatement implements Statement { } } - private CountingInputStream getInputStream(String sql) { - HttpPost post = new HttpPost(url); - post.setEntity(new StringEntity(sql, CopypasteUtils.UTF_8)); - HttpEntity entity = null; - InputStream is = null; - try { - HttpResponse response = client.execute(post); - entity = response.getEntity(); - if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { - String chMessage = null; - try { - chMessage = EntityUtils.toString(response.getEntity()); - } catch (IOException e) { - chMessage = "error while read response "+ e.getMessage(); - } - EntityUtils.consumeQuietly(entity); - throw new RuntimeException("CH error: " + chMessage); - } - if (entity.isStreaming()) { - is = entity.getContent(); - } else { - FastByteArrayOutputStream baos = new FastByteArrayOutputStream(); - entity.writeTo(baos); - is = baos.convertToInputStream(); - } - return new CountingInputStream(is); - } catch (IOException e) { - EntityUtils.consumeQuietly(entity); - throw new RuntimeException(e); - } - } - @Override public int executeUpdate(String sql) throws SQLException { - throw new UnsupportedOperationException(); + ResultSet rs = null; + try { + rs = executeQuery(sql); + while (rs.next()) {} + } finally { + try { rs.close(); } catch (Exception e) {}; + } + return 1; } @Override @@ -296,4 +267,106 @@ public class CHStatement implements Statement { public boolean isWrapperFor(Class<?> iface) throws SQLException { return false; } + + public static String clickhousifySql(String sql) { + return clickhousifySql(sql, "TabSeparatedWithNamesAndTypes"); + } + + public static String clickhousifySql(String sql, String format) { + sql = sql.trim(); + if (!sql.replace(";", "").trim().endsWith(" TabSeparatedWithNamesAndTypes") + && !sql.replace(";", "").trim().endsWith(" TabSeparated") + && !sql.replace(";", "").trim().endsWith(" JSONCompact")) { + if (sql.endsWith(";")) sql = sql.substring(0, sql.length() - 1); + sql += " FORMAT " + format + ';'; + } + return sql; + } + + private InputStream getInputStream(String sql, + Map<String, String> additionalClickHouseDBParams, + boolean ignoreDatabase + ) throws CHException { + sql = clickhousifySql(sql); + log.debug(sql); + URI uri = null; + try { + Map<String, String> params = getParams(false); + if (additionalClickHouseDBParams != null && !additionalClickHouseDBParams.isEmpty()) { + params.putAll(additionalClickHouseDBParams); + } + List<String> paramPairs = new ArrayList<String>(); + for (Map.Entry<String, String> entry : params.entrySet()) { + paramPairs.add(entry.getKey() + '=' + entry.getValue()); + } + String query = CopypasteUtils.join(paramPairs, '&'); + uri = new URI("http", null, source.getHost(), source.getPort(), + "/", query, null); + } catch (URISyntaxException e) { + log.error("Mailformed URL: " + e.getMessage()); + throw new IllegalStateException("illegal configuration of db"); + } + log.debug("Request url: " + uri); + HttpPost post = new HttpPost(uri); + post.setEntity(new StringEntity(sql, CopypasteUtils.UTF_8)); + HttpEntity entity = null; + InputStream is = null; + try { + HttpResponse response = client.execute(post); + entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { + String chMessage = null; + try { + chMessage = EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + chMessage = "error while read response "+ e.getMessage(); + } + EntityUtils.consumeQuietly(entity); + throw ClickhouseExceptionSpecifier.specify(chMessage, source.getHost(), source.getPort()); + } + if (entity.isStreaming()) { + is = entity.getContent(); + } else { + FastByteArrayOutputStream baos = new FastByteArrayOutputStream(); + entity.writeTo(baos); + is = baos.convertToInputStream(); + } + return is; + } catch (IOException e) { + log.info("Error during connection to " + source + ", reporting failure to data source, message: " + e.getMessage()); + EntityUtils.consumeQuietly(entity); + try { if (is != null) is.close(); } catch (IOException ignored) { } + log.info("Error sql: " + sql); + throw new CHException("Unknown IO exception", e); + } + } + + public Map<String, String> getParams(boolean ignoreDatabase) { + Map<String, String> params = new HashMap<String, String>(); + //в clickhouse бывают таблички без базы (Ñ‚.е. в базе default) + if (!CopypasteUtils.isBlank(source.getDb()) && !ignoreDatabase) { + params.put("database", source.getDb()); + } + if (properties.isCompress()) { + params.put("compress", "1"); + } + // нам вÑегда нужны min и max в ответе + params.put("extremes", "1"); + if (CopypasteUtils.isBlank(properties.getProfile())) { + if (properties.getMaxThreads() != null) + params.put("max_threads", String.valueOf(properties.getMaxThreads())); + // да, там в Ñекундах + params.put("max_execution_time", String.valueOf((properties.getSocketTimeout() + properties.getDataTransferTimeout()) / 1000)); + if (properties.getMaxBlockSize() != null) { + params.put("max_block_size", String.valueOf(properties.getMaxBlockSize())); + } + } else { + params.put("profile", properties.getProfile()); + } + //в ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ð¸Ð½Ð¾Ð³Ð´Ð° бывает user + if (properties.getUser() != null) { + params.put("user", properties.getUser()); + } + return params; + } } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/config/ClickHouseSource.java b/src/main/java/ru/yandex/metrika/clickhouse/config/ClickHouseSource.java new file mode 100644 index 00000000..d9190d13 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/config/ClickHouseSource.java @@ -0,0 +1,95 @@ +package ru.yandex.metrika.clickhouse.config; + +/** + * datasource + * @author orantius + * @version $Id$ + * @since 7/12/12 + */ +public class ClickHouseSource { + public static final int DEFAULT_PORT = 8123; + + private String host = "localhost"; + + private int port = DEFAULT_PORT; + + private String database; + + public ClickHouseSource() { + } + + public ClickHouseSource(String host, String database) { + this.host = host; + this.database = database; + } + + public ClickHouseSource(String host, int port) { + this.host = host; + this.port = port; + } + + public ClickHouseSource(String host, int port, String database) { + this.host = host; + this.port = port; + this.database = database; + } + + public ClickHouseSource(String database) { + this.database = database; + } + + public void setHost(String host) { + this.host = host; + } + + public void setPort(int port) { + this.port = port; + } + + public void setDb(String database) { + this.database = database; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getDb() { + return database; + } + + public String getUrl() { + return "http://"+host+ ':' +port+ '/'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ClickHouseSource source = (ClickHouseSource) o; + if (database != null ? !database.equals(source.database) : source.database != null) return false; + return port == source.port && host.equals(source.host); + } + + @Override + public int hashCode() { + int result = host.hashCode(); + result = 31 * result + port; + result = 31 * result + (database != null ? database.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "ClickHouseSource{" + + "host='" + host + '\'' + + ", port=" + port + + ", database='" + database + '\'' + + '}'; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java index 5401d7fb..c8d40e82 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java @@ -1,5 +1,7 @@ package ru.yandex.metrika.clickhouse.copypaste; +import ru.yandex.metrika.clickhouse.util.CopypasteUtils; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Arrays; diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CopypasteUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CopypasteUtils.java deleted file mode 100644 index 2d6f7387..00000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CopypasteUtils.java +++ /dev/null @@ -1,12 +0,0 @@ -package ru.yandex.metrika.clickhouse.copypaste; - -import java.nio.charset.Charset; - -/** - * Created by jkee on 16.03.15. - */ -public class CopypasteUtils { - - public static final Charset UTF_8 = Charset.forName("UTF-8"); - -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java deleted file mode 100644 index b38d8808..00000000 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (C) 2007 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ru.yandex.metrika.clickhouse.copypaste; - -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; - -/** - * An {@link java.io.InputStream} that counts the number of bytes read. - * - * @author Chris Nokleberg - * @since 1.0 - */ -public final class CountingInputStream extends FilterInputStream { - - private long count; - private long mark = -1; - - /** - * Wraps another input stream, counting the number of bytes read. - * - * @param in the input stream to be wrapped - */ - public CountingInputStream(InputStream in) { - super(in); - } - - /** Returns the number of bytes read. */ - public long getCount() { - return count; - } - - @Override public int read() throws IOException { - int result = in.read(); - if (result != -1) { - count++; - } - return result; - } - - @Override public int read(byte[] b, int off, int len) throws IOException { - int result = in.read(b, off, len); - if (result != -1) { - count += result; - } - return result; - } - - @Override public long skip(long n) throws IOException { - long result = in.skip(n); - count += result; - return result; - } - - @Override public synchronized void mark(int readlimit) { - in.mark(readlimit); - mark = count; - // it's okay to mark even if mark isn't supported, as reset won't work - } - - @Override public synchronized void reset() throws IOException { - if (!in.markSupported()) { - throw new IOException("Mark not supported"); - } - if (mark == -1) { - throw new IOException("Mark not set"); - } - - in.reset(); - count = mark; - } -} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java new file mode 100644 index 00000000..67e36dfb --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseApiException.java @@ -0,0 +1,16 @@ +package ru.yandex.metrika.clickhouse.except; + +import ru.yandex.metrika.clickhouse.CHException; + +/** + * @author lopashev + * @since 16.02.15 + */ +public class ClickhouseApiException extends CHException { + public static final String MESSAGE = + "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ð½Ðµ может быть обработан из-за внутренней ошибки. Мы проанализируем и поÑтараемÑÑ ÑƒÑтранить причину как можно быÑтрее. ПожалуйÑта, не отправлÑйте Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð²Ñ‚Ð¾Ñ€Ð½Ð¾."; + + public ClickhouseApiException(Integer code, Throwable cause, String host, int port) { + super(code, cause, host, port); + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java new file mode 100644 index 00000000..03fa2a2b --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseDbException.java @@ -0,0 +1,16 @@ +package ru.yandex.metrika.clickhouse.except; + +import ru.yandex.metrika.clickhouse.CHException; + +/** + * @author lopashev + * @since 16.02.15 + */ +public class ClickhouseDbException extends CHException { + public static final String MESSAGE = + "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ð½Ðµ может быть обработан в данный момент из-за возроÑшей нагрузки. ПожалуйÑта, отправьте Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð²Ñ‚Ð¾Ñ€Ð½Ð¾ через неÑколько минут."; + + public ClickhouseDbException(Integer code, Throwable cause, String host, int port) { + super(code, cause, host, port); + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java new file mode 100644 index 00000000..5c2a7557 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseErrorCode.java @@ -0,0 +1,89 @@ +package ru.yandex.metrika.clickhouse.except; + +import java.util.*; + +/** +* @author lopashev +* @since 18.02.15 +*/ +public enum ClickhouseErrorCode { + OK (0), + NOT_FOUND_COLUMN_IN_BLOCK (10), + ATTEMPT_TO_READ_AFTER_EOF (32), + ILLEGAL_TYPE_OF_ARGUMENT (43), + ILLEGAL_COLUMN (44), + UNKNOWN_IDENTIFIER (47), + NOT_IMPLEMENTED (48), + LOGICAL_ERROR (49), + TYPE_MISMATCH (53), + UNKNOWN_TABLE (60), + SYNTAX_ERROR (62), + TOO_MUCH_ROWS (158), + TIMEOUT_EXCEEDED (159), + TOO_SLOW (160), + TOO_MUCH_TEMPORARY_NON_CONST_COLUMNS (166), + TOO_BIG_AST (168), + MULTIPLE_EXPRESSIONS_FOR_ALIAS (179), + SET_SIZE_LIMIT_EXCEEDED (191), + SOCKET_TIMEOUT (209), + NETWORK_ERROR (210), + EMPTY_QUERY (211), + MEMORY_LIMIT_EXCEEDED (241), + POCO_EXCEPTION (1000); + + public final Integer code; + + private static final Map<Integer, ClickhouseErrorCode> byCodes; + static { + Map<Integer, ClickhouseErrorCode> map = new HashMap<Integer, ClickhouseErrorCode>(); + for (ClickhouseErrorCode errorCode : values()) + map.put(errorCode.code, errorCode); + byCodes = Collections.unmodifiableMap(map); + } + + ClickhouseErrorCode(Integer code) { + this.code = code; + } + + public static final Set<ClickhouseErrorCode> ALL = Collections.unmodifiableSet(EnumSet.allOf(ClickhouseErrorCode.class)); + + public static final Set<ClickhouseErrorCode> API = Collections.unmodifiableSet(EnumSet.of( + EMPTY_QUERY, + NOT_FOUND_COLUMN_IN_BLOCK, + ILLEGAL_TYPE_OF_ARGUMENT, + ILLEGAL_COLUMN, + UNKNOWN_IDENTIFIER, + NOT_IMPLEMENTED, + LOGICAL_ERROR, + TYPE_MISMATCH, + UNKNOWN_TABLE, + SYNTAX_ERROR, + TOO_MUCH_TEMPORARY_NON_CONST_COLUMNS, + TOO_BIG_AST, + MULTIPLE_EXPRESSIONS_FOR_ALIAS, + SET_SIZE_LIMIT_EXCEEDED, + MEMORY_LIMIT_EXCEEDED + )); + + public static final Set<ClickhouseErrorCode> DB = Collections.unmodifiableSet(EnumSet.of( + ATTEMPT_TO_READ_AFTER_EOF, + SOCKET_TIMEOUT, + NETWORK_ERROR, + POCO_EXCEPTION + )); + + public static final Set<ClickhouseErrorCode> QUERY = Collections.unmodifiableSet(EnumSet.of( + TOO_MUCH_ROWS, + TIMEOUT_EXCEEDED, + TOO_SLOW + )); + + public static ClickhouseErrorCode fromCode(Integer code) { + return byCodes.get(code); + } + + @Override + public String toString() { + return name() + " (code " + code + ')'; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java new file mode 100644 index 00000000..43b51ff5 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseExceptionSpecifier.java @@ -0,0 +1,116 @@ +package ru.yandex.metrika.clickhouse.except; + +import org.apache.http.conn.ConnectTimeoutException; +import ru.yandex.metrika.clickhouse.CHException; +import ru.yandex.metrika.clickhouse.util.CopypasteUtils; +import ru.yandex.metrika.clickhouse.util.Logger; + +import java.net.SocketTimeoutException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * @author lemmsh + * @since 7/17/14 + * + * * ИÑключениÑ, возбуждаемые плохим ответом от кликхауÑа, будут рождатьÑÑ Ð·Ð´ÐµÑÑŒ. РазделÑем их на 4 категории: + * 1. ClickhouseApiException - пришел ответ Ñ Ð¾ÑˆÐ¸Ð±ÐºÐ¾Ð¹, характерной Ð´Ð»Ñ Ð½ÐµÐ²ÐµÑ€Ð½Ð¾ Ñгенерированного запроÑа, например, + * неизвеÑтный Ñтолбец, токен, неÑовмеÑтимоÑÑ‚ÑŒ типов и Ñ‚.п., Ñ‚.е. виновато API, Ñледует залогировать Ñто Ñ Ð±Ð¾Ð»ÐµÐµ + * выÑоким приоритетом и чинить первопричину; + * 2. ClickhouseDBException - пришел ответ Ñ Ð¾ÑˆÐ¸Ð±ÐºÐ¾Ð¹, характерной Ð´Ð»Ñ Ð²Ð½ÑƒÑ‚Ñ€ÐµÐ½Ð½ÐµÐ¹ ошибки СУБД, например, закончилиÑÑŒ + * коннекты к реплике, закончилоÑÑŒ меÑто и Ñ‚.п.; + * 3. ClickhouseQueryException - Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð»Ð¸Ð±Ð¾ Ñлишком Ñложный и не Ñмог уложитьÑÑ Ð² отведенные квоты по памÑти/времени, + * либо Ñто запроÑ-неудачник, пришедший в неудачное времÑ, когда нода кликхауÑа была перегружена. Ð’ Ñтом Ñлучае отдаем + * пользователю Ñовет Ñузить Ð·Ð°Ð¿Ñ€Ð¾Ñ (включить/уменьшить Ñемплирование), попробовать Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð·Ð¶Ðµ и Ñ‚.п.; + * 4. ClickhouseUnhandledException - мир не идеален, а в не идеальном мире не вÑегда удаетÑÑ Ñразу разделить + * ошибки кликхауÑа на 3 категории, указанные выше, вот за Ñтим Ñоздана четвертаÑ. Сюда попадают впервые поÑвившиеÑÑ Ð² + * логах ошибки, чтобы в наиближайшем будущем превратитьÑÑ Ð² Api, DB или Query вариант ClickhouseException. + * + * https://github.yandex-team.ru/Metrika/metrika-core/blob/master/metrica/src/dbms/include/DB/Core/ErrorCodes.h + */ + +public final class ClickhouseExceptionSpecifier { + + private static final Logger log = Logger.of(ClickhouseExceptionSpecifier.class); + + private static final Map<Integer, ClickhouseExceptionFactory> FACTORIES; + + static { + Map<Integer, ClickhouseExceptionFactory> map = new HashMap<Integer, ClickhouseExceptionFactory>(); + for (ClickhouseErrorCode errorCode : ClickhouseErrorCode.API) + map.put(errorCode.code, new ClickhouseExceptionFactory() { + @Override + public CHException create(Integer code, Throwable cause, String host, int port) { + return new ClickhouseApiException(code, cause, host, port); + } + }); + for (ClickhouseErrorCode errorCode : ClickhouseErrorCode.DB) + map.put(errorCode.code, new ClickhouseExceptionFactory() { + @Override + public CHException create(Integer code, Throwable cause, String host, int port) { + return new ClickhouseDbException(code, cause, host, port); + } + }); + for (ClickhouseErrorCode errorCode : ClickhouseErrorCode.QUERY) + map.put(errorCode.code, new ClickhouseExceptionFactory() { + @Override + public CHException create(Integer code, Throwable cause, String host, int port) { + return new ClickhouseQueryException(code, cause, host, port); + } + }); + FACTORIES = Collections.unmodifiableMap(map); + } + + private static final ClickhouseExceptionFactory DEFAULT_FACTORY = new ClickhouseExceptionFactory() { + @Override + public CHException create(Integer code, Throwable cause, String host, int port) { + return new ClickhouseUnhandledException(code, cause, host, port); + } + }; + + private ClickhouseExceptionSpecifier() { + } + + public static CHException specify(Throwable cause, String host, int port) { + return specify(null, cause, host, port); + } + + public static CHException specify(String clickhouseMessage, String host, int port) { + return specify(clickhouseMessage, null, host, port); + } + + /** + * Очень надеемÑÑ, что формат ÑÐ¾Ð¾Ð±Ñ‰ÐµÐ½Ð¸Ñ Ð¾Ñ‚ кликхауÑа имеет вид "Code: 10, e.displayText() = DB::Exception: ...". + * Очень надеемÑÑ, что кто-то будет Ñто проверÑÑ‚ÑŒ в теÑтах. + * Очень надеемÑÑ, что кто-то будет в Ñти теÑÑ‚Ñ‹ Ñмотреть и актуализировать парÑинг. + */ + public static CHException specify(String clickhouseMessage, Throwable cause, String host, int port) { + if (CopypasteUtils.isBlank(clickhouseMessage) && cause != null) { + if (cause instanceof SocketTimeoutException) + return new ClickhouseQueryException(ClickhouseErrorCode.SOCKET_TIMEOUT.code, cause, host, port); + else if (cause instanceof ConnectTimeoutException) + return new ClickhouseQueryException(ClickhouseErrorCode.NETWORK_ERROR.code, cause, host, port); + else + return new ClickhouseUnhandledException(cause, host, port); + } + try { + // быÑтро и опаÑно + int code = Integer.parseInt(clickhouseMessage.substring(clickhouseMessage.indexOf(' ') + 1, clickhouseMessage.indexOf(','))); + // ошибку в изначальном виде вÑе-таки укажем + Throwable messageHolder = cause != null ? cause : new Throwable(clickhouseMessage); + ClickhouseExceptionFactory clickhouseExceptionFactory = FACTORIES.get(code); + if (clickhouseExceptionFactory == null) clickhouseExceptionFactory = DEFAULT_FACTORY; + return clickhouseExceptionFactory.create(code, messageHolder, host, port); + } catch (Exception e) { + log.error("Unsupported clickhouse error format, please fix ClickhouseExceptionSpecifier, message: " + + clickhouseMessage + ", error: " + e.getMessage()); + return new ClickhouseUnhandledException(clickhouseMessage, cause, host, port); + } + } + + private interface ClickhouseExceptionFactory { + CHException create(Integer code, Throwable cause, String host, int port); + } + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java new file mode 100644 index 00000000..c2625935 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseQueryException.java @@ -0,0 +1,17 @@ +package ru.yandex.metrika.clickhouse.except; + + +import ru.yandex.metrika.clickhouse.CHException; + +/** + * @author lopashev + * @since 16.02.15 + */ +public class ClickhouseQueryException extends CHException { + public static final String MESSAGE = + "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ñлишком Ñложный и не может быть обработан. ПожалуйÑта, уменьшите интервал дат запроÑа либо включите/уменьшите Ñемплирование и повторите запроÑ."; + + public ClickhouseQueryException(Integer code, Throwable cause, String host, int port) { + super(code, cause, host, port); + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java new file mode 100644 index 00000000..e47d5009 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/except/ClickhouseUnhandledException.java @@ -0,0 +1,26 @@ +package ru.yandex.metrika.clickhouse.except; + +import ru.yandex.metrika.clickhouse.CHException; + +/** + * @author lopashev + * @since 16.02.15 + */ +public class ClickhouseUnhandledException extends CHException { + public static final String MESSAGE = + "Ð—Ð°Ð¿Ñ€Ð¾Ñ Ð½Ðµ может быть обработан по неизвеÑтной причине. Мы проанализируем и поÑтараемÑÑ ÑƒÑтранить причину как можно быÑтрее. ПожалуйÑта, отправьте Ð·Ð°Ð¿Ñ€Ð¾Ñ Ð¿Ð¾Ð²Ñ‚Ð¾Ñ€Ð½Ð¾ через неÑколько минут."; + + public ClickhouseUnhandledException(Throwable cause, String host, int port) { + super(cause, host, port); + } + + + public ClickhouseUnhandledException(String message, Throwable cause, String host, int port) { + super(message, cause, host, port); + } + + public ClickhouseUnhandledException(Integer code, Throwable cause, String host, int port) { + super(code, cause, host, port); + } + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java new file mode 100644 index 00000000..6565a1b0 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java @@ -0,0 +1,91 @@ +package ru.yandex.metrika.clickhouse.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.Iterator; + +/** + * Created by jkee on 16.03.15. + */ +public class CopypasteUtils { + + public static final Charset UTF_8 = Charset.forName("UTF-8"); + + + /////// Apache StringUtils //////// + + public static boolean isBlank(final CharSequence cs) { + int strLen; + if (cs == null || (strLen = cs.length()) == 0) { + return true; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(cs.charAt(i))) { + return false; + } + } + return true; + } + + public static String join(final Iterable<?> iterable, final char separator) { + + Iterator<?> iterator = iterable.iterator(); + + // handle null, zero and one elements before building a buffer + if (iterator == null) { + return null; + } + if (!iterator.hasNext()) { + return ""; + } + final Object first = iterator.next(); + if (!iterator.hasNext()) { + return first == null ? "" : first.toString(); + } + + // two or more elements + final StringBuilder buf = new StringBuilder(256); // Java default is 16, probably too small + if (first != null) { + buf.append(first); + } + + while (iterator.hasNext()) { + buf.append(separator); + final Object obj = iterator.next(); + if (obj != null) { + buf.append(obj); + } + } + + return buf.toString(); + } + + /////// Guava ////// + + private static final int BUF_SIZE = 0x1000; // 4K + + public static byte[] toByteArray(InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + copy(in, out); + return out.toByteArray(); + } + + public static long copy(InputStream from, OutputStream to) + throws IOException { + byte[] buf = new byte[BUF_SIZE]; + long total = 0; + while (true) { + int r = from.read(buf); + if (r == -1) { + break; + } + to.write(buf, 0, r); + total += r; + } + return total; + } + +} -- GitLab