Added logic to extract primary key from Kafka connect key, used for alter table.

This commit is contained in:
Kanthi Subramanian 2022-07-07 22:18:26 -04:00
parent 28fc66213d
commit 6c1f24524b
2 changed files with 33 additions and 4 deletions

View File

@ -4,6 +4,7 @@ import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,6 +39,10 @@ public class ClickHouseStruct {
@Setter
private String key;
@Getter
@Setter
private String primaryKey;
@Getter
@Setter
private long ts_ms;
@ -106,6 +111,16 @@ public class ClickHouseStruct {
this.timestamp = timestamp;
if(key != null) {
this.key = key.toString();
Schema pkSchema = key.schema();
if(pkSchema != null) {
List<Field> fields = pkSchema.fields();
if(fields != null && fields.isEmpty() == false) {
// ToDO: can the Key be more than one field.
if(fields.get(0) != null) {
this.primaryKey = fields.get(0).name();
}
}
}
}
setBeforeStruct(beforeStruct);
setAfterStruct(afterStruct);

View File

@ -159,7 +159,7 @@ class MyTestCase(unittest.TestCase):
conn = MySqlConnection()
conn.create_connection()
#conn.execute_sql("alter table products add column source json");
conn.execute_sql("alter table products add column source json");
conn.execute_sql("insert into products(source, productCode, productName, productLine, productScale,productVendor, "
"productDescription, quantityInStock, buyPrice, MSRP) "
@ -171,13 +171,27 @@ class MyTestCase(unittest.TestCase):
# clickhouse_conn.execute_sql('alter table products add column source String')
# #conn.execute_sql("")
def test_multiple_tables(self):
def test_alter_table(self):
conn = MySqlConnection()
conn.create_connection()
conn.execute_sql("alter table products add column emp_user_id varchar(15)");
conn.close()
conn.create_connection()
conn.execute_sql("insert into products(emp_user_id, productCode, productName, productLine, productScale,productVendor, "
"productDescription, quantityInStock, buyPrice, MSRP) "
"values('11', 'S10_122225', 'TEST', 'TEST', '1:10', 'TEST', 'TEST', 100, 1.0, 1.50)")
conn.close()
def test_multiple_tables(self):
#self.generate_employees_records_with_datetime()
#self.generate_employees_fake_records()
#self.generate_products_fake_records()
#self.generate_products_fake_records()
self.generate_update_records()
self.test_json_data_type()
#self.generate_update_records()
#self.test_json_data_type()
self.test_alter_table()
#self.generate_delete_records()
#self.test_duplicate_inserts()