From c20bd5e8eafa361da60c471f67fecb5929b749ab Mon Sep 17 00:00:00 2001 From: Viktor Tarnavsky <jkee@yandex-team.ru> Date: Mon, 16 Mar 2015 17:22:23 +0300 Subject: [PATCH] METR-15511: initial commit --- .gitignore | 21 +- clickhouse-jdbc.iml | 21 + pom.xml | 36 + .../metrika/clickhouse/CHConnection.java | 293 ++++++ .../clickhouse/CHDatabaseMetadata.java | 922 ++++++++++++++++ .../yandex/metrika/clickhouse/CHDriver.java | 55 + .../metrika/clickhouse/CHStatement.java | 303 ++++++ .../copypaste/AbstractResultSet.java | 990 ++++++++++++++++++ .../clickhouse/copypaste/ByteFragment.java | 186 ++++ .../copypaste/ByteFragmentUtils.java | 147 +++ .../copypaste/ClickhouseLZ4Stream.java | 110 ++ .../copypaste/CountingInputStream.java | 87 ++ .../copypaste/FastByteArrayInputStream.java | 121 +++ .../copypaste/FastByteArrayOutputStream.java | 179 ++++ .../copypaste/HttpConnectionProperties.java | 137 +++ .../clickhouse/copypaste/HttpResult.java | 417 ++++++++ .../LittleEndianDataInputStream.java | 265 +++++ .../clickhouse/copypaste/Patterns.java | 45 + .../clickhouse/copypaste/StreamSplitter.java | 137 +++ 19 files changed, 4467 insertions(+), 5 deletions(-) create mode 100644 clickhouse-jdbc.iml create mode 100644 pom.xml create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/AbstractResultSet.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragmentUtils.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayInputStream.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayOutputStream.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpResult.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/LittleEndianDataInputStream.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/Patterns.java create mode 100644 src/main/java/ru/yandex/metrika/clickhouse/copypaste/StreamSplitter.java diff --git a/.gitignore b/.gitignore index 32858aad..9bb1c461 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,23 @@ *.class -# Mobile Tools for Java (J2ME) -.mtj.tmp/ - # Package Files # *.jar *.war *.ear -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* + +# Eclipse +.classpath +.project +.settings/ + + +# Intellij +.idea/ +*.iml +*.iws +out/ + +# Maven +log/ +target/ \ No newline at end of file diff --git a/clickhouse-jdbc.iml b/clickhouse-jdbc.iml new file mode 100644 index 00000000..f151c7aa --- /dev/null +++ b/clickhouse-jdbc.iml @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4"> + <component name="NewModuleRootManager" inherit-compiler-output="false"> + <output url="file://$MODULE_DIR$/target/classes" /> + <output-test url="file://$MODULE_DIR$/target/test-classes" /> + <content url="file://$MODULE_DIR$"> + <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" /> + <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" /> + <excludeFolder url="file://$MODULE_DIR$/target" /> + </content> + <orderEntry type="inheritedJdk" /> + <orderEntry type="sourceFolder" forTests="false" /> + <orderEntry type="library" scope="TEST" name="Maven: junit:junit:3.8.1" level="project" /> + <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.3.6" level="project" /> + <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.3.3" level="project" /> + <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.3" level="project" /> + <orderEntry type="library" name="Maven: commons-codec:commons-codec:1.6" level="project" /> + <orderEntry type="library" name="Maven: net.jpountz.lz4:lz4:1.1.2" level="project" /> + </component> +</module> + diff --git a/pom.xml b/pom.xml new file mode 100644 index 00000000..c4d831e4 --- /dev/null +++ b/pom.xml @@ -0,0 +1,36 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>ru.yandex.metrika.clickhouse</groupId> + <artifactId>jdbc</artifactId> + <version>1.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>jdbc</name> + <url>http://maven.apache.org</url> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <dependencies> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.3.6</version> + </dependency> + <dependency> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + <version>1.1.2</version> + </dependency> + </dependencies> +</project> diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java b/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java new file mode 100644 index 00000000..a4336fb9 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHConnection.java @@ -0,0 +1,293 @@ +package ru.yandex.metrika.clickhouse; + +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; + +import java.sql.*; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; + +/** + * Created by jkee on 14.03.15. + */ +public class CHConnection implements Connection { + + private final CloseableHttpClient httpclient = HttpClients.createDefault(); + + private final String url; + + public CHConnection(String url) { + this.url = url; + } + + @Override + public Statement createStatement() throws SQLException { + return new CHStatement(httpclient, url); + } + + @Override + public PreparedStatement prepareStatement(String sql) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql) throws SQLException { + return null; + } + + @Override + public String nativeSQL(String sql) throws SQLException { + return null; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws SQLException { + + } + + @Override + public boolean getAutoCommit() throws SQLException { + return false; + } + + @Override + public void commit() throws SQLException { + + } + + @Override + public void rollback() throws SQLException { + + } + + @Override + public void close() throws SQLException { + + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public DatabaseMetaData getMetaData() throws SQLException { + return new CHDatabaseMetadata(url, this); + } + + @Override + public void setReadOnly(boolean readOnly) throws SQLException { + + } + + @Override + public boolean isReadOnly() throws SQLException { + return false; + } + + @Override + public void setCatalog(String catalog) throws SQLException { + + } + + @Override + public String getCatalog() throws SQLException { + return null; + } + + @Override + public void setTransactionIsolation(int level) throws SQLException { + + } + + @Override + public int getTransactionIsolation() throws SQLException { + return 0; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { + return null; + } + + @Override + public Map<String, Class<?>> getTypeMap() throws SQLException { + return null; + } + + @Override + public void setTypeMap(Map<String, Class<?>> map) throws SQLException { + + } + + @Override + public void setHoldability(int holdability) throws SQLException { + + } + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public Savepoint setSavepoint() throws SQLException { + return null; + } + + @Override + public Savepoint setSavepoint(String name) throws SQLException { + return null; + } + + @Override + public void rollback(Savepoint savepoint) throws SQLException { + + } + + @Override + public void releaseSavepoint(Savepoint savepoint) throws SQLException { + + } + + @Override + public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { + return null; + } + + @Override + public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { + return null; + } + + @Override + public Clob createClob() throws SQLException { + return null; + } + + @Override + public Blob createBlob() throws SQLException { + return null; + } + + @Override + public NClob createNClob() throws SQLException { + return null; + } + + @Override + public SQLXML createSQLXML() throws SQLException { + return null; + } + + @Override + public boolean isValid(int timeout) throws SQLException { + return false; + } + + @Override + public void setClientInfo(String name, String value) throws SQLClientInfoException { + + } + + @Override + public void setClientInfo(Properties properties) throws SQLClientInfoException { + + } + + @Override + public String getClientInfo(String name) throws SQLException { + return null; + } + + @Override + public Properties getClientInfo() throws SQLException { + return null; + } + + @Override + public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + return null; + } + + @Override + public Struct createStruct(String typeName, Object[] attributes) throws SQLException { + return null; + } + + @Override + public void setSchema(String schema) throws SQLException { + + } + + @Override + public String getSchema() throws SQLException { + return null; + } + + @Override + public void abort(Executor executor) throws SQLException { + + } + + @Override + public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { + + } + + @Override + public int getNetworkTimeout() throws SQLException { + return 0; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java new file mode 100644 index 00000000..24883391 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDatabaseMetadata.java @@ -0,0 +1,922 @@ +package ru.yandex.metrika.clickhouse; + +import java.sql.*; + +/** + * Created by jkee on 14.03.15. + */ +public class CHDatabaseMetadata implements DatabaseMetaData { + + private String url; + private CHConnection connection; + + public CHDatabaseMetadata(String url, CHConnection connection) { + this.url = url; + this.connection = connection; + } + + @Override + public boolean allProceduresAreCallable() throws SQLException { + return true; + } + + @Override + public boolean allTablesAreSelectable() throws SQLException { + return true; + } + + @Override + public String getURL() throws SQLException { + return url; + } + + @Override + public String getUserName() throws SQLException { + return null; + } + + @Override + public boolean isReadOnly() throws SQLException { + return true; + } + + @Override + public boolean nullsAreSortedHigh() throws SQLException { + return true; + } + + @Override + public boolean nullsAreSortedLow() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedAtStart() throws SQLException { + return true; + } + + @Override + public boolean nullsAreSortedAtEnd() throws SQLException { + return false; + } + + @Override + public String getDatabaseProductName() throws SQLException { + return "ClickHouse"; + } + + @Override + public String getDatabaseProductVersion() throws SQLException { + return "0.42"; + } + + @Override + public String getDriverName() throws SQLException { + return "ru.yandex.metrika.clickhouse-jdbc"; + } + + @Override + public String getDriverVersion() throws SQLException { + return "0.1"; + } + + @Override + public int getDriverMajorVersion() { + return 0; + } + + @Override + public int getDriverMinorVersion() { + return 1; + } + + @Override + public boolean usesLocalFiles() throws SQLException { + return false; + } + + @Override + public boolean usesLocalFilePerTable() throws SQLException { + return false; + } + + @Override + public boolean supportsMixedCaseIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesUpperCaseIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesLowerCaseIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesMixedCaseIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesUpperCaseQuotedIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesLowerCaseQuotedIdentifiers() throws SQLException { + return true; + } + + @Override + public boolean storesMixedCaseQuotedIdentifiers() throws SQLException { + return true; + } + + @Override + public String getIdentifierQuoteString() throws SQLException { + return "`"; + } + + @Override + public String getSQLKeywords() throws SQLException { + return "GLOBAL,ARRAY"; + } + + @Override + public String getNumericFunctions() throws SQLException { + return ""; + } + + @Override + public String getStringFunctions() throws SQLException { + return ""; + } + + @Override + public String getSystemFunctions() throws SQLException { + return ""; + } + + @Override + public String getTimeDateFunctions() throws SQLException { + return ""; + } + + @Override + public String getSearchStringEscape() throws SQLException { + return "\\"; + } + + @Override + public String getExtraNameCharacters() throws SQLException { + return ""; + } + + @Override + public boolean supportsAlterTableWithAddColumn() throws SQLException { + return true; + } + + @Override + public boolean supportsAlterTableWithDropColumn() throws SQLException { + return true; + } + + @Override + public boolean supportsColumnAliasing() throws SQLException { + return true; + } + + @Override + public boolean nullPlusNonNullIsNull() throws SQLException { + return true; + } + + @Override + public boolean supportsConvert() throws SQLException { + return false; + } + + @Override + public boolean supportsConvert(int fromType, int toType) throws SQLException { + return false; + } + + @Override + public boolean supportsTableCorrelationNames() throws SQLException { + return false; + } + + @Override + public boolean supportsDifferentTableCorrelationNames() throws SQLException { + return false; + } + + @Override + public boolean supportsExpressionsInOrderBy() throws SQLException { + return true; + } + + @Override + public boolean supportsOrderByUnrelated() throws SQLException { + return true; + } + + @Override + public boolean supportsGroupBy() throws SQLException { + return true; + } + + @Override + public boolean supportsGroupByUnrelated() throws SQLException { + return true; + } + + @Override + public boolean supportsGroupByBeyondSelect() throws SQLException { + return true; + } + + @Override + public boolean supportsLikeEscapeClause() throws SQLException { + return true; + } + + @Override + public boolean supportsMultipleResultSets() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsNonNullableColumns() throws SQLException { + return true; + } + + @Override + public boolean supportsMinimumSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsCoreSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsExtendedSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92EntryLevelSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92IntermediateSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92FullSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsIntegrityEnhancementFacility() throws SQLException { + return false; + } + + @Override + public boolean supportsOuterJoins() throws SQLException { + return true; + } + + @Override + public boolean supportsFullOuterJoins() throws SQLException { + return false; + } + + @Override + public boolean supportsLimitedOuterJoins() throws SQLException { + return true; + } + + @Override + public String getSchemaTerm() throws SQLException { + return "schema"; + } + + @Override + public String getProcedureTerm() throws SQLException { + return "some bullshit"; + } + + @Override + public String getCatalogTerm() throws SQLException { + return "database"; + } + + @Override + public boolean isCatalogAtStart() throws SQLException { + return false; + } + + @Override + public String getCatalogSeparator() throws SQLException { + return ":"; + } + + @Override + public boolean supportsSchemasInDataManipulation() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInProcedureCalls() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInTableDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInIndexDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInDataManipulation() throws SQLException { + return true; + } + + @Override + public boolean supportsCatalogsInProcedureCalls() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInTableDefinitions() throws SQLException { + return true; + } + + @Override + public boolean supportsCatalogsInIndexDefinitions() throws SQLException { + return true; + } + + @Override + public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsPositionedDelete() throws SQLException { + return false; + } + + @Override + public boolean supportsPositionedUpdate() throws SQLException { + return false; + } + + @Override + public boolean supportsSelectForUpdate() throws SQLException { + return false; + } + + @Override + public boolean supportsStoredProcedures() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInComparisons() throws SQLException { + return true; + } + + @Override + public boolean supportsSubqueriesInExists() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInIns() throws SQLException { + return true; + } + + @Override + public boolean supportsSubqueriesInQuantifieds() throws SQLException { + return false; + } + + @Override + public boolean supportsCorrelatedSubqueries() throws SQLException { + return false; + } + + @Override + public boolean supportsUnion() throws SQLException { + return true; + } + + @Override + public boolean supportsUnionAll() throws SQLException { + return true; + } + + @Override + public boolean supportsOpenCursorsAcrossCommit() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenCursorsAcrossRollback() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenStatementsAcrossCommit() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenStatementsAcrossRollback() throws SQLException { + return false; + } + + @Override + public int getMaxBinaryLiteralLength() throws SQLException { + return 0; + } + + @Override + public int getMaxCharLiteralLength() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInGroupBy() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInIndex() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInOrderBy() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInSelect() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInTable() throws SQLException { + return 0; + } + + @Override + public int getMaxConnections() throws SQLException { + return 0; + } + + @Override + public int getMaxCursorNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxIndexLength() throws SQLException { + return 0; + } + + @Override + public int getMaxSchemaNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxProcedureNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxCatalogNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxRowSize() throws SQLException { + return 0; + } + + @Override + public boolean doesMaxRowSizeIncludeBlobs() throws SQLException { + return false; + } + + @Override + public int getMaxStatementLength() throws SQLException { + return 0; + } + + @Override + public int getMaxStatements() throws SQLException { + return 0; + } + + @Override + public int getMaxTableNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxTablesInSelect() throws SQLException { + return 0; + } + + @Override + public int getMaxUserNameLength() throws SQLException { + return 0; + } + + @Override + public int getDefaultTransactionIsolation() throws SQLException { + return Connection.TRANSACTION_NONE; + } + + @Override + public boolean supportsTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsTransactionIsolationLevel(int level) throws SQLException { + return level == Connection.TRANSACTION_NONE; + } + + @Override + public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsDataManipulationTransactionsOnly() throws SQLException { + return false; + } + + @Override + public boolean dataDefinitionCausesTransactionCommit() throws SQLException { + return false; + } + + @Override + public boolean dataDefinitionIgnoredInTransactions() throws SQLException { + return false; + } + + private ResultSet request(String sql) throws SQLException { + Statement statement = connection.createStatement(); + return statement.executeQuery(sql); + } + + @Override + public ResultSet getProcedures(String catalog, String schemaPattern, String procedureNamePattern) throws SQLException { + return request("SELECT 'bullshit_procedure'"); + } + + @Override + public ResultSet getProcedureColumns(String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) throws SQLException { + return request("SELECT 'bullshit_procedure_columns'"); + } + + @Override + public ResultSet getTables(String catalog, String schemaPattern, String tableNamePattern, String[] types) throws SQLException { + /** + TABLE_CAT String => table catalog (may be null) + TABLE_SCHEM String => table schema (may be null) + TABLE_NAME String => table name + TABLE_TYPE String => table type. Typical types are "TABLE", "VIEW", "SYSTEM TABLE", "GLOBAL TEMPORARY", "LOCAL TEMPORARY", "ALIAS", "SYNONYM". + REMARKS String => explanatory comment on the table + TYPE_CAT String => the types catalog (may be null) + TYPE_SCHEM String => the types schema (may be null) + TYPE_NAME String => type name (may be null) + SELF_REFERENCING_COL_NAME String => name of the designated "identifier" column of a typed table (may be null) + REF_GENERATION String => specifies how values in SELF_REFERENCING_COL_NAME are created. Values are "SYSTEM", "USER", "DERIVED". (may be null) + */ + if (catalog == null || catalog.isEmpty()) { + catalog = "default"; + } + String sql = "select " + + "database, name, name, 'TABLE', '', '', '', '', '', '' " + + "from system.tables " + + "where database = '" + catalog + "' " + + "order by name"; + return request(sql); + } + + @Override + public ResultSet getSchemas() throws SQLException { + return request("select name, database from system.tables"); + } + + @Override + public ResultSet getCatalogs() throws SQLException { + return request("show databases"); + } + + @Override + public ResultSet getTableTypes() throws SQLException { + return request("select 'TABLE', 'LOCAL TEMPORARY'"); + } + + @Override + public ResultSet getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException { + return request("desc table " + catalog + '.' + tableNamePattern); + } + + @Override + public ResultSet getColumnPrivileges(String catalog, String schema, String table, String columnNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getTablePrivileges(String catalog, String schemaPattern, String tableNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getBestRowIdentifier(String catalog, String schema, String table, int scope, boolean nullable) throws SQLException { + return null; + } + + @Override + public ResultSet getVersionColumns(String catalog, String schema, String table) throws SQLException { + return null; + } + + @Override + public ResultSet getPrimaryKeys(String catalog, String schema, String table) throws SQLException { + return null; + } + + @Override + public ResultSet getImportedKeys(String catalog, String schema, String table) throws SQLException { + return null; + } + + @Override + public ResultSet getExportedKeys(String catalog, String schema, String table) throws SQLException { + return null; + } + + @Override + public ResultSet getCrossReference(String parentCatalog, String parentSchema, String parentTable, String foreignCatalog, String foreignSchema, String foreignTable) throws SQLException { + return null; + } + + @Override + public ResultSet getTypeInfo() throws SQLException { + return null; + } + + @Override + public ResultSet getIndexInfo(String catalog, String schema, String table, boolean unique, boolean approximate) throws SQLException { + return null; + } + + @Override + public boolean supportsResultSetType(int type) throws SQLException { + return false; + } + + @Override + public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException { + return false; + } + + @Override + public boolean ownUpdatesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean ownDeletesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean ownInsertsAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersUpdatesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersDeletesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersInsertsAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean updatesAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean deletesAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean insertsAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean supportsBatchUpdates() throws SQLException { + return false; + } + + @Override + public ResultSet getUDTs(String catalog, String schemaPattern, String typeNamePattern, int[] types) throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public boolean supportsSavepoints() throws SQLException { + return false; + } + + @Override + public boolean supportsNamedParameters() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleOpenResults() throws SQLException { + return false; + } + + @Override + public boolean supportsGetGeneratedKeys() throws SQLException { + return false; + } + + @Override + public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getAttributes(String catalog, String schemaPattern, String typeNamePattern, String attributeNamePattern) throws SQLException { + return null; + } + + @Override + public boolean supportsResultSetHoldability(int holdability) throws SQLException { + return false; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return 0; + } + + @Override + public int getDatabaseMajorVersion() throws SQLException { + return 0; + } + + @Override + public int getDatabaseMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getJDBCMajorVersion() throws SQLException { + return 0; + } + + @Override + public int getJDBCMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getSQLStateType() throws SQLException { + return 0; + } + + @Override + public boolean locatorsUpdateCopy() throws SQLException { + return false; + } + + @Override + public boolean supportsStatementPooling() throws SQLException { + return false; + } + + @Override + public RowIdLifetime getRowIdLifetime() throws SQLException { + return null; + } + + @Override + public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException { + return null; + } + + @Override + public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException { + return false; + } + + @Override + public boolean autoCommitFailureClosesAllResultSets() throws SQLException { + return false; + } + + @Override + public ResultSet getClientInfoProperties() throws SQLException { + return null; + } + + @Override + public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getFunctionColumns(String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getPseudoColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) throws SQLException { + return null; + } + + @Override + public boolean generatedKeyAlwaysReturned() throws SQLException { + return false; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java b/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java new file mode 100644 index 00000000..74c371c7 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHDriver.java @@ -0,0 +1,55 @@ +package ru.yandex.metrika.clickhouse; + +import java.sql.*; +import java.util.Properties; +import java.util.logging.Logger; + +/** + * Created by jkee on 14.03.15. + */ +public class CHDriver implements Driver { + + static { + CHDriver driver = new CHDriver(); + try { + DriverManager.registerDriver(driver); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + public Connection connect(String url, Properties info) throws SQLException { + return new CHConnection(url); + } + + @Override + public boolean acceptsURL(String url) throws SQLException { + return true; + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() throws SQLFeatureNotSupportedException { + return null; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java new file mode 100644 index 00000000..2be88235 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java @@ -0,0 +1,303 @@ +package ru.yandex.metrika.clickhouse; + +import ru.yandex.metrika.clickhouse.copypaste.*; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.nio.charset.StandardCharsets; +import java.sql.*; + +/** + * Created by jkee on 14.03.15. + */ +public class CHStatement implements Statement { + + private final CloseableHttpClient client; + + private final String url; + + private HttpConnectionProperties properties = new HttpConnectionProperties(); + + public CHStatement(CloseableHttpClient client, String url) { + this.client = client; + this.url = url; + } + + public static String clickhousifySql(String sql, String format) { + sql = sql.trim(); + if (!sql.replace(";", "").trim().endsWith(" TabSeparatedWithNamesAndTypes") + && !sql.replace(";", "").trim().endsWith(" TabSeparated") + && !sql.replace(";", "").trim().endsWith(" JSONCompact")) { + if (sql.endsWith(";")) sql = sql.substring(0, sql.length() - 1); + sql += " FORMAT " + format + ';'; + } + return sql; + } + + @Override + public ResultSet executeQuery(String sql) throws SQLException { + String csql = clickhousifySql(sql, "TabSeparatedWithNamesAndTypes"); + CountingInputStream is = getInputStream(csql); + try { + return new HttpResult(properties.isCompress() + ? new ClickhouseLZ4Stream(is) : is, is, properties.getBufferSize()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private CountingInputStream getInputStream(String sql) { + HttpPost post = new HttpPost(url); + post.setEntity(new StringEntity(sql, StandardCharsets.UTF_8)); + HttpEntity entity = null; + InputStream is = null; + try { + HttpResponse response = client.execute(post); + entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { + String chMessage = null; + try { + chMessage = EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + chMessage = "error while read response "+ e.getMessage(); + } + EntityUtils.consumeQuietly(entity); + throw new RuntimeException("CH error: " + chMessage); + } + if (entity.isStreaming()) { + is = entity.getContent(); + } else { + FastByteArrayOutputStream baos = new FastByteArrayOutputStream(); + entity.writeTo(baos); + is = baos.convertToInputStream(); + } + return new CountingInputStream(is); + } catch (IOException e) { + EntityUtils.consumeQuietly(entity); + try { if (is != null) is.close(); } catch (IOException ignored) { } + throw new RuntimeException(e); + } + } + + @Override + public int executeUpdate(String sql) throws SQLException { + return 0; + } + + @Override + public void close() throws SQLException { + + } + + @Override + public int getMaxFieldSize() throws SQLException { + return 0; + } + + @Override + public void setMaxFieldSize(int max) throws SQLException { + + } + + @Override + public int getMaxRows() throws SQLException { + return 0; + } + + @Override + public void setMaxRows(int max) throws SQLException { + + } + + @Override + public void setEscapeProcessing(boolean enable) throws SQLException { + + } + + @Override + public int getQueryTimeout() throws SQLException { + return 0; + } + + @Override + public void setQueryTimeout(int seconds) throws SQLException { + + } + + @Override + public void cancel() throws SQLException { + + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException { + + } + + @Override + public void setCursorName(String name) throws SQLException { + + } + + @Override + public boolean execute(String sql) throws SQLException { + return false; + } + + @Override + public ResultSet getResultSet() throws SQLException { + return null; + } + + @Override + public int getUpdateCount() throws SQLException { + return 0; + } + + @Override + public boolean getMoreResults() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + + } + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public void setFetchSize(int rows) throws SQLException { + + } + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public int getResultSetConcurrency() throws SQLException { + return 0; + } + + @Override + public int getResultSetType() throws SQLException { + return 0; + } + + @Override + public void addBatch(String sql) throws SQLException { + + } + + @Override + public void clearBatch() throws SQLException { + + } + + @Override + public int[] executeBatch() throws SQLException { + return new int[0]; + } + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public boolean getMoreResults(int current) throws SQLException { + return false; + } + + @Override + public ResultSet getGeneratedKeys() throws SQLException { + return null; + } + + @Override + public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException { + return 0; + } + + @Override + public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { + return 0; + } + + @Override + public int executeUpdate(String sql, String[] columnNames) throws SQLException { + return 0; + } + + @Override + public boolean execute(String sql, int autoGeneratedKeys) throws SQLException { + return false; + } + + @Override + public boolean execute(String sql, int[] columnIndexes) throws SQLException { + return false; + } + + @Override + public boolean execute(String sql, String[] columnNames) throws SQLException { + return false; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return 0; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public void setPoolable(boolean poolable) throws SQLException { + + } + + @Override + public boolean isPoolable() throws SQLException { + return false; + } + + @Override + public void closeOnCompletion() throws SQLException { + + } + + @Override + public boolean isCloseOnCompletion() throws SQLException { + return false; + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/AbstractResultSet.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/AbstractResultSet.java new file mode 100644 index 00000000..d6a40051 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/AbstractResultSet.java @@ -0,0 +1,990 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.*; +import java.util.Calendar; +import java.util.Map; + +/** + * Unsupported chaos + * @author jkee + */ + +public abstract class AbstractResultSet implements ResultSet { + @Override + public boolean next() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean wasNull() throws SQLException { + // no nulls in clickhouse + return false; + } + + @Override + public String getString(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBytes(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Date getDate(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Time getTime(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getAsciiStream(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getBinaryStream(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String getString(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getBytes(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Date getDate(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Time getTime(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Timestamp getTimestamp(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getAsciiStream(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public InputStream getBinaryStream(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public SQLWarning getWarnings() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void clearWarnings() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String getCursorName() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int findColumn(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Reader getCharacterStream(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Reader getCharacterStream(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isBeforeFirst() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isAfterLast() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isFirst() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isLast() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void beforeFirst() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void afterLast() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean first() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean last() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean absolute(int row) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean relative(int rows) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean previous() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setFetchDirection(int direction) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getFetchDirection() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void setFetchSize(int rows) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getFetchSize() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getType() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getConcurrency() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean rowUpdated() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean rowInserted() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean rowDeleted() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNull(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBoolean(int columnIndex, boolean x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateByte(int columnIndex, byte x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateShort(int columnIndex, short x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateInt(int columnIndex, int x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateLong(int columnIndex, long x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateFloat(int columnIndex, float x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDouble(int columnIndex, double x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateString(int columnIndex, String x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBytes(int columnIndex, byte[] x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDate(int columnIndex, Date x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateTime(int columnIndex, Time x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, int length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateObject(int columnIndex, Object x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNull(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBoolean(String columnLabel, boolean x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateByte(String columnLabel, byte x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateShort(String columnLabel, short x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateInt(String columnLabel, int x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateLong(String columnLabel, long x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateFloat(String columnLabel, float x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDouble(String columnLabel, double x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateString(String columnLabel, String x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBytes(String columnLabel, byte[] x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDate(String columnLabel, Date x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateTime(String columnLabel, Time x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, int length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, int length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, int length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateObject(String columnLabel, Object x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void insertRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void refreshRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void cancelRowUpdates() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void moveToInsertRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void moveToCurrentRow() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Statement getStatement() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(int columnIndex, Map<String, Class<?>> map) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Ref getRef(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Blob getBlob(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Clob getClob(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Array getArray(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Object getObject(String columnLabel, Map<String, Class<?>> map) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Ref getRef(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Blob getBlob(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Clob getClob(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Array getArray(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public URL getURL(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public URL getURL(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateRef(int columnIndex, Ref x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateRef(String columnLabel, Ref x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBlob(int columnIndex, Blob x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBlob(String columnLabel, Blob x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateClob(int columnIndex, Clob x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateClob(String columnLabel, Clob x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateArray(int columnIndex, Array x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateArray(String columnLabel, Array x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public RowId getRowId(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public RowId getRowId(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateRowId(int columnIndex, RowId x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateRowId(String columnLabel, RowId x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public int getHoldability() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isClosed() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNString(int columnIndex, String nString) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNString(String columnLabel, String nString) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNClob(int columnIndex, NClob nClob) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNClob(String columnLabel, NClob nClob) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public NClob getNClob(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public NClob getNClob(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public SQLXML getSQLXML(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public SQLXML getSQLXML(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String getNString(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public String getNString(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Reader getNCharacterStream(int columnIndex) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public Reader getNCharacterStream(String columnLabel) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateClob(int columnIndex, Reader reader, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateClob(String columnLabel, Reader reader, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCharacterStream(int columnIndex, Reader x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateClob(int columnIndex, Reader reader) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateClob(String columnLabel, Reader reader) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNClob(int columnIndex, Reader reader) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public void updateNClob(String columnLabel, Reader reader) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T getObject(int columnIndex, Class<T> type) throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public <T> T getObject(String columnLabel, Class<T> type) throws SQLException { + throw new UnsupportedOperationException(); + } + + /*default UnsignedInteger getUnsignedInteger(String columnLabel) throws SQLException { + return UnsignedInteger.valueOf(getString(columnLabel)); + } + + public UnsignedLong getUnsignedLong(String columnLabel) throws SQLException { + return UnsignedLong.valueOf(getString(columnLabel)); + } +*/ + + public long[] getLongArray(String column) throws SQLException { + Array array = getArray(column); + return (long[])array.getArray(); // оптимиÑтично + } + + + + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java new file mode 100644 index 00000000..32bcae87 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragment.java @@ -0,0 +1,186 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * @author orantius + * @version $Id$ + * @since 7/16/12 + */ +public class ByteFragment { + + private byte[] buf; + private int start; + private int len; + private static final ByteFragment EMPTY = new ByteFragment(new byte[0], 0, 0); + + public ByteFragment(byte[] buf, int start, int len) { + this.buf = buf; + this.start = start; + this.len = len; + } + + public String asString() { + return new String(buf, start, len, StandardCharsets.UTF_8); + } + + public String asString(boolean unescape) { + if(unescape) { + return new String(unescape(), StandardCharsets.UTF_8); + } else { + return asString(); + } + } + + @Override + public String toString() { + return "ByteFragment{" + + "buf=" + Arrays.toString(buf) + + ", start=" + start + + ", len=" + len + + '}'; + } + + public ByteFragment[] split(byte sep) { + StreamSplitter ss = new StreamSplitter(this, sep); + int c = count(sep)+1; + ByteFragment[] res = new ByteFragment[c]; + try { + int i = 0; + ByteFragment next = null; + while((next = ss.next())!=null) { + res[i++] = next; + } + } catch (IOException ignore) { + } + if(res[c-1] == null) res[c-1] = ByteFragment.EMPTY; + return res; + } + // [45, 49, 57, 52, 49, 51, 56, 48, 57, 49, 52, 9, 9, 50, 48, 49, 50, 45, 48, 55, 45, 49, 55, 32, 49, 51, 58, 49, 50, 58, 50, 49, 9, 49, 50, 49, 50, 55, 53, 53, 9, 50, 57, 57, 57, 55, 55, 57, 57, 55, 56, 9, 48, 9, 52, 48, 57, 49, 57, 55, 52, 49, 49, 51, 50, 56, 53, 53, 50, 54, 57, 51, 9, 51, 9, 54, 9, 50, 48, 9, 48, 92, 48, 9, 104, 116, 116, 112, 58, 47, 47, 119, 119, 119, 46, 97, 118, 105, 116, 111, 46, 114, 117, 47, 99, 97, 116, 97, 108, 111, 103, 47, 103, 97, 114, 97, 122, 104, 105, 95, 105, 95, 109, 97, 115, 104, 105, 110, 111, 109, 101, 115, 116, 97, 45, 56, 53, 47, 116, 97, 116, 97, 114, 115, 116, 97, 110, 45, 54, 53, 48, 49, 51, 48, 47, 112, 97, 103, 101, 56, 9, 104, 116, 116, 112, 58, 47, 47, 119, 119, 119, 46, 97, 118, 105, 116, 111, 46, 114, 117, 47, 99, 97, 116, 97, 108, 111, 103, 47, 103, 97, 114, 97, 122, 104, 105, 95, 105, 95, 109, 97, 115, 104, 105, 110, 111, 109, 101, 115, 116, 97, 45, 56, 53, 47, 116, 97, 116, 97, 114, 115, 116, 97, 110, 45, 54, 53, 48, 49, 51, 48, 47, 112, 97, 103, 101, 55, 9, 48, 9, 48, 9, 50, 56, 53, 55, 48, 56, 48, 9, 45, 49, 9, 48, 9, 9, 48, 9, 48, 9, 48, 9, 45, 49, 9, 48, 48, 48, 48, 45, 48, 48, 45, 48, 48, 32, 48, 48, 58, 48, 48, 58, 48, 48, 9, 9, 48, 9, 48, 9, 103, 9, 45, 49, 9, 45, 49, 9, 45, 49, 9] + public ByteArrayInputStream asStream() { + return new ByteArrayInputStream(buf, start, len); + } + + private int count(byte sep) { + int res = 0; + for (int i = start; i < start+len; i++) { + if (buf[i] == sep) { + res++; + } + } + return res; + } + + public int getLen() { + return len; + } + + + // 2012.07.16 17:30:29 [spring-scheduler-3] DEBUG org.apache.http.wire - << "992541[0x9]1[0x9]0[0x9]8167885961324049050[0x9]5498583669064185695[0x9]1342427653[0x9]1342428010[0x9]4[0x9]http://e.mail.ru/cgi-bin/msglist[0x9]http://sewprice.ru/catg/gpg/otparivatel-dlja-odezhdy-Comfort-NV-338-gindex-881.html[0x9]http://sewprice.ru/content/akcii-cpg-113/[0x9]http://market.yandex.ru/grade-shop.xml?shop_id=4683&cmid=771006781&retpath1=http:%2F%2Fmarket.yandex.ru%2Fshop-opinions.xml%3Fshop_id%3D4683%26cmid%3D771006781[0x9]0[0x9]1[0x9]0[0x9][0x9]0[0x9]0[0x9][0x9][0xee][0xf6][0xb][0x1][0xd5][0xa4][0x8f][0x80][0x5]\0[0x9]2012-07-16 12:34:13[\n]" -- layer:5 + // 5498583669064185695 + // ��դ��\0 [0xee][0xf6][0xb][0x1][0xd5][0xa4][0x8f][0x80][0x5]\0 + // + //int[]gg = {0xee,0xf6,0x0b,0x01,0xd5,0xa4,0x8f,0x80,0x5,0x00}; + + // "\0" => 0 + // "\r" => 13 + // "\n" => 10 + // "\\" => 92 + // "\'" => 39 + // "\b" => 8 + // "\f" => 12 + // "\t" => 9 + private static final byte[] convert = { + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, // 0.. 9 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //10..19 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //20..29 + -1,-1,-1,-1,-1,-1,-1,-1,-1,39, //30..39 + -1,-1,-1,-1,-1,-1,-1,-1, 0,-1, //40..49 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //50..59 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //60..69 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //70..79 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //80..89 + -1,-1,92,-1,-1,-1,-1,-1, 8,-1, //90..99 + -1,-1,12,-1,-1,-1,-1,-1,-1,-1, //100..109 + 10,-1,-1,-1,13,-1, 9,-1,-1,-1, //110..119 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, //120..129 + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + }; + // [0xb6][0xfe][0x7][0x1][0xd8][0xd6][0x94][0x80][0x5]\0 html. + // [-74,-2,7,1,-40,-42,-108,-128,5,0] real value + // [-74,-2,7,1,-40,-42,-108,-128,5,92,48] parsed value + + public byte[] unescape() { + int resLen = 0; + { + boolean prevSlash = false; + for(int i = start; i < start + len; i++){ + if(prevSlash) { + resLen++; + prevSlash = false; + } else { + if(buf[i] == 92) { // slash character + prevSlash = true; + } else { + resLen++; + } + } + } + } + if (resLen == len) { + return getBytesCopy(); + } + byte[] res = new byte[resLen]; + int index = 0; + { + boolean prevSlash = false; + for(int i = start; i < start + len; i++){ + if(prevSlash) { + prevSlash = false; + res[index++] = convert[buf[i]]; + + } else { + if(buf[i] == 92) { // slash character + prevSlash = true; + } else { + res[index++] = buf[i]; + } + } + + } + } + return res; + } + + private byte[] getBytesCopy() { + byte[] bytes = new byte[len]; + System.arraycopy(buf, start, bytes, 0, len); + return bytes; + } + + public int length() { + return len; + } + + public int charAt(int i) { + return buf[start+i]; + } + + /** + * подÑтрока. маÑÑив не копируем, джентельмены в общий маÑÑив не пишут. + * @param start + * @param len + * @return + */ + public ByteFragment subseq(int start, int len) { + if(start < 0 || start + len > this.len) { + throw new IllegalArgumentException("arg start,len="+(start+","+len)+" while this start,len="+(this.start+","+this.len)); + } + return new ByteFragment(buf, this.start+start, len); + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragmentUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragmentUtils.java new file mode 100644 index 00000000..705c8de0 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ByteFragmentUtils.java @@ -0,0 +1,147 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +/** + * @author orantius + * @version $Id$ + * @since 8/1/13 + */ +public final class ByteFragmentUtils { + + private ByteFragmentUtils() { + } + + public static int parseInt(ByteFragment s) throws NumberFormatException { + if (s == null) { + throw new NumberFormatException("null"); + } + + int result = 0; + boolean negative = false; + int i = 0, max = s.length(); + int limit; + int multmin; + int digit; + + if (max > 0) { + if (s.charAt(0) == '-') { + negative = true; + limit = Integer.MIN_VALUE; + i++; + } else { + limit = -Integer.MAX_VALUE; + } + multmin = limit / 10; + if (i < max) { + digit = s.charAt(i++) - 0x30; //Character.digit(s.charAt(i++), 10); + if (digit < 0) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } else { + result = -digit; + } + } + while (i < max) { + // Accumulating negatively avoids surprises near MAX_VALUE + digit = s.charAt(i++) - 0x30; // Character.digit(s.charAt(i++), 10); + if (digit < 0 || digit > 9) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + if (result < multmin) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + result *= 10; + if (result < limit + digit) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + result -= digit; + } + } else { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + if (negative) { + if (i > 1) { + return result; + } else { /* Only got "-" */ + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + } else { + return -result; + } + } + + + public static long parseLong(ByteFragment s) throws NumberFormatException { + if (s == null) { + throw new NumberFormatException("null"); + } + + long result = 0; + boolean negative = false; + int i = 0, max = s.length(); + long limit; + long multmin; + int digit; + + if (max > 0) { + if (s.charAt(0) == '-') { + negative = true; + limit = Long.MIN_VALUE; + i++; + } else { + limit = -Long.MAX_VALUE; + } + multmin = limit / 10; + if (i < max) { + digit = s.charAt(i++) - 0x30; // Character.digit(s.charAt(i++), 10); + if (digit < 0 || digit > 9) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } else { + result = -digit; + } + } + while (i < max) { + // Accumulating negatively avoids surprises near MAX_VALUE + digit = s.charAt(i++) - 0x30; // Character.digit(s.charAt(i++), 10); + if (digit < 0 || digit > 9) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + if (result < multmin) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + result *= 10; + if (result < limit + digit) { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + result -= digit; + } + } else { + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + if (negative) { + if (i > 1) { + return result; + } else { /* Only got "-" */ + throw new NumberFormatException("For input string: \"" + s.asString() + '"'); + } + } else { + return -result; + } + } + +/* //todo: может быть, Ñто медленно, и надо Ñделать как выше + public static UnsignedLong parseUnsignedLong(ByteFragment s) throws NumberFormatException { + if (s == null) { + throw new NumberFormatException("null"); + } + return UnsignedLong.valueOf(s.asString()); + } + + //todo: может быть, Ñто медленно, и надо Ñделать как выше + public static UnsignedInteger parseUnsignedInteger(ByteFragment s) throws NumberFormatException { + if (s == null) { + throw new NumberFormatException("null"); + } + return UnsignedInteger.valueOf(s.asString()); + }*/ + + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java new file mode 100644 index 00000000..20618eeb --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/ClickhouseLZ4Stream.java @@ -0,0 +1,110 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import net.jpountz.lz4.LZ4Decompressor; +import net.jpountz.lz4.LZ4Factory; + +import java.io.IOException; +import java.io.InputStream; + +/** + * CONV-8365 + * Читалка из кликхауÑа в lz4 + * @author jkee + */ + +public class ClickhouseLZ4Stream extends InputStream { + + private static final LZ4Factory factory = LZ4Factory.safeInstance(); + + private static final int MAGIC = 0x82; + + private final InputStream stream; + private final LittleEndianDataInputStream dataWrapper; + + private byte[] currentBlock; + private int pointer; + + public ClickhouseLZ4Stream(InputStream stream) { + this.stream = stream; + dataWrapper = new LittleEndianDataInputStream(stream); + } + + @Override + public int read() throws IOException { + if (!checkNext()) return -1; + byte b = currentBlock[pointer]; + pointer += 1; + return b & 0xFF; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + if (!checkNext()) return -1; + + int copied = 0; + int targetPointer = off; + while(copied != len) { + int toCopy = Math.min(currentBlock.length - pointer, len - copied); + System.arraycopy(currentBlock, pointer, b, targetPointer, toCopy); + targetPointer += toCopy; + pointer += toCopy; + copied += toCopy; + if (!checkNext()) { //закончили + return copied; + } + } + return copied; + } + + @Override + public void close() throws IOException { + stream.close(); + } + + private boolean checkNext() throws IOException { + if (currentBlock == null || pointer == currentBlock.length) { + currentBlock = readNextBlock(); + pointer = 0; + } + return currentBlock != null; + } + + //Каждый блок, Ñто: + private byte[] readNextBlock() throws IOException { + int read = stream.read(); + if (read < 0) return null; + + byte[] checksum = new byte[16]; + checksum[0] = (byte)read; + // ЧекÑумма - 16 байт. + dataWrapper.readFully(checksum, 1, 15); + // Заголовок: + // 1 байт - 0x82 (обозначает, что Ñто LZ4) + int magic = dataWrapper.readUnsignedByte(); + if (magic != MAGIC) throw new IOException("Magic is not correct: " + magic); + // 4 байта - размер Ñжатых данных Ñ ÑƒÑ‡Ñ‘Ñ‚Ð¾Ð¼ дополнительных 9 байт заголовка (compressed_size) + int compressedSizeWithHeader = dataWrapper.readInt(); + // 4 байта - размер неÑжатых данных + int uncompressedSize = dataWrapper.readInt(); + int compressedSize = compressedSizeWithHeader - 9; //header + byte[] block = new byte[compressedSize]; + // Сжатые данные: compressed_size - 9 байт. + dataWrapper.readFully(block); + + byte[] decompressed = new byte[uncompressedSize]; + LZ4Decompressor decompressor = factory.decompressor(); + decompressor.decompress(block, 0, decompressed, 0, uncompressedSize); + + return decompressed; + } + + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java new file mode 100644 index 00000000..b38d8808 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/CountingInputStream.java @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * An {@link java.io.InputStream} that counts the number of bytes read. + * + * @author Chris Nokleberg + * @since 1.0 + */ +public final class CountingInputStream extends FilterInputStream { + + private long count; + private long mark = -1; + + /** + * Wraps another input stream, counting the number of bytes read. + * + * @param in the input stream to be wrapped + */ + public CountingInputStream(InputStream in) { + super(in); + } + + /** Returns the number of bytes read. */ + public long getCount() { + return count; + } + + @Override public int read() throws IOException { + int result = in.read(); + if (result != -1) { + count++; + } + return result; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { + int result = in.read(b, off, len); + if (result != -1) { + count += result; + } + return result; + } + + @Override public long skip(long n) throws IOException { + long result = in.skip(n); + count += result; + return result; + } + + @Override public synchronized void mark(int readlimit) { + in.mark(readlimit); + mark = count; + // it's okay to mark even if mark isn't supported, as reset won't work + } + + @Override public synchronized void reset() throws IOException { + if (!in.markSupported()) { + throw new IOException("Mark not supported"); + } + if (mark == -1) { + throw new IOException("Mark not set"); + } + + in.reset(); + count = mark; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayInputStream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayInputStream.java new file mode 100644 index 00000000..254f0078 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayInputStream.java @@ -0,0 +1,121 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.IOException; +import java.io.InputStream; + +/** + * ÐеÑÐ¸Ð½Ñ…Ñ€Ð¾Ð½Ð¸Ð·Ð¸Ñ€Ð¾Ð²Ð°Ð½Ð½Ð°Ñ Ð±Ñ‹ÑÑ‚Ñ€Ð°Ñ Ð²ÐµÑ€ÑÐ¸Ñ {@link java.io.ByteArrayInputStream}, Ñ Ð±Ñ€Ð¸Ð´Ð¶ÐµÐ¼ и поÑтеÑÑами + * @author Artur + * @version $Id: FastByteArrayInputStream.java 4065 2009-08-10 14:04:26Z artur $ + * @since 07.05.2008 + */ +public final class FastByteArrayInputStream extends InputStream { + private final byte[] buf; + + private int pos; + + private final int count; + + public FastByteArrayInputStream(byte[] buf) { + this.buf = buf; + pos = 0; + count = buf.length; + } + + /** + * Специальный конÑтруктор Ð´Ð»Ñ ÑÐ¾Ð·Ð´Ð°Ð½Ð¸Ñ InputStream поверх не до конца заполненного маÑÑива + * @param buf МаÑÑив байт + * @param count Кол-во заполненных Ñл-тов маÑÑива + */ + public FastByteArrayInputStream(byte[] buf, int count) { + this.buf = buf; + pos = 0; + this.count = count; + } + + + @Override + public int read() { + return pos < count ? buf[pos++] & 0xff : -1; + } + + + @Override + public int read(byte[] b, int off, int len) { + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (pos >= count) { + return -1; + } + if (pos + len > count) { + //noinspection AssignmentToMethodParameter + len = count - pos; + } + if (len <= 0) { + return 0; + } + System.arraycopy(buf, pos, b, off, len); + pos += len; + return len; + } + + + @Override + public long skip(long n) { + if (pos + n > count) { + //noinspection AssignmentToMethodParameter + n = count - pos; + } + if (n < 0) { + return 0; + } + pos += (int) n; + return n; + } + + + @Override + public int available() { + return count - pos; + } + + @Override + public boolean markSupported() { + return false; + } + + + @Override + public void close() throws IOException { + } + + public int getPos() { + return pos; + } + + public int getCount() { + return count; + } + + public byte[] getBuf() { + return buf; + } + + public byte[] getData() { + if (buf.length > count) { + byte[] data = new byte[count]; + System.arraycopy(buf, 0, data, 0, count); + return data; + } else { + return buf; + } + } + + @Override + public void reset() { + pos = 0; + } + + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayOutputStream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayOutputStream.java new file mode 100644 index 00000000..b1927e46 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/FastByteArrayOutputStream.java @@ -0,0 +1,179 @@ +package ru.yandex.metrika.clickhouse.copypaste; + + +import java.io.DataOutput; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * ÐеÑÐ¸Ð½Ñ…Ñ€Ð¾Ð½Ð¸Ð·Ð¸Ñ€Ð¾Ð²Ð°Ð½Ð½Ð°Ñ Ð±Ñ‹ÑÑ‚Ñ€Ð°Ñ Ð²ÐµÑ€ÑÐ¸Ñ {@link java.io.ByteArrayOutputStream} + * @author Artur + * @version $Id: FastByteArrayOutputStream.java 5083 2009-11-11 12:46:49Z dedmajor $ + * @since 07.05.2008 + */ +public final class FastByteArrayOutputStream extends OutputStream { + + /** + * The buffer where data is stored. + */ + private byte[] buf; + + /** + * The number of valid bytes in the buffer. + */ + private int count; + + /** + * Creates a new byte array output stream. The buffer capacity is + * initially 32 bytes, though its size increases if necessary. + */ + public FastByteArrayOutputStream() { + this(1024); + } + + /** + * Creates a new byte array output stream, with a buffer capacity of + * the specified size, in bytes. + * + * @param size the initial size. + * @exception IllegalArgumentException if size is negative. + */ + public FastByteArrayOutputStream(int size) { + super(); + if (size < 0) { + throw new IllegalArgumentException("Negative initial size: " + + size); + } + buf = new byte[size]; + } + + private int ensureCapacity(int datalen) { + int newcount = count + datalen; + if (newcount > buf.length) { + buf = Arrays.copyOf(buf, Math.max(buf.length << 1, newcount)); + } + return newcount; + } + + + /** + * Writes the specified byte to this byte array output stream. + * + * @param b the byte to be written. + */ + @Override + public void write(int b) { + int newcount = ensureCapacity(1); + buf[count] = (byte)b; + count = newcount; + } + + /** + * Writes <code>len</code> bytes from the specified byte array + * starting at offset <code>off</code> to this byte array output stream. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + */ + @Override + public void write(byte[] b, int off, int len) { + if (off < 0 || off > b.length || len < 0 || off + len > b.length || off + len < 0) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + int newcount = ensureCapacity(len); + System.arraycopy(b, off, buf, count, len); + count = newcount; + } + + + + /** + * Возвращает напрÑмую внутренний маÑÑив + * + * @return the current contents of this output stream, as a byte array. + */ + public byte[] toByteArray() { + byte[] result = new byte[count]; + System.arraycopy(buf, 0, result, 0, count); + return result; + } + + public void writeTo(OutputStream output) throws IOException { + output.write(buf, 0, count); + } + + /** + * Returns the current size of the buffer. + * + * @return the value of the <code>count</code> field, which is the number + * of valid bytes in this output stream. + */ + public int size() { + return count; + } + + + /** + * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in + * this class can be called after the stream has been closed without + * generating an <tt>IOException</tt>. + * <p> + * + */ + @Override + public void close() throws IOException { + } + + /** + * Копирует данные из Input потока + * @param source Поток-иÑточник данных + * @param offset Смещение от начала данных в иÑточнике + * @param count Кол-во байт к копированию + */ + public void copyFrom(FastByteArrayInputStream source, int offset, int count) { + if (offset + count > source.getCount()) { + throw new IndexOutOfBoundsException( + "Trying to copy data past the end of source" + + ", source.size=" + source.getCount() + + ", offset=" + offset + ", count=" + count + ); + } + byte[] srcBuf = source.getBuf(); + write(srcBuf, offset, count); + } + + public void copyTo(OutputStream dest) throws IOException { + dest.write(buf, 0, count); + } + + public void copyTo(DataOutput dest) throws IOException { + dest.write(buf, 0, count); + } + + /** + * Создает InputStream на оÑнове тех же данных, которые уже запиÑаны в Ñтот поток, + * без ÐºÐ¾Ð¿Ð¸Ñ€Ð¾Ð²Ð°Ð½Ð¸Ñ Ð´Ð°Ð½Ð½Ñ‹Ñ… в памÑти. + */ + public FastByteArrayInputStream convertToInputStream() { + return new FastByteArrayInputStream(buf, count); + } + + public ByteBuffer toByteBuffer() { + return ByteBuffer.wrap(buf, 0, count); + } + + public byte[] getBuffer() { + return buf; + } + + + public void reset() { + count = 0; + } +} + diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java new file mode 100644 index 00000000..7f73092d --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpConnectionProperties.java @@ -0,0 +1,137 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +/** + * User: hamilkar + * Date: 10/17/13 + * Time: 2:48 PM + */ +public class HttpConnectionProperties { + + // ÐаÑтройки кликхауÑа + + /** + * profile=web&sign_rewrite=0 + * Ðа Ñтороне clickhouse Ñделаны Ð¾Ð³Ñ€Ð°Ð½Ð¸Ñ‡ÐµÐ½Ð¸Ñ Ð½Ð° запроÑÑ‹. + * https://svn.yandex.ru/websvn/wsvn/conv/trunk/metrica/src/dbms/src/Server/config.conf + */ + private String profile; + private boolean compress = true; + // asynchronous=0&max_threads=1 + private boolean async; + private Integer maxThreads; + private Integer maxBlockSize; + + private int bufferSize = 65536; + private int apacheBufferSize = 65536; + + //наÑтройки Ð´Ð»Ñ Ð´ÐµÐ¼Ð¾Ð½Ð¾Ð² + private int socketTimeout = 30000; + private int connectionTimeout = 50; + + //METR-9568: параметр user Ð´Ð»Ñ Ð¾Ð¿Ñ€ÐµÐ´ÐµÐ»ÐµÐ½Ð¸Ñ Ð¿Ñ€Ð¾Ñ„Ð¸Ð»Ñ Ð½Ð°Ñтроек(?). + private String user = null; + + /* + * Ñто таймаут на передачу данных. + * ЧиÑло socketTimeout + dataTransferTimeout отправлÑетÑÑ Ð² clickhouse в параметре max_execution_time + * ПоÑле чего ÐºÐ»Ð¸ÐºÑ…Ð°ÑƒÑ Ñам оÑтанавливает Ð·Ð°Ð¿Ñ€Ð¾Ñ ÐµÑли Ð²Ñ€ÐµÐ¼Ñ ÐµÐ³Ð¾ Ð²Ñ‹Ð¿Ð¾Ð»Ð½ÐµÐ½Ð¸Ñ Ð¿Ñ€ÐµÐ²Ñ‹ÑˆÐ°ÐµÑ‚ max_execution_time + * */ + private int dataTransferTimeout = 10000; + private int keepAliveTimeout = 30 * 1000; + + public String getProfile() { + return profile; + } + + public void setProfile(String profile) { + this.profile = profile; + } + + public boolean isCompress() { + return compress; + } + + public void setCompress(boolean compress) { + this.compress = compress; + } + + public boolean isAsync() { + return async; + } + + public void setAsync(boolean async) { + this.async = async; + } + + public Integer getMaxThreads() { + return maxThreads; + } + + public void setMaxThreads(Integer maxThreads) { + this.maxThreads = maxThreads; + } + + public Integer getMaxBlockSize() { + return maxBlockSize; + } + + public void setMaxBlockSize(Integer maxBlockSize) { + this.maxBlockSize = maxBlockSize; + } + + public int getBufferSize() { + return bufferSize; + } + + public void setBufferSize(int bufferSize) { + this.bufferSize = bufferSize; + } + + public int getApacheBufferSize() { + return apacheBufferSize; + } + + public void setApacheBufferSize(int apacheBufferSize) { + this.apacheBufferSize = apacheBufferSize; + } + + public int getSocketTimeout() { + return socketTimeout; + } + + public void setSocketTimeout(int socketTimeout) { + this.socketTimeout = socketTimeout; + } + + public int getConnectionTimeout() { + return connectionTimeout; + } + + public void setConnectionTimeout(int connectionTimeout) { + this.connectionTimeout = connectionTimeout; + } + + public int getDataTransferTimeout() { + return dataTransferTimeout; + } + + public void setDataTransferTimeout(int dataTransferTimeout) { + this.dataTransferTimeout = dataTransferTimeout; + } + + public int getKeepAliveTimeout() { + return keepAliveTimeout; + } + + public void setKeepAliveTimeout(int keepAliveTimeout) { + this.keepAliveTimeout = keepAliveTimeout; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpResult.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpResult.java new file mode 100644 index 00000000..3f43b0d6 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/HttpResult.java @@ -0,0 +1,417 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * формат полей. + * 0000-00-00 00:00:00 - timestamp + * \0 - null? + * <p/> + * запроÑÑ‹, которые работают + * select * from WatchLog_Chunk_2012071003020404100 limit 10 + * show tables + * + * @author orantius + * @version $Id$ + * @since 7/12/12 + */ +public class HttpResult extends AbstractResultSet { + + private ByteFragment nextLine; + private final StreamSplitter bis; + + private final Map<String, Integer> col = new HashMap<String, Integer>(); // column name -> 1-based index + private final String[] columns; + private final String[] types; + // number of characters read from is stream + private long bytes = 0; + private final CountingInputStream cis; + + private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // + + private ByteFragment[] values; + + public HttpResult(InputStream is, CountingInputStream cis, int bufferSize) throws IOException { + this.cis = cis; + bis = new StreamSplitter(is, (byte) 0x0A, bufferSize); /// \n + ByteFragment headerFragment = bis.next(); + if (headerFragment == null) { + throw new IllegalArgumentException("ru.yandex.metrika.clickhouse response without column names"); + } + String header = headerFragment.asString(); + if (header.startsWith("Code: ") && !header.contains("\t")) { + is.close(); + throw new IOException("Clickhouse error: " + header); + } + columns = Patterns.TAB.split(header); + ByteFragment typesFragment = bis.next(); + if (typesFragment == null) { + throw new IllegalArgumentException("ru.yandex.metrika.clickhouse response without column types"); + } + types = Patterns.TAB.split(typesFragment.asString()); + + for (int i = 0; i < columns.length; i++) { + String s = columns[i]; + col.put(s, i + 1); + } + } + + public boolean hasNext() throws SQLException { + if (nextLine == null) { + try { + nextLine = bis.next(); + if (nextLine == null || nextLine.length() == 0) { + bis.close(); + } else { + bytes += nextLine.getLen(); + } + } catch (IOException e) { + throw new SQLException(e); + } + } + return nextLine != null && nextLine.length() > 0; + } + @Override + public boolean next() throws SQLException { + if (hasNext()) { + values = nextLine.split((byte) 0x09); + nextLine = null; + return true; + } else return false; + } + + @Override + public void close() throws SQLException { + try { + bis.close(); + } catch (IOException e) { + throw new SQLException(e); + } + } + + ///////////////////////////////////////////////////////// + + String[] getTypes() { + return types; + } + + public String[] getColumnNames() { + return columns; + } + + Map<String, Integer> getCol() { + return col; + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return new ResultSetMetaData() { + @Override + public int getColumnCount() throws SQLException { + return columns.length; + } + + @Override + public boolean isAutoIncrement(int column) throws SQLException { + return false; + } + + @Override + public boolean isCaseSensitive(int column) throws SQLException { + return true; + } + + @Override + public boolean isSearchable(int column) throws SQLException { + return true; + } + + @Override + public boolean isCurrency(int column) throws SQLException { + return false; + } + + @Override + public int isNullable(int column) throws SQLException { + return columnNoNulls; + } + + @Override + public boolean isSigned(int column) throws SQLException { + return !types[column - 1].startsWith("U"); + } + + @Override + public int getColumnDisplaySize(int column) throws SQLException { + return 80; + } + + @Override + public String getColumnLabel(int column) throws SQLException { + return columns[column - 1]; + } + + @Override + public String getColumnName(int column) throws SQLException { + return columns[column - 1]; + } + + @Override + public String getSchemaName(int column) throws SQLException { + return ""; + } + + @Override + public int getPrecision(int column) throws SQLException { + return 0; + } + + @Override + public int getScale(int column) throws SQLException { + return 0; + } + + @Override + public String getTableName(int column) throws SQLException { + throw new UnsupportedOperationException("table name unknown at this stage"); + } + + @Override + public String getCatalogName(int column) throws SQLException { + throw new UnsupportedOperationException("catalog name unknown at this stage"); + } + + @Override + public int getColumnType(int column) throws SQLException { + return toSqlType(getColumnTypeName(column)); + } + + @Override + public String getColumnTypeName(int column) throws SQLException { + if (types.length < column) { + throw new ArrayIndexOutOfBoundsException("Array length: " + types.length + " requested: " + (column - 1)); + } + return types[column - 1]; + } + + @Override + public boolean isReadOnly(int column) throws SQLException { + return true; + } + + @Override + public boolean isWritable(int column) throws SQLException { + return false; + } + + @Override + public boolean isDefinitelyWritable(int column) throws SQLException { + return false; + } + + @Override + public String getColumnClassName(int column) throws SQLException { + throw new UnsupportedOperationException("no classes for now"); + } + + @Override + public <T> T unwrap(Class<T> iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class<?> iface) throws SQLException { + return false; + } + }; + } + + + ///////////////////////////////////////////////////////// + + + @Override + public boolean wasNull() throws SQLException { + return super.wasNull(); + } + + @Override + public int getInt(String column) { + return getInt(asColNum(column)); + } + + @Override + public boolean getBoolean(String column) { + return getBoolean(asColNum(column)); + } + + @Override + public long getLong(String column) { + return getLong(asColNum(column)); + } + + @Override + public String getString(String column) { + return getString(asColNum(column)); + } + + @Override + public byte[] getBytes(String column) { + return getBytes(asColNum(column)); + } + + public long getTimestampAsLong(String column) { + return getTimestampAsLong(asColNum(column)); + } + + @Override + public Timestamp getTimestamp(String column) throws SQLException { + return new Timestamp(getTimestampAsLong(column)); + } + + @Override + public short getShort(String column) { + return getShort(asColNum(column)); + } + + @Override + public byte getByte(String column) { + return getByte(asColNum(column)); + } + + @Override + public long[] getLongArray(String column) { + return getLongArray(asColNum(column)); + } + + + ///////////////////////////////////////////////////////// + + @Override + public String getString(int colNum) { + return toString(values[colNum - 1]); + } + + @Override + public int getInt(int colNum) { + return ByteFragmentUtils.parseInt(values[colNum - 1]); + } + + @Override + public boolean getBoolean(int colNum) { + return toBoolean(values[colNum - 1]); + } + + @Override + public long getLong(int colNum) { + return ByteFragmentUtils.parseLong(values[colNum - 1]); + } + + @Override + public byte[] getBytes(int colNum) { + return toBytes(values[colNum - 1]); + } + + public long getTimestampAsLong(int colNum) { + return toTimestamp(values[colNum - 1]); + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + return new Timestamp(getTimestampAsLong(columnIndex)); + } + + @Override + public short getShort(int colNum) { + return toShort(values[colNum - 1]); + } + + @Override + public byte getByte(int colNum) { + return toByte(values[colNum - 1]); + } + + public long[] getLongArray(int colNum) { + return toLongArray(values[colNum - 1]); + } + + + ///////////////////////////////////////////////////////// + + private static byte toByte(ByteFragment value) { + return Byte.parseByte(value.asString()); + } + + private static short toShort(ByteFragment value) { + return Short.parseShort(value.asString()); + } + + private static boolean toBoolean(ByteFragment value) { + return "1".equals(value.asString()); //вроде бы там 1/0 + } + + private static byte[] toBytes(ByteFragment value) { + return value.unescape(); + } + + private static String toString(ByteFragment value) { + return value.asString(true); + } + + private static long[] toLongArray(ByteFragment value) { + if (value.charAt(0) != '[' || value.charAt(value.length()-1) != ']') { + throw new IllegalArgumentException("not an array: "+value); + } + ByteFragment trim = value.subseq(1, value.length() - 2); + ByteFragment[] values = trim.split((byte) ','); + long[] result = new long[values.length]; + for (int i = 0; i < values.length; i++) { + result[i] = ByteFragmentUtils.parseLong(values[i]); + } + return result; + } + + private static long toTimestamp(ByteFragment value) { + try { + return sdf.parse(value.asString()).getTime(); + } catch (ParseException e) { + return 0; + } + } + + // 1-based insex in column list + private int asColNum(String column) { + if (col.containsKey(column)) { + return col.get(column); + } else { + throw new RuntimeException("no column " + column + " in columns list " + Arrays.toString(getColumnNames())); + } + } + + private int toSqlType(String type) { + + if (type.startsWith("Int") || type.startsWith("UInt")) { + if (type.endsWith("64")) return Types.BIGINT; + else return Types.INTEGER; + } + if ("String".equals(type)) return Types.VARCHAR; + if (type.startsWith("Float")) return Types.FLOAT; + if ("Date".equals(type)) return Types.DATE; + if ("DateTime".equals(type)) return Types.TIMESTAMP; + if ("FixedString".equals(type)) return Types.BLOB; + + // don't know what to return actually + return Types.VARCHAR; + + } +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/LittleEndianDataInputStream.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/LittleEndianDataInputStream.java new file mode 100644 index 00000000..bcd71ce5 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/LittleEndianDataInputStream.java @@ -0,0 +1,265 @@ +/* + * Copyright (C) 2007 The Guava Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.*; + +/** + * An implementation of {@link java.io.DataInput} that uses little-endian byte ordering + * for reading {@code short}, {@code int}, {@code float}, {@code double}, and + * {@code long} values. + * <p> + * <b>Note:</b> This class intentionally violates the specification of its + * supertype {@code DataInput}, which explicitly requires big-endian byte order. + * + * @author Chris Nokleberg + * @author Keith Bottner + * @since 8.0 + */ +public final class LittleEndianDataInputStream extends FilterInputStream + implements DataInput { + + /** + * Creates a {@code LittleEndianDataInputStream} that wraps the given stream. + * + * @param in the stream to delegate to + */ + public LittleEndianDataInputStream(InputStream in) { + super(in); + } + + /** + * This method will throw an {@link UnsupportedOperationException}. + */ + @Override + public String readLine() { + throw new UnsupportedOperationException("readLine is not supported"); + } + + @Override + public void readFully(byte[] b) throws IOException { + readFully(this, b); + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + readFully(this, b, off, len); + } + + public static void readFully(InputStream in, byte[] b) throws IOException { + readFully(in, b, 0, b.length); + } + + public static void readFully(InputStream in, byte[] b, int off, int len) + throws IOException { + if (read(in, b, off, len) != len) { + throw new EOFException(); + } + } + + + public static int read(InputStream in, byte[] b, int off, int len) + throws IOException { + if (len < 0) { + throw new IndexOutOfBoundsException("len is negative"); + } + int total = 0; + while (total < len) { + int result = in.read(b, off + total, len - total); + if (result == -1) { + break; + } + total += result; + } + return total; + } + + @Override + public int skipBytes(int n) throws IOException { + return (int) in.skip(n); + } + + @Override + public int readUnsignedByte() throws IOException { + int b1 = in.read(); + if (0 > b1) { + throw new EOFException(); + } + + return b1; + } + + /** + * Reads an unsigned {@code short} as specified by + * {@link java.io.DataInputStream#readUnsignedShort()}, except using little-endian + * byte order. + * + * @return the next two bytes of the input stream, interpreted as an + * unsigned 16-bit integer in little-endian byte order + * @throws java.io.IOException if an I/O error occurs + */ + @Override + public int readUnsignedShort() throws IOException { + byte b1 = readAndCheckByte(); + byte b2 = readAndCheckByte(); + + return fromBytes((byte) 0, (byte) 0, b2, b1); + } + + /** + * Reads an integer as specified by {@link java.io.DataInputStream#readInt()}, except + * using little-endian byte order. + * + * @return the next four bytes of the input stream, interpreted as an + * {@code int} in little-endian byte order + * @throws java.io.IOException if an I/O error occurs + */ + @Override + public int readInt() throws IOException { + byte b1 = readAndCheckByte(); + byte b2 = readAndCheckByte(); + byte b3 = readAndCheckByte(); + byte b4 = readAndCheckByte(); + + return fromBytes( b4, b3, b2, b1); + } + + public static int fromBytes(byte b1, byte b2, byte b3, byte b4) { + return b1 << 24 | (b2 & 0xFF) << 16 | (b3 & 0xFF) << 8 | (b4 & 0xFF); + } + + /** + * Reads a {@code long} as specified by {@link java.io.DataInputStream#readLong()}, + * except using little-endian byte order. + * + * @return the next eight bytes of the input stream, interpreted as a + * {@code long} in little-endian byte order + * @throws java.io.IOException if an I/O error occurs + */ + @Override + public long readLong() throws IOException { + byte b1 = readAndCheckByte(); + byte b2 = readAndCheckByte(); + byte b3 = readAndCheckByte(); + byte b4 = readAndCheckByte(); + byte b5 = readAndCheckByte(); + byte b6 = readAndCheckByte(); + byte b7 = readAndCheckByte(); + byte b8 = readAndCheckByte(); + + return fromBytes(b8, b7, b6, b5, b4, b3, b2, b1); + } + + public static long fromBytes(byte b1, byte b2, byte b3, byte b4, + byte b5, byte b6, byte b7, byte b8) { + return (b1 & 0xFFL) << 56 + | (b2 & 0xFFL) << 48 + | (b3 & 0xFFL) << 40 + | (b4 & 0xFFL) << 32 + | (b5 & 0xFFL) << 24 + | (b6 & 0xFFL) << 16 + | (b7 & 0xFFL) << 8 + | (b8 & 0xFFL); + } + + /** + * Reads a {@code float} as specified by {@link java.io.DataInputStream#readFloat()}, + * except using little-endian byte order. + * + * @return the next four bytes of the input stream, interpreted as a + * {@code float} in little-endian byte order + * @throws java.io.IOException if an I/O error occurs + */ + @Override + public float readFloat() throws IOException { + return Float.intBitsToFloat(readInt()); + } + + /** + * Reads a {@code double} as specified by + * {@link java.io.DataInputStream#readDouble()}, except using little-endian byte + * order. + * + * @return the next eight bytes of the input stream, interpreted as a + * {@code double} in little-endian byte order + * @throws java.io.IOException if an I/O error occurs + */ + @Override + public double readDouble() throws IOException { + return Double.longBitsToDouble(readLong()); + } + + @Override + public String readUTF() throws IOException { + return new DataInputStream(in).readUTF(); + } + + /** + * Reads a {@code short} as specified by {@link java.io.DataInputStream#readShort()}, + * except using little-endian byte order. + * + * @return the next two bytes of the input stream, interpreted as a + * {@code short} in little-endian byte order. + * @throws java.io.IOException if an I/O error occurs. + */ + @Override + public short readShort() throws IOException { + return (short) readUnsignedShort(); + } + + /** + * Reads a char as specified by {@link java.io.DataInputStream#readChar()}, except + * using little-endian byte order. + * + * @return the next two bytes of the input stream, interpreted as a + * {@code char} in little-endian byte order + * @throws java.io.IOException if an I/O error occurs + */ + @Override + public char readChar() throws IOException { + return (char) readUnsignedShort(); + } + + @Override + public byte readByte() throws IOException { + return (byte) readUnsignedByte(); + } + + @Override + public boolean readBoolean() throws IOException { + return readUnsignedByte() != 0; + } + + /** + * Reads a byte from the input stream checking that the end of file (EOF) + * has not been encountered. + * + * @return byte read from input + * @throws java.io.IOException if an error is encountered while reading + * @throws java.io.EOFException if the end of file (EOF) is encountered. + */ + private byte readAndCheckByte() throws IOException, EOFException { + int b1 = in.read(); + + if (-1 == b1) { + throw new EOFException(); + } + + return (byte) b1; + } + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/Patterns.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/Patterns.java new file mode 100644 index 00000000..9b524998 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/Patterns.java @@ -0,0 +1,45 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import java.util.regex.Pattern; + +/** + * ДоÑтали регулÑрные Ð²Ñ‹Ñ€Ð°Ð¶ÐµÐ½Ð¸Ñ Ñ‚ÑƒÑ‚ и там? + * Рдавайте Ñделаем еще один утилитный клаÑÑ, а? + * + * @author lopashev + * @since 18.07.14 + */ +public final class Patterns { + + private Patterns() { + } + + public static final Pattern COMMA = Pattern.compile(","); + + public static final Pattern COLON = Pattern.compile(":"); + + public static final Pattern SEMICOLON = Pattern.compile(";"); + + public static final Pattern DOT = Pattern.compile("\\."); + + public static final Pattern HYPHEN = Pattern.compile("-"); + + public static final Pattern SLASH = Pattern.compile("/"); + + public static final Pattern TAB = Pattern.compile("\t"); + + public static final Pattern NEWLINE = Pattern.compile("\n"); + + public static final Pattern SPACE = Pattern.compile(" "); + + public static final Pattern PIPE = Pattern.compile("\\|"); + + public static final Pattern SMILE = Pattern.compile("☺"); + + public static final Pattern COMMA_SPACE = Pattern.compile(", "); + + public static final Pattern UNDERSCORE = Pattern.compile("_"); + + public static final Pattern COLON_DSLASH_WWW = Pattern.compile("://www."); + +} diff --git a/src/main/java/ru/yandex/metrika/clickhouse/copypaste/StreamSplitter.java b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/StreamSplitter.java new file mode 100644 index 00000000..b017a878 --- /dev/null +++ b/src/main/java/ru/yandex/metrika/clickhouse/copypaste/StreamSplitter.java @@ -0,0 +1,137 @@ +package ru.yandex.metrika.clickhouse.copypaste; + +import java.io.IOException; +import java.io.InputStream; + +/** + * что мы тут делаем. + * на вход приезжает поток байт и разделитель + * на выходе выезжает некоторое количеÑтво байтовых маÑÑивов, + * которые в оригинальном потоке были разделены разделителем + * + * делаетÑÑ Ñто путем Ñ‡Ñ‚ÐµÐ½Ð¸Ñ Ð¸Ð· потока данных в буфер и выдачи их из буфера. + * @author orantius + * @version $Id$ + * @since 7/16/12 + */ +public class StreamSplitter { + private static final int buflen = 65536; + + // начальные параметры + private final InputStream delegate; + private final byte sep; + + private byte[] buf; + // позициÑ, до которой buf заполнен прочтенным из delegate + private int posRead; + // позициÑ, до которой из buf уже отдано наружу фрагментов через next() + private int posNext; + + // флаг который Ñимволизирует, что из потока надо прочитать один раз вÑÑŽ длни буфера, и больше читать не надо. + private boolean readOnce; + + + public StreamSplitter(ByteFragment bf, byte sep) { + this.delegate = bf.asStream(); + this.sep = sep; + buf = new byte[bf.getLen()]; + readOnce = true; + } + + public StreamSplitter(InputStream delegate, byte sep, int buflen) { + this.delegate = delegate; + this.sep = sep; + buf = new byte[buflen]; + } + + public StreamSplitter(InputStream delegate, byte sep) { + this(delegate,sep, buflen); + } + + public ByteFragment next() throws IOException { + // еÑли заÑлали наружу вÑе что прочитали из потока + if (posNext >= posRead) { + // надо прочитать из потока еще + int readBytes = readFromStream(); + if(readBytes <= 0) { + // еÑли вÑе отдали, и из потока больше не читаетÑÑ - то не отдаем ничего + return null; + } + } + // ищем в прочитанном разделитель + int positionSep; + while((positionSep = indexOf(buf, sep, posNext, posRead)) < posNext) { + // пока не нашли разделитель надо прочитать из потока еще + int readBytes = readFromStream(); + if(readBytes <= 0) { + // еÑли уже ничего не читаетÑÑ - отдаем веÑÑŒ хвоÑÑ‚ как результат. + positionSep = posRead; + break; + /*int fragmentStart = posNext; + posNext = positionSep+1; + System.out.println("return "+(positionSep-fragmentStart)+" bytes as next() "); + return new ByteFragment(buf, fragmentStart, positionSep-fragmentStart); */ + } + } + // еÑли нашли разделитель - отдаем куÑок. + int fragmentStart = posNext; + posNext = positionSep+1; + // System.out.println("return "+(positionSep-fragmentStart)+" bytes as next() "); + return new ByteFragment(buf, fragmentStart, positionSep-fragmentStart); + } + + // еÑли в прочитанном но не отправленном куÑке данных нет Ñ€Ð°Ð·Ð´ÐµÐ»Ð¸Ñ‚ÐµÐ»Ñ - читаем из потока еще данных + protected int readFromStream() throws IOException { + if (readOnce) { + if (posRead >= buf.length) { + return -1; + } else { + int read = delegate.read(buf, posRead, buf.length - posRead); + if(read > 0) + posRead += read; + return read; + } + } else { + if (posRead >= buf.length) { // буфер закончилÑÑ + shiftOrResize(); + } + // еÑли буфер не заполнен до конца + int read = delegate.read(buf, posRead, buf.length - posRead); + //System.out.println("read "+read+" bytes from stream"); + if(read > 0) + posRead += read; + return read; + } + } + + // еÑли поток дочитали до конца буфера - надо Ñоздать новый буфер и передвинуть данные на уже отправленную величину. + // еÑли отправленных данных нет, а буфер вÑе равно дочитан до конца - надо увеличить размер буфера. + private void shiftOrResize() { + if(posNext > 0) { + // System.out.println("shift "+posNext+" bytes"); + byte[] oldBuf = buf; + buf = new byte[buf.length]; + System.arraycopy(oldBuf, posNext, buf, 0, oldBuf.length-posNext); + posRead -= posNext; + posNext = 0; + } else { + //System.out.println("double size"); + byte[] oldBuf = buf; + buf = new byte[buf.length*2]; + System.arraycopy(oldBuf, 0, buf, 0, oldBuf.length); + } + } + + private static int indexOf(byte[] array, byte target, int start, int end) { + for (int i = start; i < end; i++) { + if (array[i] == target) { + return i; + } + } + return -1; + } + + public void close() throws IOException { + delegate.close(); + } +} -- GitLab