From 2a0431614d96333f61f4f4983a32db24d1c43a87 Mon Sep 17 00:00:00 2001
From: Kanthi Subramanian
Date: Mon, 4 Apr 2022 09:41:54 -0400
Subject: [PATCH] Added logic to create a new thread to handle flushing of
 records to ClickHouse

---
 .../sink/connector/ClickHouseSinkTask.java    | 25 ++++--
 .../converters/ClickHouseConverter.java       | 79 +++++++++++++-----
 .../sink/connector/db/DbWriter.java           | 83 +++++++++++--------
 .../executor/ClickHouseBatchExecutor.java     | 10 +++
 .../executor/ClickHouseBatchRunnable.java     | 26 ++++++
 5 files changed, 164 insertions(+), 59 deletions(-)
 create mode 100644 src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java
 create mode 100644 src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java

diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java
index 7758aff..0e6627c 100644
--- a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java
+++ b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java
@@ -1,9 +1,10 @@
 package com.altinity.clickhouse.sink.connector;
 
-import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
-import com.clickhouse.client.*;
+import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
+import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
@@ -13,6 +14,8 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Creates sink service instance, takes records loaded from those
@@ -33,11 +36,22 @@ public class ClickHouseSinkTask extends SinkTask {
         return;
     }
 
+    private ClickHouseBatchExecutor executor;
+    private ClickHouseBatchRunnable runnable;
+    private ConcurrentLinkedQueue<Struct> records;
+
     @Override
     public void start(Map<String, String> config) {
         this.id = config.getOrDefault(Const.TASK_ID, "-1");
         final long count = Long.parseLong(config.get(ClickHouseSinkConnectorConfigVariables.BUFFER_COUNT));
         log.info("start({}):{}", this.id, count);
+
+        this.records = new ConcurrentLinkedQueue<>();
+        this.executor = new ClickHouseBatchExecutor(2);
+        this.runnable = new ClickHouseBatchRunnable(this.records);
+
+        this.executor.scheduleAtFixedRate(this.runnable, 0, 30, TimeUnit.SECONDS);
+
         /*
@@ -81,10 +95,11 @@ public class ClickHouseSinkTask extends SinkTask {
 
     @Override
     public void put(Collection<SinkRecord> records) {
-        log.info("out({}):{}", this.id, records.size());
+        com.kafka.connect.clickhouse.converters.ClickHouseConverter converter = new com.kafka.connect.clickhouse.converters.ClickHouseConverter();
+        log.debug("CLICKHOUSE received records" + records.size());
         BufferedRecords br = new BufferedRecords();
-        for (SinkRecord record : records) {
-            new ClickHouseConverter().convert(record);
+        for (SinkRecord record : records) {
+            this.records.add(converter.convert(record));
         }
     }
diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java
index 757de4a..f7711a5 100644
--- a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java
+++ b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java
@@ -1,14 +1,15 @@
-package com.altinity.clickhouse.sink.connector.converters;
+package com.kafka.connect.clickhouse.converters;
 
-import com.altinity.clickhouse.sink.connector.db.DbWriter;
+import com.altinity.clickhouse.sink.connector.converters.AbstractConverter;
 import com.altinity.clickhouse.sink.connector.metadata.KafkaSchemaRecordType;
-import org.apache.kafka.connect.data.Field;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.data.*;
 import org.apache.kafka.connect.sink.SinkRecord;
 
+import org.apache.kafka.connect.data.Struct;
+
+import org.apache.kafka.connect.json.JsonConverter;
+
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -16,9 +17,31 @@ import java.util.List;
 import java.util.Map;
 
 public class ClickHouseConverter implements AbstractConverter {
+
+    /**
+     * CDC operation flags as emitted by Debezium in the "op" field.
+     */
+    public enum CDC_OPERATION {
+        // Sql updates
+        UPDATE("U"),
+        // Inserts
+        CREATE("C"),
+
+        DELETE("D");
+
+        private String operation;
+        CDC_OPERATION(String op) {
+            this.operation = op;
+        }
+    }
 
     @Override
     public Map<String, Object> convertKey(SinkRecord record) {
+        /**
+         * Struct{before=Struct{id=1,message=Hello from MySQL},
+         * after=Struct{id=1,message=Mysql update},source=Struct{version=1.8.1.Final,connector=mysql,
+         * name=local_mysql3,ts_ms=1648575279000,snapshot=false,db=test,table=test_hello2,server_id=1,file=binlog.000002,pos=4414,row=0},op=u,ts_ms=1648575279856}
+         */
         KafkaSchemaRecordType recordType = KafkaSchemaRecordType.KEY;
         Schema kafkaConnectSchema = recordType == KafkaSchemaRecordType.KEY ? record.keySchema() : record.valueSchema();
         Object kafkaConnectStruct = recordType == KafkaSchemaRecordType.KEY ?
                 record.key() : record.value();
@@ -74,42 +97,56 @@ public class ClickHouseConverter implements AbstractConverter {
         return result;
     }
 
-    public void convert(SinkRecord record) {
+    /**
+     * Primary functionality of parsing a CDC event in a SinkRecord.
+     * This checks the operation flag (if it is 'C' or 'U').
+     * @param record SinkRecord carrying the CDC event
+     */
+    public Struct convert(SinkRecord record) {
         Map<String, Object> convertedKey = convertKey(record);
         Map<String, Object> convertedValue = convertValue(record);
 
-        System.out.println("Converted Key");
-        System.out.println("Converted Value");
-
-        if (convertedValue.containsKey("after")) {
-            Struct afterValue = (Struct) convertedValue.get("after");
-            List<Field> fields = afterValue.schema().fields();
-            System.out.println("DONE");
+        Struct afterRecord = null;
+        if(convertedValue.containsKey("op")) {
+            // Operation (u, c)
+            String operation = (String) convertedValue.get("op");
+            if (operation.equalsIgnoreCase(CDC_OPERATION.CREATE.operation)) {
+                // Inserts.
+            } else if(operation.equalsIgnoreCase(CDC_OPERATION.UPDATE.operation)) {
+                // Updates.
+            } else if(operation.equalsIgnoreCase(CDC_OPERATION.DELETE.operation)) {
+                // Deletes.
+            }
+        }
+        if(convertedValue.containsKey("after")) {
+            afterRecord = (Struct) convertedValue.get("after");
+            List<Field> fields = afterRecord.schema().fields();
 
             List<String> cols = new ArrayList<String>();
             List<Object> values = new ArrayList<Object>();
-            for (Field f : fields) {
+            List<String> types = new ArrayList<String>();
+            for(Field f: fields) {
                 System.out.println("Key" + f.name());
                 cols.add(f.name());
 
-                System.out.println("Value" + afterValue.get(f));
-                values.add(afterValue.get(f));
+                System.out.println("Value"+ afterRecord.get(f));
+                values.add(afterRecord.get(f));
             }
-            DbWriter writer = new DbWriter();
-            //writer.insert(record.topic(), String.join(' ', cols.), String.join(' ', values));
-        }
+        }
 
+        //ToDO: Remove the following code after evaluating
         try {
             byte[] rawJsonPayload = new JsonConverter().fromConnectData(record.topic(), record.valueSchema(), record.value());
             String stringPayload = new String(rawJsonPayload, StandardCharsets.UTF_8);
             System.out.println("STRING PAYLOAD" + stringPayload);
-        } catch (Exception e) {
+        } catch(Exception e) {
 
         }
+
+        return afterRecord;
     }
 
     private Map<String, Object> convertStruct(Object kafkaConnectObject, Schema kafkaConnectSchema) {
diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
index eb92908..b86f037 100644
--- a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
+++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java
@@ -11,8 +11,11 @@ import org.apache.kafka.connect.data.Struct;
 import io.debezium.time.Date;
 
 import java.sql.PreparedStatement;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Class that abstracts all functionality
@@ -27,7 +30,7 @@ public class DbWriter {
      */
     public DbWriter() {
         //ToDo: Read from Config
-        String url = "jdbc:ch://localhost/test";
+        String url = "jdbc:ch://localhost/default";
         String clientName = "Agent_1";
         String userName = "admin";
         String password = "root";
@@ -86,47 +89,61 @@ public class DbWriter {
     }
 
     /**
-     * Function where the Kafka connect data types
-     * are mapped to Clickhouse data types and a batch insert is performed.
-     * @param table Table Name
-     * @param afterValue after Value (With Insert: before is always empty_
-     * @param fields Kafka connect fields
+     * Batch inserts the queued "after" Structs into ClickHouse.
+     * @param records queue of converted records to insert
      */
-    public void insert(String table, Struct afterValue, List<Field> fields){
+    public void insert(ConcurrentLinkedQueue<Struct> records){
+
+        String table = "employees";
+
+        if(records.isEmpty()) {
+            System.out.println("No Records to process");
+            return;
+        }
+
+        // Get the first record to get length of columns
+        Struct peekRecord = records.peek();
+        String insertQueryTemplate = this.getInsertQuery(table, peekRecord.schema().fields().size());
 
-        table = "test";
-        String insertQueryTemplate = this.getInsertQuery(table, fields.size());
         try (PreparedStatement ps = this.conn.prepareStatement(insertQueryTemplate)) {
 
-            int index = 1;
-            for(Field f: fields) {
-                Schema.Type fieldType = f.schema().type();
-                Object value = afterValue.get(f);
+            Iterator iterator = records.iterator();
+            while(iterator.hasNext()) {
+                Struct afterRecord = (Struct) iterator.next();
 
-                // Text columns
-                if(fieldType == Schema.Type.STRING) {
+                List<Field> fields = afterRecord.schema().fields();
 
-                    ps.setString(index, (String) value);
-                }
-                else if(fieldType == Schema.INT8_SCHEMA.type() ||
-                        fieldType == Schema.INT16_SCHEMA.type() ||
-                        fieldType == Schema.INT32_SCHEMA.type()) {
-                    if(f.schema().name() == Date.SCHEMA_NAME) {
-                        // Date field arrives as INT32 with schema name set to io.debezium.time.Date
-                        ps.setDate(index, (java.sql.Date) value);
-                    } else {
-                        ps.setInt(index, (Integer) value);
+                int index = 1;
+                for(Field f: fields) {
+                    Schema.Type fieldType = f.schema().type();
+                    String schemaName = f.schema().name();
+                    Object value = afterRecord.get(f);
+
+                    // Text columns
+                    if (fieldType == Schema.Type.STRING) {
+                        ps.setString(index, (String) value);
+                    } else if (fieldType == Schema.INT8_SCHEMA.type() ||
+                            fieldType == Schema.INT16_SCHEMA.type() ||
+                            fieldType == Schema.INT32_SCHEMA.type()) {
+                        if (schemaName != null && schemaName.equalsIgnoreCase(Date.SCHEMA_NAME)) {
+                            // Date field arrives as INT32 with schema name set to io.debezium.time.Date
+                            long msSinceEpoch = TimeUnit.DAYS.toMillis((Integer) value);
+                            java.util.Date date = new java.util.Date(msSinceEpoch);
+                            java.sql.Date sqlDate = new java.sql.Date(date.getTime());
+                            ps.setDate(index, sqlDate);
+                        } else {
+                            ps.setInt(index, (Integer) value);
+                        }
+                    } else if (fieldType == Schema.FLOAT32_SCHEMA.type() ||
+                            fieldType == Schema.FLOAT64_SCHEMA.type()) {
+                        ps.setFloat(index, (Float) value);
+                    } else if (fieldType == Schema.BOOLEAN_SCHEMA.type()) {
+                        ps.setBoolean(index, (Boolean) value);
                     }
-                } else if(fieldType == Schema.FLOAT32_SCHEMA.type() ||
-                        fieldType == Schema.FLOAT64_SCHEMA.type()) {
-                    ps.setFloat(index, (Float) value);
-                } else if(fieldType == Schema.BOOLEAN_SCHEMA.type()) {
-                    ps.setBoolean(index, (Boolean) value);
+                    index++;
                 }
-                index++;
-
+                ps.addBatch(); // append parameters to the query
             }
-            ps.addBatch(); // append parameters to the query
             ps.executeBatch(); // issue the composed query: insert into mytable values(...)(...)...(...)
         } catch(Exception e) {
diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java
new file mode 100644
index 0000000..82425a4
--- /dev/null
+++ b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchExecutor.java
@@ -0,0 +1,10 @@
+package com.altinity.clickhouse.sink.connector.executor;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+public class ClickHouseBatchExecutor extends ScheduledThreadPoolExecutor {
+
+    public ClickHouseBatchExecutor(int corePoolSize) {
+        super(corePoolSize);
+    }
+}
diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
new file mode 100644
index 0000000..e0f44fb
--- /dev/null
+++ b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java
@@ -0,0 +1,26 @@
+package com.altinity.clickhouse.sink.connector.executor;
+
+import com.altinity.clickhouse.sink.connector.db.DbWriter;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Runnable object that will be called on
+ * a schedule to perform the batch insert of
+ * records to ClickHouse.
+ */
+public class ClickHouseBatchRunnable implements Runnable {
+
+    private ConcurrentLinkedQueue<Struct> records;
+    public ClickHouseBatchRunnable(ConcurrentLinkedQueue<Struct> records) {
+        this.records = records;
+    }
+
+    @Override
+    public void run() {
+        System.out.println("*************** BULK INSERT TO CLICKHOUSE **************");
+        DbWriter writer = new DbWriter();
+        writer.insert(this.records);
+    }
+}
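
Illustration (not part of the patch): the change above has put() enqueue converted records into a shared ConcurrentLinkedQueue while a ScheduledThreadPoolExecutor flushes that queue every 30 seconds through DbWriter.insert(). The sketch below shows the same scheduled-flush pattern in a minimal, self-contained form; the class and method names (FlushPatternDemo, drainAndInsert) are illustrative stand-ins for ClickHouseSinkTask and DbWriter and do not exist in the connector code.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FlushPatternDemo {

    // Shared queue: the producer side enqueues, the scheduled task drains.
    private static final ConcurrentLinkedQueue<String> QUEUE = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);

        // Flush every 2 seconds here; the patch schedules every 30 seconds.
        executor.scheduleAtFixedRate(FlushPatternDemo::drainAndInsert, 0, 2, TimeUnit.SECONDS);

        // Stand-in for the put() path: convert incoming records and enqueue them.
        for (int i = 0; i < 10; i++) {
            QUEUE.add("record-" + i);
            Thread.sleep(500);
        }

        Thread.sleep(3000);
        executor.shutdown();
    }

    // Stand-in for DbWriter.insert(records): drain the queue and batch the rows.
    private static void drainAndInsert() {
        List<String> batch = new ArrayList<>();
        String record;
        while ((record = QUEUE.poll()) != null) {
            batch.add(record);
        }
        if (!batch.isEmpty()) {
            System.out.println("Flushing batch of " + batch.size() + " records: " + batch);
        }
    }
}

One design note: the sketch removes entries with poll() as it flushes, so each record is written exactly once, whereas the patch's DbWriter.insert() iterates the shared queue without removing entries.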