From e2af19683bc51830c3e5df711144f4667bca6ed4 Mon Sep 17 00:00:00 2001 From: jkee <jkee@yandex-team.ru> Date: Thu, 19 Mar 2015 13:33:37 +0300 Subject: [PATCH] METR-15511: http client configuration, more meta fixes --- .../metrika/clickhouse/CHConnection.java | 21 ++++- .../clickhouse/CHDatabaseMetadata.java | 13 ++- .../metrika/clickhouse/CHStatement.java | 63 +++++++++++-- .../clickhouse/copypaste/CHResultBuilder.java | 2 +- .../clickhouse/copypaste/CHResultSet.java | 19 +++- .../copypaste/CHResultSetMetaData.java | 24 ++--- .../copypaste/IpVersionPriorityResolver.java | 52 ++++++++++ .../clickhouse/util/CHHttpClientBuilder.java | 94 +++++++++++++++++++ .../clickhouse/util/CopypasteUtils.java | 65 +++++++++++++ 9 files changed, 324 insertions(+), 29 deletions(-) create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/IpVersionPriorityResolver.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java b/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java index 88948f74..6fcd399c 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java @@ -1,9 +1,11 @@ package ru.yandex.metrika.clickhouse; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; import ru.yandex.metrika.clickhouse.config.ClickHouseSource; +import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.util.CHHttpClientBuilder; +import java.io.IOException; import java.sql.*; import java.util.Map; import java.util.Properties; @@ -13,12 +15,16 @@ import java.util.Properties; */ public class CHConnection implements Connection { - private final CloseableHttpClient httpclient = HttpClients.createDefault(); + private final CloseableHttpClient httpclient; + + private final HttpConnectionProperties properties = new HttpConnectionProperties(); private final String url; public CHConnection(String url) { this.url = url; + CHHttpClientBuilder clientBuilder = new CHHttpClientBuilder(properties); + httpclient = clientBuilder.buildClient(); } @Override @@ -27,10 +33,11 @@ public class CHConnection implements Connection { String hostPort = url.substring("jdbc:clickhouse:".length()); String host = hostPort.substring(0, hostPort.indexOf(':')); String port = hostPort.substring(hostPort.indexOf(':') + 1); + int portNum = Integer.parseInt(port); - ClickHouseSource source = new ClickHouseSource(host, "default"); + ClickHouseSource source = new ClickHouseSource(host, portNum, "default"); - return new CHStatement(httpclient, source); + return new CHStatement(httpclient, source, properties); } @Override @@ -70,7 +77,11 @@ public class CHConnection implements Connection { @Override public void close() throws SQLException { - + try { + httpclient.close(); + } catch (IOException e) { + throw new CHException("HTTP client close exception", e); + } } @Override diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java index 4ed72c8d..d78c2968 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java @@ -729,6 +729,9 @@ public class CHDatabaseMetadata implements DatabaseMetaData { @Override public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException { + // todo support patterns as it should be (how?!) + log.debug("getColumns: cat " + catalog + " sp " + schemaPattern + + " tnp " + tableNamePattern + " cnp " + columnNamePattern); CHResultBuilder builder = CHResultBuilder.builder(23); builder.names( "TABLE_CAT", @@ -780,9 +783,17 @@ public class CHDatabaseMetadata implements DatabaseMetaData { "Int32", "String" ); - ResultSet descTable = request("desc table " + catalog + '.' + tableNamePattern); + String sql = "desc table "; + if (catalog != null) sql += catalog + '.'; + sql += tableNamePattern; + ResultSet descTable = request(sql); int colNum = 1; while (descTable.next()) { + // column filter + if (columnNamePattern != null && !columnNamePattern.equals(descTable.getString(1)) + && !columnNamePattern.equals("%")) { + continue; + } List<String> row = new ArrayList<String>(); row.add(catalog); row.add(tableNamePattern); diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java index 48250730..4f8a1cb3 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java @@ -36,17 +36,25 @@ public class CHStatement implements Statement { private ClickHouseSource source; - public CHStatement(CloseableHttpClient client, ClickHouseSource source) { + private CHResultSet currentResult; + + public CHStatement(CloseableHttpClient client, ClickHouseSource source, + HttpConnectionProperties properties) { this.client = client; this.source = source; + this.properties = properties; } @Override public ResultSet executeQuery(String sql) throws SQLException { InputStream is = getInputStream(sql, null, false); try { - return new CHResultSet(properties.isCompress() - ? new ClickhouseLZ4Stream(is) : is, properties.getBufferSize()); + currentResult = new CHResultSet(properties.isCompress() + ? new ClickhouseLZ4Stream(is) : is, properties.getBufferSize(), + extractDBName(sql), + extractTableName(sql) + ); + return currentResult; } catch (Exception e) { throw new RuntimeException(e); } @@ -66,11 +74,7 @@ public class CHStatement implements Statement { @Override public void close() throws SQLException { - try { - client.close(); - } catch (IOException e) { - throw new CHException("HTTP client close exception", e); - } + currentResult.close(); } @Override @@ -283,6 +287,43 @@ public class CHStatement implements Statement { return sql; } + private String extractTableName(String sql) { + String s = extractDBAndTableName(sql); + if (s.contains(".")) { + return s.substring(s.indexOf(".") + 1); + } else return s; + } + + private String extractDBName(String sql) { + String s = extractDBAndTableName(sql); + if (s.contains(".")) { + return s.substring(0, s.indexOf(".")); + } else { + return source.getDb(); + } + } + + private String extractDBAndTableName(String sql) { + // паршивый код, надо пиÑать или найти нормальный парÑер + if (CopypasteUtils.startsWithIgnoreCase(sql, "select")) { + String withoutStrings = CopypasteUtils.retainUnquoted(sql, '\''); + int fromIndex = withoutStrings.indexOf("from"); + if (fromIndex == -1) fromIndex = withoutStrings.indexOf("FROM"); + if (fromIndex != -1) { + String fromFrom = withoutStrings.substring(fromIndex); + String fromTable = fromFrom.substring("from".length()).trim(); + return fromTable.split(" ")[0]; + } + } + if (CopypasteUtils.startsWithIgnoreCase(sql, "desc")) { + return "system.columns"; // bullshit + } + if (CopypasteUtils.startsWithIgnoreCase(sql, "show")) { + return "system.tables"; // bullshit + } + return "system.unknown"; + } + private InputStream getInputStream(String sql, Map<String, String> additionalClickHouseDBParams, boolean ignoreDatabase @@ -317,7 +358,11 @@ public class CHStatement implements Statement { if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { String chMessage = null; try { - chMessage = EntityUtils.toString(response.getEntity()); + InputStream messageStream = entity.getContent(); + if (properties.isCompress()) { + messageStream = new ClickhouseLZ4Stream(messageStream); + } + chMessage = CopypasteUtils.toString(messageStream); } catch (IOException e) { chMessage = "error while read response "+ e.getMessage(); } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultBuilder.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultBuilder.java index 36290161..5f340e79 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultBuilder.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultBuilder.java @@ -70,7 +70,7 @@ public class CHResultBuilder { byte[] bytes = baos.toByteArray(); ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes); - return new CHResultSet(inputStream, 1024); + return new CHResultSet(inputStream, 1024, "system", "unknown"); } catch (IOException e) { throw new RuntimeException("Never happens", e); } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java index 0c82b9f0..d930212f 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSet.java @@ -1,5 +1,7 @@ package ru.yandex.metrika.clickhouse.copypaste; +import ru.yandex.metrika.clickhouse.util.Logger; + import java.io.IOException; import java.io.InputStream; import java.sql.*; @@ -19,8 +21,13 @@ import java.util.Map; */ public class CHResultSet extends AbstractResultSet { + private static final Logger log = Logger.of(CHResultSet.class); + private final StreamSplitter bis; + private final String db; + private final String table; + private final Map<String, Integer> col = new HashMap<String, Integer>(); // column name -> 1-based index private final String[] columns; private final String[] types; @@ -39,7 +46,9 @@ public class CHResultSet extends AbstractResultSet { // row counter private int rowNumber; - public CHResultSet(InputStream is, int bufferSize) throws IOException { + public CHResultSet(InputStream is, int bufferSize, String db, String table) throws IOException { + this.db = db; + this.table = table; bis = new StreamSplitter(is, (byte) 0x0A, bufferSize); /// \n ByteFragment headerFragment = bis.next(); if (headerFragment == null) { @@ -365,6 +374,14 @@ public class CHResultSet extends AbstractResultSet { return rowNumber + 1; } + public String getDb() { + return db; + } + + public String getTable() { + return table; + } + ///// // 1-based insex in column list diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSetMetaData.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSetMetaData.java index 8c827b5e..47b65bab 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSetMetaData.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CHResultSetMetaData.java @@ -8,15 +8,15 @@ import java.sql.SQLException; */ public class CHResultSetMetaData implements ResultSetMetaData { - private final CHResultSet CHResultSet; + private final CHResultSet resultSet; - public CHResultSetMetaData(CHResultSet CHResultSet) { - this.CHResultSet = CHResultSet; + public CHResultSetMetaData(CHResultSet resultSet) { + this.resultSet = resultSet; } @Override public int getColumnCount() throws SQLException { - return CHResultSet.getColumnNames().length; + return resultSet.getColumnNames().length; } @Override @@ -46,7 +46,7 @@ public class CHResultSetMetaData implements ResultSetMetaData { @Override public boolean isSigned(int column) throws SQLException { - return !CHResultSet.getTypes()[column - 1].startsWith("U"); + return !resultSet.getTypes()[column - 1].startsWith("U"); } @Override @@ -56,12 +56,12 @@ public class CHResultSetMetaData implements ResultSetMetaData { @Override public String getColumnLabel(int column) throws SQLException { - return CHResultSet.getColumnNames()[column - 1]; + return resultSet.getColumnNames()[column - 1]; } @Override public String getColumnName(int column) throws SQLException { - return CHResultSet.getColumnNames()[column - 1]; + return resultSet.getColumnNames()[column - 1]; } @Override @@ -81,12 +81,12 @@ public class CHResultSetMetaData implements ResultSetMetaData { @Override public String getTableName(int column) throws SQLException { - throw new UnsupportedOperationException("table name unknown at this stage"); + return resultSet.getTable(); } @Override public String getCatalogName(int column) throws SQLException { - throw new UnsupportedOperationException("catalog name unknown at this stage"); + return resultSet.getDb(); } @Override @@ -96,10 +96,10 @@ public class CHResultSetMetaData implements ResultSetMetaData { @Override public String getColumnTypeName(int column) throws SQLException { - if (CHResultSet.getTypes().length < column) { - throw new ArrayIndexOutOfBoundsException("Array length: " + CHResultSet.getTypes().length + " requested: " + (column - 1)); + if (resultSet.getTypes().length < column) { + throw new ArrayIndexOutOfBoundsException("Array length: " + resultSet.getTypes().length + " requested: " + (column - 1)); } - return CHResultSet.getTypes()[column - 1]; + return resultSet.getTypes()[column - 1]; } @Override diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/IpVersionPriorityResolver.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/IpVersionPriorityResolver.java new file mode 100644 index 00000000..332dd0a1 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/IpVersionPriorityResolver.java @@ -0,0 +1,52 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import org.apache.http.conn.DnsResolver; +import org.apache.http.impl.conn.SystemDefaultDnsResolver; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; + +/** + * @author jkee + */ + +public class IpVersionPriorityResolver implements DnsResolver { + + private DnsResolver defaultResolver = new SystemDefaultDnsResolver(); + + private boolean preferV6 = true; + + public IpVersionPriorityResolver() { + } + + public IpVersionPriorityResolver(boolean preferV6) { + this.preferV6 = preferV6; + } + + @Override + public InetAddress[] resolve(String host) throws UnknownHostException { + InetAddress[] resolve = defaultResolver.resolve(host); + Comparator<InetAddress> comparator = new Comparator<InetAddress>() { + @Override + public int compare(InetAddress o1, InetAddress o2) { + boolean o16 = o1 instanceof Inet6Address; + boolean o26 = o2 instanceof Inet6Address; + if (o16 == o26) return 0; + if (o16) return -1; + if (o26) return 1; + return 0; + } + }; + if (!preferV6) comparator = Collections.reverseOrder(comparator); + Arrays.sort(resolve, comparator); + return resolve; + } + + public void setPreferV6(boolean preferV6) { + this.preferV6 = preferV6; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java new file mode 100644 index 00000000..f35a1a6a --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CHHttpClientBuilder.java @@ -0,0 +1,94 @@ +package ru.yandex.metrika.clickhouse.util; + +import org.apache.http.HeaderElement; +import org.apache.http.HeaderElementIterator; +import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.config.ConnectionConfig; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ConnectionKeepAliveStrategy; +import org.apache.http.conn.socket.ConnectionSocketFactory; +import org.apache.http.conn.socket.PlainConnectionSocketFactory; +import org.apache.http.conn.ssl.SSLConnectionSocketFactory; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.apache.http.message.BasicHeaderElementIterator; +import org.apache.http.protocol.HTTP; +import org.apache.http.protocol.HttpContext; +import ru.yandex.metrika.clickhouse.copypaste.HttpConnectionProperties; +import ru.yandex.metrika.clickhouse.copypaste.IpVersionPriorityResolver; + +import java.net.HttpURLConnection; +import java.util.concurrent.TimeUnit; + +/** + * Created by jkee on 19.03.15. + */ +public class CHHttpClientBuilder { + + private final HttpConnectionProperties properties; + + public CHHttpClientBuilder(HttpConnectionProperties properties) { + this.properties = properties; + } + + public CloseableHttpClient buildClient() { + return HttpClientBuilder.create() + .setConnectionManager(getConnectionManager()) + .setKeepAliveStrategy(createKeepAliveStrategy()) + .setDefaultConnectionConfig(getConnectionConfig()) + .setDefaultRequestConfig(getRequestConfig()) + .build(); + } + + private static PoolingHttpClientConnectionManager getConnectionManager() { + //noinspection resource + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager( + RegistryBuilder.<ConnectionSocketFactory>create() + .register("http", PlainConnectionSocketFactory.getSocketFactory()) + .register("https", SSLConnectionSocketFactory.getSocketFactory()) + .build(), + null, null, new IpVersionPriorityResolver(), 1, TimeUnit.MINUTES); + connectionManager.setDefaultMaxPerRoute(500); + connectionManager.setMaxTotal(1000); + return connectionManager; + } + + private ConnectionConfig getConnectionConfig() { + return ConnectionConfig.custom() + .setBufferSize(properties.getApacheBufferSize()) + .build(); + } + + private RequestConfig getRequestConfig() { + return RequestConfig.custom() + .setSocketTimeout(properties.getSocketTimeout()) + .setConnectTimeout(properties.getConnectionTimeout()) + .build(); + } + + private ConnectionKeepAliveStrategy createKeepAliveStrategy() { + return new ConnectionKeepAliveStrategy() { + @Override + public long getKeepAliveDuration(HttpResponse httpResponse, HttpContext httpContext) { + // при ошибках keep-alive не вÑегда правильно работает, на вÑÑкий Ñлучай закроем коннекшн + if (httpResponse.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { + return -1; + } + HeaderElementIterator it = new BasicHeaderElementIterator( + httpResponse.headerIterator(HTTP.CONN_DIRECTIVE)); + while (it.hasNext()) { + HeaderElement he = it.nextElement(); + String param = he.getName(); + //String value = he.getValue(); + if (param != null && param.equalsIgnoreCase(HTTP.CONN_KEEP_ALIVE)) { + return properties.getKeepAliveTimeout(); + } + } + return -1; + } + }; + } + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java index 6565a1b0..77a5ae71 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java @@ -5,7 +5,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; /** * Created by jkee on 16.03.15. @@ -14,6 +16,65 @@ public class CopypasteUtils { public static final Charset UTF_8 = Charset.forName("UTF-8"); + /////// Metrika code ////// + + public static boolean startsWithIgnoreCase(String haystack, String pattern) { + return haystack.substring(0, pattern.length()).equalsIgnoreCase(pattern); + } + + /** + * ОÑтавлÑет от haystack только вÑе чаÑти не в кавычках + */ + public static String retainUnquoted(String haystack, char quoteChar) { + StringBuilder sb = new StringBuilder(); + String[] split = splitWithoutEscaped(haystack, quoteChar, true); + // нечетные - наши пациенты + for (int i = 0; i < split.length; i++) { + String s = split[i]; + if ((i & 1) == 0) { + sb.append(s); + } + } + return sb.toString(); + } + + public static String[] splitWithoutEscaped(String str, char separatorChar) { + return splitWithoutEscaped(str, separatorChar, false); + } + + /** + * Ðе учитывает ÑÑкейпленные Ñепараторы + */ + public static String[] splitWithoutEscaped(String str, char separatorChar, boolean retainEmpty) { + int len = str.length(); + if (len == 0) { + return new String[0]; + } + List<String> list = new ArrayList<String>(); + int i = 0; + int start = 0; + boolean match = false; + while (i < len) { + if (str.charAt(i) == '\\') { + match = true; + i += 2; + } else if (str.charAt(i) == separatorChar) { + if (retainEmpty || match) { + list.add(str.substring(start, i)); + match = false; + } + start = ++i; + } else { + match = true; + i++; + } + } + if (retainEmpty || match) { + list.add(str.substring(start, i)); + } + return list.toArray(new String[list.size()]); + } + /////// Apache StringUtils //////// @@ -67,6 +128,10 @@ public class CopypasteUtils { private static final int BUF_SIZE = 0x1000; // 4K + public static String toString(InputStream in) throws IOException { + return new String(toByteArray(in), UTF_8); + } + public static byte[] toByteArray(InputStream in) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); copy(in, out); -- GitLab