From 09bf233675590652073c9fe2ae0765178597d09a Mon Sep 17 00:00:00 2001 From: serebrserg <serebrserg@yandex-team.ru> Date: Thu, 24 Mar 2016 15:17:20 +0300 Subject: [PATCH] METR-20494: add sendStream method for data load --- .../metrika/clickhouse/CHStatement.java | 2 ++ .../metrika/clickhouse/CHStatementImpl.java | 30 +++++++++++++++++++ .../clickhouse/util/CopypasteUtils.java | 4 +++ 3 files changed, 36 insertions(+) diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java index bd41cf46..d14364c7 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java @@ -2,6 +2,7 @@ package ru.yandex.metrika.clickhouse; import ru.yandex.metrika.clickhouse.copypaste.ClickhouseResponse; +import java.io.InputStream; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -16,4 +17,5 @@ public interface CHStatement extends Statement { ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams) throws SQLException; ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams, boolean ignoreDatabase) throws SQLException; ResultSet executeQuery(String sql, Map<String, String> additionalDBParams) throws SQLException; + void sendStream(InputStream content, String table) throws SQLException; } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java index 707e46d2..0a98e0ff 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; @@ -441,6 +442,35 @@ public class CHStatementImpl implements CHStatement { } } + public void sendStream(InputStream content, String table) throws CHException { + // echo -ne '10\n11\n12\n' | POST 'http://localhost:8123/?query=INSERT INTO t FORMAT TabSeparated' + HttpEntity entity = null; + try { + URI uri = new URI("http", null, source.getHost(), source.getPort(), + "/", (CopypasteUtils.isEmpty(source.getDatabase()) ? "" : "database=" + source.getDatabase() + '&') + + "query=INSERT INTO " + table + " FORMAT TabSeparated", null); + HttpPost httpPost = new HttpPost(uri); + httpPost.setEntity(new InputStreamEntity(content, -1)); + HttpResponse response = client.execute(httpPost); + entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { + String chMessage; + try { + chMessage = EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + chMessage = "error while read response "+ e.getMessage(); + } + throw ClickhouseExceptionSpecifier.specify(chMessage, source.getHost(), source.getPort()); + } + } catch (CHException e) { + throw e; + } catch (Exception e) { + throw ClickhouseExceptionSpecifier.specify(e, source.getHost(), source.getPort()); + } finally { + EntityUtils.consumeQuietly(entity); + } + } + public Map<String, String> getParams(boolean ignoreDatabase) { Map<String, String> params = new HashMap<String, String>(); //в clickhouse бывают таблички без базы (т.е. в базе default) diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java index 69d1753c..681c6e19 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java @@ -125,6 +125,10 @@ public class CopypasteUtils { return true; } + public static boolean isEmpty(String str) { + return str == null || str.isEmpty(); + } + public static String join(final Iterable<?> iterable, final char separator) { Iterator<?> iterator = iterable.iterator(); -- GitLab