Merge remote-tracking branch 'origin/main'

This commit is contained in:
Kanthi Subramanian 2022-04-05 11:03:57 -04:00
commit 82092270f1
6 changed files with 177 additions and 33 deletions

3
deploy/docker/start-remove.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/bash
# Recreate the docker-compose stack from scratch: remove containers for
# services no longer in the compose file, force-recreate all containers,
# and renew anonymous volumes so no stale state survives.
docker-compose up --remove-orphans --force-recreate --renew-anon-volumes

3
deploy/docker/start.sh Executable file
View File

@@ -0,0 +1,3 @@
#!/bin/bash
# Bring up the docker-compose stack, removing containers for services
# that are no longer defined in the compose file.
docker-compose up --remove-orphans

View File

@@ -6,5 +6,5 @@ curl -H "Accept:application/json" 127.0.0.1:18083 2>/dev/null | jq .
# Query the Kafka Connect REST API (port 18083) and pretty-print each
# response with jq. Errors from curl are discarded (2>/dev/null).
echo "Connectors:"
curl -H "Accept:application/json" 127.0.0.1:18083/connectors/ 2>/dev/null | jq .
echo "Test connector status:"
# Status of the source (Debezium) connector.
curl -X GET -H "Accept:application/json" localhost:18083/connectors/test-connector 2>/dev/null | jq .
echo "Sink connector status:"
# Status of the ClickHouse sink connector.
curl -X GET -H "Accept:application/json" localhost:18083/connectors/sink-connector 2>/dev/null | jq .

View File

