Added logic to create a new thread to handle flushing of records to ClickHouse

Kanthi Subramanian 2022-04-04 09:41:54 -04:00
parent 245d80ebb6
commit 2a0431614d
5 changed files with 164 additions and 59 deletions
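At a high level, the change introduces a producer/consumer hand-off: put() enqueues converted records into a ConcurrentLinkedQueue shared with a scheduled flush thread that writes them to ClickHouse. A minimal sketch of that wiring, under the assumption that the flush simply drains the queue (class and method names below are illustrative, not the connector's API):

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class FlushPipelineSketch {
    private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();
    // Mirrors new ClickHouseBatchExecutor(2): a ScheduledThreadPoolExecutor with two core threads.
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);

    void start() {
        // Mirrors scheduleAtFixedRate(runnable, 0, 30, TimeUnit.SECONDS) in ClickHouseSinkTask.start().
        executor.scheduleAtFixedRate(this::flush, 0, 30, TimeUnit.SECONDS);
    }

    void enqueue(Object convertedRecord) {
        // What put() does for each converted SinkRecord.
        queue.add(convertedRecord);
    }

    private void flush() {
        Object record;
        while ((record = queue.poll()) != null) {
            // In the connector this step is a batched JDBC insert performed by DbWriter.insert().
            System.out.println("flushing " + record);
        }
    }
}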

ClickHouseSinkTask.java

@ -1,9 +1,10 @@
package com.altinity.clickhouse.sink.connector;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.clickhouse.client.*;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@ -13,6 +14,8 @@ import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
* <p>Creates sink service instance, takes records loaded from those
@ -33,11 +36,22 @@ public class ClickHouseSinkTask extends SinkTask {
return;
}
private ClickHouseBatchExecutor executor;
private ClickHouseBatchRunnable runnable;
private ConcurrentLinkedQueue<Struct> records;
@Override
public void start(Map<String, String> config) {
this.id = config.getOrDefault(Const.TASK_ID, "-1");
final long count = Long.parseLong(config.get(ClickHouseSinkConnectorConfigVariables.BUFFER_COUNT));
log.info("start({}):{}", this.id, count);
this.records = new ConcurrentLinkedQueue();
this.executor = new ClickHouseBatchExecutor(2);
this.runnable = new ClickHouseBatchRunnable(this.records);
this.executor.scheduleAtFixedRate(this.runnable, 0, 30, TimeUnit.SECONDS);
/*
@ -81,10 +95,11 @@ public class ClickHouseSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) {
log.info("out({}):{}", this.id, records.size());
com.kafka.connect.clickhouse.converters.ClickHouseConverter converter = new com.kafka.connect.clickhouse.converters.ClickHouseConverter();
log.debug("CLICKHOUSE received records" + records.size());
BufferedRecords br = new BufferedRecords();
for (SinkRecord record : records) {
new ClickHouseConverter().convert(record);
for (SinkRecord record: records) {
this.records.add(converter.convert(record));
}
}
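The diff does not touch stop(); once a scheduled executor owns a background thread, the task's stop() would normally shut it down so a rebalance does not leak threads. A hedged sketch of what that could look like (an assumption, not part of this commit):

@Override
public void stop() {
    log.info("stop({})", this.id);
    if (this.executor != null) {
        this.executor.shutdown();                    // stop scheduling further flush runs
        try {
            if (!this.executor.awaitTermination(30, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();         // give up on a flush that is stuck
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}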

ClickHouseConverter.java

@ -1,14 +1,15 @@
package com.altinity.clickhouse.sink.connector.converters;
package com.kafka.connect.clickhouse.converters;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.altinity.clickhouse.sink.connector.converters.AbstractConverter;
import com.altinity.clickhouse.sink.connector.metadata.KafkaSchemaRecordType;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.data.*;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
@ -16,9 +17,31 @@ import java.util.List;
import java.util.Map;
public class ClickHouseConverter implements AbstractConverter {
/**
* CDC operation codes carried in the Debezium change-event 'op' field.
*/
public enum CDC_OPERATION {
// Sql updates
UPDATE("U"),
// Inserts
CREATE("C"),
DELETE("D");
private String operation;
CDC_OPERATION(String op) {
this.operation = op;
}
}
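For reference, Debezium emits the op flag in lower case ('c', 'u', 'd', plus 'r' for snapshot reads), which is why the comparisons further down use equalsIgnoreCase. A small illustrative lookup (a hypothetical helper, not in the commit) could sit next to the enum inside ClickHouseConverter:

// Hypothetical helper: map a Debezium op code ("c", "u", "d") to CDC_OPERATION.
static CDC_OPERATION fromOpCode(String op) {
    for (CDC_OPERATION candidate : CDC_OPERATION.values()) {
        if (candidate.operation.equalsIgnoreCase(op)) {
            return candidate;
        }
    }
    return null; // unknown or unhandled operation, e.g. snapshot read "r"
}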
@Override
public Map<String, Object> convertKey(SinkRecord record) {
/**
* Struct{before=Struct{id=1,message=Hello from MySQL},
* after=Struct{id=1,message=Mysql update},source=Struct{version=1.8.1.Final,connector=mysql,
* name=local_mysql3,ts_ms=1648575279000,snapshot=false,db=test,table=test_hello2,server_id=1,file=binlog.000002,pos=4414,row=0},op=u,ts_ms=1648575279856}
*/
KafkaSchemaRecordType recordType = KafkaSchemaRecordType.KEY;
Schema kafkaConnectSchema = recordType == KafkaSchemaRecordType.KEY ? record.keySchema() : record.valueSchema();
Object kafkaConnectStruct = recordType == KafkaSchemaRecordType.KEY ? record.key() : record.value();
@ -74,42 +97,56 @@ public class ClickHouseConverter implements AbstractConverter {
return result;
}
public void convert(SinkRecord record) {
/**
* Primary functionality of parsing a CDC event in a SinkRecord.
* This checks the operation flag (whether it is 'C', 'U', or 'D')
* and extracts the 'after' Struct for insertion.
* @param record the SinkRecord containing the CDC event
* @return the extracted 'after' Struct, or null if the event has none
*/
public Struct convert(SinkRecord record) {
Map<String, Object> convertedKey = convertKey(record);
Map<String, Object> convertedValue = convertValue(record);
System.out.println("Converted Key");
System.out.println("Converted Value");
if (convertedValue.containsKey("after")) {
Struct afterValue = (Struct) convertedValue.get("after");
List<Field> fields = afterValue.schema().fields();
System.out.println("DONE");
Struct afterRecord = null;
if(convertedValue.containsKey("op")) {
// Operation (u, c)
String operation = (String) convertedValue.get("op");
if (operation.equalsIgnoreCase(CDC_OPERATION.CREATE.operation)) {
// Inserts.
} else if(operation.equalsIgnoreCase(CDC_OPERATION.UPDATE.operation)) {
// Updates.
} else if(operation.equalsIgnoreCase(CDC_OPERATION.DELETE.operation)) {
// Deletes.
}
}
if(convertedValue.containsKey("after")) {
afterRecord = (Struct) convertedValue.get("after");
List<Field> fields = afterRecord.schema().fields();
List<String> cols = new ArrayList<String>();
List<Object> values = new ArrayList<Object>();
for (Field f : fields) {
List<Schema.Type> types = new ArrayList<Schema.Type>();
for(Field f: fields) {
System.out.println("Key" + f.name());
cols.add(f.name());
System.out.println("Value" + afterValue.get(f));
values.add(afterValue.get(f));
System.out.println("Value"+ afterRecord.get(f));
values.add(afterRecord.get(f));
}
DbWriter writer = new DbWriter();
//writer.insert(record.topic(), String.join(' ', cols.), String.join(' ', values));
}
//ToDO: Remove the following code after evaluating
try {
byte[] rawJsonPayload = new JsonConverter().fromConnectData(record.topic(), record.valueSchema(), record.value());
String stringPayload = new String(rawJsonPayload, StandardCharsets.UTF_8);
System.out.println("STRING PAYLOAD" + stringPayload);
} catch (Exception e) {
} catch(Exception e) {
}
return afterRecord;
}
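A quick way to exercise the new convert() signature is to hand it a SinkRecord shaped like the Debezium envelope quoted in the javadoc above. A minimal sketch, with made-up schemas and values purely for illustration:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

class ConverterUsageSketch {
    static Struct convertSample() {
        Schema rowSchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .field("message", Schema.STRING_SCHEMA)
                .build();
        Schema envelopeSchema = SchemaBuilder.struct()
                .field("after", rowSchema)
                .field("op", Schema.STRING_SCHEMA)
                .build();
        Schema keySchema = SchemaBuilder.struct()
                .field("id", Schema.INT32_SCHEMA)
                .build();

        Struct after = new Struct(rowSchema).put("id", 1).put("message", "Hello from MySQL");
        Struct envelope = new Struct(envelopeSchema).put("after", after).put("op", "c");
        Struct key = new Struct(keySchema).put("id", 1);

        SinkRecord record = new SinkRecord("test_hello2", 0, keySchema, key,
                envelopeSchema, envelope, 0L);

        // Returns the "after" Struct, which the sink task then enqueues for the flush thread.
        return new ClickHouseConverter().convert(record);
    }
}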
private Map<String, Object> convertStruct(Object kafkaConnectObject, Schema kafkaConnectSchema) {

DbWriter.java

@ -11,8 +11,11 @@ import org.apache.kafka.connect.data.Struct;
import io.debezium.time.Date;
import java.sql.PreparedStatement;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
* Class that abstracts all functionality
@ -27,7 +30,7 @@ public class DbWriter {
*/
public DbWriter() {
//ToDo: Read from Config
String url = "jdbc:ch://localhost/test";
String url = "jdbc:ch://localhost/default";
String clientName = "Agent_1";
String userName = "admin";
String password = "root";
@ -86,47 +89,61 @@ public class DbWriter {
}
/**
* Function where the Kafka Connect data types
* are mapped to ClickHouse data types and a batch insert is performed.
* @param table Table name
* @param afterValue after value (with an insert, before is always empty)
* @param fields Kafka Connect fields
*
* @param records queue of converted records to be flushed
*/
public void insert(String table, Struct afterValue, List<Field> fields){
public void insert(ConcurrentLinkedQueue<Struct> records){
String table = "employees";
if(records.isEmpty()) {
System.out.println("No Records to process");
return;
}
// Get the first record to get length of columns
Struct peekRecord = records.peek();
String insertQueryTemplate = this.getInsertQuery(table, peekRecord.schema().fields().size());
table = "test";
String insertQueryTemplate = this.getInsertQuery(table, fields.size());
try (PreparedStatement ps = this.conn.prepareStatement(insertQueryTemplate)) {
int index = 1;
for(Field f: fields) {
Schema.Type fieldType = f.schema().type();
Object value = afterValue.get(f);
Iterator iterator = records.iterator();
while(iterator.hasNext()) {
Struct afterRecord = (Struct) iterator.next();
// Text columns
if(fieldType == Schema.Type.STRING) {
List<Field> fields = afterRecord.schema().fields();
ps.setString(index, (String) value);
}
else if(fieldType == Schema.INT8_SCHEMA.type() ||
fieldType == Schema.INT16_SCHEMA.type() ||
fieldType == Schema.INT32_SCHEMA.type()) {
if(f.schema().name() == Date.SCHEMA_NAME) {
// Date field arrives as INT32 with schema name set to io.debezium.time.Date
ps.setDate(index, (java.sql.Date) value);
} else {
ps.setInt(index, (Integer) value);
int index = 1;
for(Field f: fields) {
Schema.Type fieldType = f.schema().type();
String schemaName = f.schema().name();
Object value = afterRecord.get(f);
// Text columns
if (fieldType == Schema.Type.STRING) {
ps.setString(index, (String) value);
} else if (fieldType == Schema.INT8_SCHEMA.type() ||
fieldType == Schema.INT16_SCHEMA.type() ||
fieldType == Schema.INT32_SCHEMA.type()) {
if (schemaName != null && schemaName.equalsIgnoreCase(Date.SCHEMA_NAME)) {
// Date field arrives as INT32 with schema name set to io.debezium.time.Date
long msSinceEpoch = TimeUnit.DAYS.toMillis((Integer) value);
java.util.Date date = new java.util.Date(msSinceEpoch);
java.sql.Date sqlDate = new java.sql.Date(date.getTime());
ps.setDate(index, sqlDate);
} else {
ps.setInt(index, (Integer) value);
}
} else if (fieldType == Schema.FLOAT32_SCHEMA.type() ||
fieldType == Schema.FLOAT64_SCHEMA.type()) {
ps.setFloat(index, (Float) value);
} else if (fieldType == Schema.BOOLEAN_SCHEMA.type()) {
ps.setBoolean(index, (Boolean) value);
}
} else if(fieldType == Schema.FLOAT32_SCHEMA.type() ||
fieldType == Schema.FLOAT64_SCHEMA.type()) {
ps.setFloat(index, (Float) value);
} else if(fieldType == Schema.BOOLEAN_SCHEMA.type()) {
ps.setBoolean(index, (Boolean) value);
index++;
}
index++;
ps.addBatch(); // append parameters to the query
}
ps.addBatch(); // append parameters to the query
ps.executeBatch(); // issue the composed query: insert into mytable values(...)(...)...(...)
} catch(Exception e) {
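The batch relies on a parameterized INSERT template with one '?' placeholder per column; getInsertQuery presumably produces something like "insert into employees values(?,?,?)". A hedged sketch of that assumed template shape, and of the Debezium date handling used above:

import java.util.Collections;
import java.util.concurrent.TimeUnit;

class InsertTemplateSketch {
    // Assumed shape of getInsertQuery(table, numFields):
    // getInsertQuery("employees", 3) -> "insert into employees values(?,?,?)"
    static String getInsertQuery(String table, int numFields) {
        return "insert into " + table + " values("
                + String.join(",", Collections.nCopies(numFields, "?")) + ")";
    }

    // io.debezium.time.Date arrives as an INT32 counting days since the epoch;
    // the diff converts it to java.sql.Date before binding it with ps.setDate().
    static java.sql.Date toSqlDate(int daysSinceEpoch) {
        return new java.sql.Date(TimeUnit.DAYS.toMillis(daysSinceEpoch));
    }
}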

ClickHouseBatchExecutor.java

@ -0,0 +1,10 @@
package com.altinity.clickhouse.sink.connector.executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public class ClickHouseBatchExecutor extends ScheduledThreadPoolExecutor {
public ClickHouseBatchExecutor(int corePoolSize) {
super(corePoolSize);
}
}

ClickHouseBatchRunnable.java

@ -0,0 +1,26 @@
package com.altinity.clickhouse.sink.connector.executor;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import org.apache.kafka.connect.data.Struct;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Runnable object that will be called on
* a schedule to perform the batch insert of
* records to ClickHouse.
*/
public class ClickHouseBatchRunnable implements Runnable {
private ConcurrentLinkedQueue<Struct> records;
public ClickHouseBatchRunnable(ConcurrentLinkedQueue<Struct> records) {
this.records = records;
}
@Override
public void run() {
System.out.println("*************** BULK INSERT TO CLICKHOUSE **************");
DbWriter writer = new DbWriter();
writer.insert(this.records);
}
}
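One caveat with scheduleAtFixedRate: if run() throws, the executor silently suppresses all subsequent executions, and the queue handed to DbWriter.insert() is never drained after a flush. A hedged variant that guards against both, assuming that is the intended behaviour (not the committed code):

@Override
public void run() {
    try {
        if (this.records.isEmpty()) {
            return;                                   // nothing to flush this cycle
        }
        // Drain the currently queued records into a local batch so records added by
        // put() while the flush is running are kept for the next cycle.
        ConcurrentLinkedQueue<Struct> batch = new ConcurrentLinkedQueue<>();
        Struct record;
        while ((record = this.records.poll()) != null) {
            batch.add(record);
        }
        new DbWriter().insert(batch);
    } catch (Exception e) {
        // An uncaught exception here would cancel every future scheduled run,
        // so log and keep the periodic flush alive.
        System.err.println("Flush to ClickHouse failed: " + e.getMessage());
    }
}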