clickhouse-sink-connector/doc/mutable_data.md
2022-08-02 17:41:38 -04:00

5.5 KiB

CollapsingMergeTree

Sink Connector will attempt to read the engine_full column from system.tables for the corresponding table and will identify the engine and the sign column.

CollapsingMergeTree(sign) PRIMARY KEY productCode ORDER BY productCode SETTINGS index_granularity = 8192

Updates:

For inserts, record will be inserted with sign set to 1 For updates, before value will be inserted with sign set to -1 and after value will be inserted with sign set to 1

\

When optimize table <table_name> final of select .. final is performed and when the merges are performed by ClickHouse in the background, the initial insert record will be merged along the before record.

Updates on non primary keys: Debezium

Non Primary key updates create a record with operation as 'u'

SinkRecord{kafkaOffset=62984, timestampType=CreateTime} ConnectRecord{topic='SERVER5432.sbtest.sbtest1', kafkaPartition=0, key=Struct{id=2317,k=3739}, keySchema=Schema{SERVER5432.sbtest.sbtest1.Key:STRUCT}, value=Struct{before=Struct{id=2317,k=3739,c=20488251985-66135155553-00362235007-72249840112-70784105787-84584360668-65106023418-49140058226-99031281108-48426083028,pad=18846546959-44726413785-66695616247-63594911107-83062207348},after=Struct{id=2317,k=3739,c=20488251985-66135155553-00362235007-72249840112-70784105787-84584360668-65106023418-49140058226-99031281108-48426083029,pad=18846546959-44726413785-66695616247-63594911107-83062207348},source=Struct{version=1.9.2.Final,connector=mysql,name=SERVER5432,ts_ms=1657658606000,snapshot=false,db=sbtest,table=sbtest1,server_id=842,file=mysql-bin.000003,pos=16210729,row=0,thread=22},op=u,ts_ms=1657658606611,transaction=Struct{id=file=mysql-bin.000003,pos=16210580,total_order=1,data_collection_order=1}}, valueSchema=Schema{SERVER5432.sbtest.sbtest1.Envelope:STRUCT}, timestamp=1657658607050, headers=ConnectHeaders(headers=)}

Updates on Primary Key: Debezium

Debezium handles updates on Primary key in the same way as Primary Key changes.

Table Schema:

use sbtest;
CREATE TABLE `sbtest1` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`k` int(11) NOT NULL DEFAULT '0',
`c` char(120) NOT NULL DEFAULT '',
`pad` char(60) NOT NULL DEFAULT '',
PRIMARY KEY (`id`,`k`)
)
PARTITION BY RANGE (k) (
PARTITION p1 VALUES LESS THAN (499999),
PARTITION p2 VALUES LESS THAN MAXVALUE
);

The following update statement in MySQL, will create 3 Debezium records

update sbtest.sbtest1 set k=k+1 where id=2317

Record 1: A Delete record with the old values with key== __debezium.newkey

SinkRecord{kafkaOffset=62978, timestampType=CreateTime} ConnectRecord{topic='SERVER5432.sbtest.sbtest1', kafkaPartition=0, key=Struct{id=2317,k=3737},
 keySchema=Schema{SERVER5432.sbtest.sbtest1.Key:STRUCT}, 
 value=Struct{before=Struct{id=2317,k=3737,c=20488251985-66135155553-00362235007-72249840112-70784105787-84584360668-65106023418-49140058226-99031281108-48426083028,
 pad=18846546959-44726413785-66695616247-63594911107-83062207348},source=Struct{version=1.9.2.Final,connector=mysql,name=SERVER5432,ts_ms=1657655632000,
 snapshot=false,db=sbtest,table=sbtest1,server_id=842,file=mysql-bin.000003,pos=16209369,row=0,thread=172},
 op=d,ts_ms=1657655632066,transaction=Struct{id=file=mysql-bin.000003,pos=16209220,total_order=1,data_collection_order=1}}, 
 valueSchema=Schema{SERVER5432.sbtest.sbtest1.Envelope:STRUCT}, timestamp=1657655632487, 
 headers=ConnectHeaders(headers=[ConnectHeader(key=__debezium.newkey, value={id=2317, k=3738}, schema=Schema{MAP})])}

Record 2: No Operation: key= __debezium.newkey

SinkRecord{kafkaOffset=62979, timestampType=CreateTime} ConnectRecord{topic='SERVER5432.sbtest.sbtest1', kafkaPartition=0, 
key=Struct{id=2317,k=3737}, keySchema=Schema{SERVER5432.sbtest.sbtest1.Key:STRUCT}, value=null, valueSchema=null, 
timestamp=1657655632487, headers=ConnectHeaders(headers=[ConnectHeader(key=__debezium.newkey, value={id=2317, k=3738}, schema=Schema{MAP})])}

Record 3: A create record with new values

SinkRecord{kafkaOffset=62980, timestampType=CreateTime} ConnectRecord{topic='SERVER5432.sbtest.sbtest1', kafkaPartition=0, 
key=Struct{id=2317,k=3738}, keySchema=Schema{SERVER5432.sbtest.sbtest1.Key:STRUCT}, value=Struct{after=Struct{id=2317,k=3738,
c=20488251985-66135155553-00362235007-72249840112-70784105787-84584360668-65106023418-49140058226-99031281108-48426083028,
pad=18846546959-44726413785-66695616247-63594911107-83062207348},source=Struct{version=1.9.2.Final,connector=mysql,name=SERVER5432,ts_ms=1657655632000,
snapshot=false,db=sbtest,table=sbtest1,server_id=842,file=mysql-bin.000003,pos=16209369,row=0,thread=172},op=c,ts_ms=1657655632066,
transaction=Struct{id=file=mysql-bin.000003,pos=16209220,total_order=2,data_collection_order=2}}, valueSchema=Schema{SERVER5432.sbtest.sbtest1.Envelope:STRUCT}, 
timestamp=1657655632487, headers=ConnectHeaders(headers=[ConnectHeader(key=__debezium.oldkey, value={id=2317, k=3737}, schema=Schema{MAP})])}

Current behavior

The sink connector performs a delete first of the old values and an insert with the new values, Record 2 is not handled currently.

Deletes:

For deletes, record will be inserted with sign set to -1

ReplacingMergeTree

For updates, only the after record will be inserted with version set to timestamp in milliseconds. After merging, ClickHouse will drop the previous insert since the update version column value is greater than the insert record version value.

For Deletes, the user provided replacingmergetree.delete.column will be set to -1

Other table Engines(MergeTree ...)