@@ -1,5 +1,6 @@
package com.altinity.clickhouse.sink.connector;
import com.altinity.clickhouse.sink.connector.deduplicator.DeDuplicator;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
@@ -41,6 +42,8 @@ public class ClickHouseSinkTask extends SinkTask {
private ClickHouseBatchRunnable runnable;
private ConcurrentLinkedQueue<Struct> records;
private DeDuplicator deduplicator;
@Override
public void start(Map<String, String> config) {
this.id = config.getOrDefault(Const.TASK_ID, "-1");
@@ -52,6 +55,8 @@ public class ClickHouseSinkTask extends SinkTask {
this.executor = new ClickHouseBatchExecutor(2);
this.executor.scheduleAtFixedRate(this.runnable, 0, 30, TimeUnit.SECONDS);
this.deduplicator = new DeDuplicator();
/*
@@ -99,7 +104,14 @@ public class ClickHouseSinkTask extends SinkTask {
ClickHouseConverter converter = new ClickHouseConverter();
BufferedRecords br = new BufferedRecords();
for (SinkRecord record : records) {
this.records.add(converter.convert(record));
if (this.deduplicator.isNew(record)) {
Struct c = converter.convert(record);
if (c != null) {
this.records.add(c);
}
} else {
log.info("skip already seen record: " + record);
}
}
}

View File

@@ -41,30 +41,108 @@ public class ClickHouseConverter implements AbstractConverter {
}
/**
* Struct{
* before=Struct{
* id=1,
* message=Hello from MySQL
* },
* after=Struct{
* id=1,
* message=Mysql update
* },
* source=Struct{
* version=1.8.1.Final,
* connector=mysql,
* name=local_mysql3,
* ts_ms=1648575279000,
* snapshot=false,
* db=test,
* table=test_hello2,
* server_id=1,
* file=binlog.000002,
* pos=4414,
* row=0
* },
* op=u,
* ts_ms=1648575279856
* SinkRecord
*
* SinkRecord{
* kafkaOffset=300023,
* timestampType=CreateTime
* }
* ConnectRecord{
* topic='SERVER5432.test.employees',
* kafkaPartition=0,
* key=Struct{
* emp_no=499999
* },
* keySchema=Schema{
* SERVER5432.test.employees.Key:STRUCT
* },
* value=Struct{
* after=Struct{
* emp_no=499999,
* birth_date=-4263,
* first_name=Sachin,
* last_name=Tsukuda,
* gender=M,
* hire_date=10195
* },
* source=Struct{
* version=1.9.0.CR1,
* connector=mysql,
* name=SERVER5432,
* ts_ms=1649152583000,
* snapshot=false,
* db=test,
* table=employees,
* server_id=1,
* file=binlog.000002,
* pos=8249512,
* row=104,
* thread=13
* },
* op=c,
* ts_ms=1649152741745
* },
* valueSchema=Schema{
* SERVER5432.test.employees.Envelope:STRUCT
* },
* timestamp=1649152746408,
* headers=ConnectHeaders(headers=)
* }
*
* Value struct
* CREATE
* value=Struct{
* after=Struct{
* emp_no=499999,
* birth_date=-4263,
* first_name=Sachin,
* last_name=Tsukuda,
* gender=M,
* hire_date=10195
* },
* source=Struct{
* version=1.9.0.CR1,
* connector=mysql,
* name=SERVER5432,
* ts_ms=1649152583000,
* snapshot=false,
* db=test,
* table=employees,
* server_id=1,
* file=binlog.000002,
* pos=8249512,
* row=104,
* thread=13
* },
* op=c,
* ts_ms=1649152741745
* },
*
* UPDATE
* value=Struct{
* before=Struct{
* id=1,
* message=Hello from MySQL
* },
* after=Struct{
* id=1,
* message=Mysql update
* },
* source=Struct{
* version=1.8.1.Final,
* connector=mysql,
* name=local_mysql3,
* ts_ms=1648575279000,
* snapshot=false,
* db=test,
* table=test_hello2,
* server_id=1,
* file=binlog.000002,
* pos=4414,
* row=0
* },
* op=u,
* ts_ms=1648575279856
* }
*/
@@ -80,6 +158,7 @@ public class ClickHouseConverter implements AbstractConverter {
Map<String, Object> convertedValue = convertValue(record);
Struct afterRecord = null;
// Check "operation" represented by this record.
if (convertedValue.containsKey("op")) {
// Operation (u, c)
String operation = (String) convertedValue.get("op");
@@ -92,6 +171,7 @@ public class ClickHouseConverter implements AbstractConverter {
}
}
// Check "after" value represented by this record.
if (convertedValue.containsKey("after")) {
Struct afterValue = (Struct) convertedValue.get("after");
List<Field> fields = afterValue.schema().fields();
@@ -100,14 +180,13 @@ public class ClickHouseConverter implements AbstractConverter {
List<Object> values = new ArrayList<Object>();
List<Schema.Type> types = new ArrayList<Schema.Type>();
for (Field f : fields) {
log.info("Key" + f.name());
log.info("Value" + afterValue.get(f));
for (Field field : fields) {
log.info("Key" + field.name());
log.info("Value" + afterValue.get(field));
cols.add(f.name());
values.add(afterValue.get(f));
cols.add(field.name());
values.add(afterValue.get(field));
}
}
//ToDO: Remove the following code after evaluating

View File

@@ -0,0 +1,47 @@
package com.altinity.clickhouse.sink.connector.deduplicator;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
 * Filters out SinkRecords whose key has already been seen recently.
 *
 * <p>Keeps a bounded pool of the most recently seen keys: a map for O(1)
 * duplicate lookup plus a FIFO queue of keys used to evict the oldest
 * entries once the pool exceeds {@code maxPoolSize}.
 *
 * <p>NOTE(review): not thread-safe — assumes callers invoke {@link #isNew}
 * from a single thread (confirm against the SinkTask usage).
 */
public class DeDuplicator {
    private static final Logger log = LoggerFactory.getLogger(DeDuplicator.class);

    /** Default bound on the number of remembered keys. */
    private static final int DEFAULT_MAX_POOL_SIZE = 10;

    // Key -> record, for O(1) "have we seen this key?" lookup.
    private final Map<Object, Object> records;
    // Keys in insertion order; head is the oldest and is evicted first.
    private final LinkedList<Object> queue;
    // Maximum number of keys remembered before eviction kicks in.
    private final int maxPoolSize;

    /** Creates a de-duplicator with the default pool size. */
    public DeDuplicator() {
        this(DEFAULT_MAX_POOL_SIZE);
    }

    /**
     * Creates a de-duplicator with a caller-chosen pool size.
     *
     * @param maxPoolSize maximum number of keys to remember; must be positive
     * @throws IllegalArgumentException if {@code maxPoolSize} is not positive
     */
    public DeDuplicator(int maxPoolSize) {
        if (maxPoolSize <= 0) {
            throw new IllegalArgumentException("maxPoolSize must be positive: " + maxPoolSize);
        }
        this.maxPoolSize = maxPoolSize;
        this.records = new HashMap<Object, Object>();
        this.queue = new LinkedList<Object>();
    }

    /**
     * Reports whether this record's key has not been seen before, and
     * remembers it if so. Evicts the oldest keys once the pool is full.
     *
     * @param record the incoming record; its {@code key()} is the identity
     * @return {@code true} if the key is new, {@code false} if it is a duplicate
     */
    public boolean isNew(SinkRecord record) {
        Object key = record.key();
        if (this.records.containsKey(key)) {
            log.warn("already seen this key: {}", key);
            return false;
        }

        log.debug("add new key to the pool: {}", key);
        this.records.put(key, record);
        this.queue.add(key);

        // Evict oldest entries until we are back within the bound.
        while (this.queue.size() > this.maxPoolSize) {
            log.info("records pool is too big, need to flush: {}", this.queue.size());
            Object oldest = this.queue.removeFirst();
            // removeFirst() yields null only if a null key was stored
            // (HashMap permits null keys), so handle it defensively.
            if (oldest == null) {
                log.warn("unable to removeFirst() in the queue");
            } else {
                this.records.remove(oldest);
                log.info("removed key: {}", oldest);
            }
        }
        return true;
    }
}