diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java index bd41cf461d0f3c628e3a0caffa1095274d70cd60..d14364c7fb6cb0ac960fc6253b7c1c5cb5b90560 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatement.java @@ -2,6 +2,7 @@ package ru.yandex.metrika.clickhouse; import ru.yandex.metrika.clickhouse.copypaste.ClickhouseResponse; +import java.io.InputStream; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -16,4 +17,5 @@ public interface CHStatement extends Statement { ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams) throws SQLException; ClickhouseResponse executeQueryClickhouseResponse(String sql, Map<String, String> additionalDBParams, boolean ignoreDatabase) throws SQLException; ResultSet executeQuery(String sql, Map<String, String> additionalDBParams) throws SQLException; + void sendStream(InputStream content, String table) throws SQLException; } diff --git a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java index 707e46d2b3f94025b7927be4db74b53ed0eb5202..0a98e0ff7e378e9d4377bf07f24886cac448d88f 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/CHStatementImpl.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.InputStreamEntity; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.util.EntityUtils; @@ -441,6 +442,35 @@ public class CHStatementImpl implements CHStatement { } } + public void sendStream(InputStream content, String table) throws CHException { + // echo -ne '10\n11\n12\n' | POST 'http://localhost:8123/?query=INSERT INTO t FORMAT TabSeparated' + HttpEntity entity = null; + try { + URI uri = new URI("http", null, source.getHost(), source.getPort(), + "/", (CopypasteUtils.isEmpty(source.getDatabase()) ? "" : "database=" + source.getDatabase() + '&') + + "query=INSERT INTO " + table + " FORMAT TabSeparated", null); + HttpPost httpPost = new HttpPost(uri); + httpPost.setEntity(new InputStreamEntity(content, -1)); + HttpResponse response = client.execute(httpPost); + entity = response.getEntity(); + if (response.getStatusLine().getStatusCode() != HttpURLConnection.HTTP_OK) { + String chMessage; + try { + chMessage = EntityUtils.toString(response.getEntity()); + } catch (IOException e) { + chMessage = "error while read response "+ e.getMessage(); + } + throw ClickhouseExceptionSpecifier.specify(chMessage, source.getHost(), source.getPort()); + } + } catch (CHException e) { + throw e; + } catch (Exception e) { + throw ClickhouseExceptionSpecifier.specify(e, source.getHost(), source.getPort()); + } finally { + EntityUtils.consumeQuietly(entity); + } + } + public Map<String, String> getParams(boolean ignoreDatabase) { Map<String, String> params = new HashMap<String, String>(); //в clickhouse бывают таблички без базы (т.е. в базе default) diff --git a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java index 69d1753c7cb1878bff86cf0378e900c585db9af5..681c6e19503aeacd93dc20c90adc428865606275 100644 --- a/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java +++ b/src/main/java/ru/yandex/metrika/clickhouse/util/CopypasteUtils.java @@ -125,6 +125,10 @@ public class CopypasteUtils { return true; } + public static boolean isEmpty(String str) { + return str == null || str.isEmpty(); + } + public static String join(final Iterable<?> iterable, final char separator) { Iterator<?> iterator = iterable.iterator();