Fixed a bug with schema inference when auto-creating tables; write Kafka offset data only when the corresponding config variable is set to true.

Kanthi Subramanian 2022-07-14 09:53:28 -05:00
parent c436a14198
commit 5dfdf7beb4
8 changed files with 106 additions and 28 deletions

View File

@@ -16,10 +16,12 @@ CLICKHOUSE_DATABASE="test"
BUFFER_COUNT=10000
#SERVER5432.transaction
TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.products, SERVER5432.test.t1, SERVER5432.sbtest.sbtest1, SERVER5432.public.Employee"
TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.products, , SERVER5432.test.customers, SERVER5432.test.t1, SERVER5432.sbtest.sbtest1, SERVER5432.public.Employee"
TOPICS_TABLE_MAP="SERVER5432.test.employees_predated:employees, SERVER5432.test.products:products"
#TOPICS="SERVER5432"
#"topics.regex": "SERVER5432.sbtest.(.*), SERVER5432.test.(.*)",
#"topics": "${TOPICS}",
cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
@@ -28,7 +30,7 @@ cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'C
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "10",
"topics.regex": "SERVER5432.sbtest.(.*)",
"topics": "${TOPICS}",
"clickhouse.topic2table.map": "${TOPICS_TABLE_MAP}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
"clickhouse.server.user": "${CLICKHOUSE_USER}",

View File

@@ -99,13 +99,13 @@ PRIMARY KEY intcol1;
--ORDER by(id, k);
-- Metadata tables --
CREATE TABLE topic_offset_metadata(
`_topic` String,
`_partition` UInt64,
`_offset` SimpleAggregateFunction(max, UInt64)
)
ENGINE = AggregatingMergeTree
ORDER BY (_topic, _partition);
--CREATE TABLE topic_offset_metadata(
--`_topic` String,
--`_partition` UInt64,
--`_offset` SimpleAggregateFunction(max, UInt64)
--)
--ENGINE = AggregatingMergeTree
--ORDER BY (_topic, _partition);
-- Postgres tables --
CREATE TABLE Employee(
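For readers of the new topic_offset_metadata table: because _offset is declared as SimpleAggregateFunction(max, UInt64) in an AggregatingMergeTree, the stored value per (_topic, _partition) converges to the maximum offset as parts merge, but a reader should still aggregate explicitly to cover unmerged parts. A minimal JDBC read-back sketch follows; the URL and credentials are placeholders and it assumes a ClickHouse JDBC driver on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReadTopicOffsets {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details.
        String url = "jdbc:clickhouse://localhost:8123/test";
        try (Connection conn = DriverManager.getConnection(url, "root", "root");
             Statement stmt = conn.createStatement();
             // SimpleAggregateFunction(max, ...) in an AggregatingMergeTree keeps the max per
             // sorting key after merges; aggregate explicitly to cover unmerged parts as well.
             ResultSet rs = stmt.executeQuery(
                 "SELECT _topic, _partition, max(_offset) AS _offset " +
                 "FROM topic_offset_metadata GROUP BY _topic, _partition")) {
            while (rs.next()) {
                System.out.printf("%s[%d] -> %d%n",
                        rs.getString("_topic"), rs.getLong("_partition"), rs.getLong("_offset"));
            }
        }
    }
}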

View File

@@ -0,0 +1,55 @@
#!/bin/bash
sysbench \
/usr/share/sysbench/oltp_delete.lua \
--report-interval=2 \
--threads=16 \
--rate=0 \
--time=0 \
--db-driver=mysql \
--mysql-host=127.0.0.1 \
--mysql-port=3306 \
--mysql-user=root \
--mysql-db=sbtest \
--mysql-password=root \
--tables=1 \
--table-size=10000 \
--debug \
--verbosity=5 \
cleanup
sysbench \
/usr/share/sysbench/oltp_delete.lua \
--report-interval=2 \
--threads=16 \
--rate=0 \
--time=0 \
--db-driver=mysql \
--mysql-host=127.0.0.1 \
--mysql-port=3306 \
--mysql-user=root \
--mysql-db=sbtest \
--mysql-password=root \
--tables=1 \
--table-size=10000 \
--debug \
--verbosity=5 \
prepare
sysbench \
/usr/share/sysbench/oltp_delete.lua \
--report-interval=2 \
--threads=16 \
--rate=0 \
--time=0 \
--db-driver=mysql \
--mysql-host=127.0.0.1 \
--mysql-port=3306 \
--mysql-user=root \
--mysql-db=sbtest \
--mysql-password=root \
--tables=1 \
--table-size=10000 \
--debug \
--verbosity=5 \
run

View File

@@ -89,7 +89,7 @@ public class DbWriter extends BaseDbWriter {
log.info("**** AUTO CREATE TABLE" + tableName);
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
try {
act.createNewTable(record.getPrimaryKey(), tableName, record.getAfterModifiedFields().toArray(new Field[0]), this.conn);
act.createNewTable(record.getPrimaryKey(), tableName, record.getAfterStruct().schema().fields().toArray(new Field[0]), this.conn);
this.columnNameToDataTypeMap = this.getColumnsDataTypesForTable(tableName);
response = metadata.getTableEngine(this.conn, database, tableName);
this.engine = response.getLeft();
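The switch from getAfterModifiedFields() to getAfterStruct().schema().fields() matters because the Kafka Connect schema lists every declared column, not only the fields carried by a particular record. The sketch below uses a hand-built schema as a stand-in for a Debezium-style "after" struct to show what fields() yields; it is illustrative only, not project code.

import java.util.List;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SchemaFieldsDemo {
    public static void main(String[] args) {
        // A schema comparable to a Debezium "after" struct: two required columns, one optional.
        Schema afterSchema = SchemaBuilder.struct().name("after")
                .field("customer_id", Schema.INT32_SCHEMA)
                .field("first_name", Schema.STRING_SCHEMA)
                .field("address", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        Struct after = new Struct(afterSchema)
                .put("customer_id", 42)
                .put("first_name", "Jane"); // "address" intentionally left unset

        // schema().fields() returns every declared column, whether or not it was set
        // in this particular record -- which is what auto table creation needs.
        List<Field> fields = after.schema().fields();
        for (Field f : fields) {
            System.out.println(f.name() + " optional=" + f.schema().isOptional());
        }
    }
}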

View File

@@ -21,7 +21,7 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{
public void createNewTable(ArrayList<String> primaryKey, String tableName, Field[] fields, ClickHouseConnection connection) throws SQLException {
Map<String, String> colNameToDataTypeMap = this.getColumnNameToCHDataTypeMapping(fields);
String createTableQuery = this.createTableSyntax(primaryKey, tableName, colNameToDataTypeMap);
String createTableQuery = this.createTableSyntax(primaryKey, tableName, fields, colNameToDataTypeMap);
this.runQuery(createTableQuery, connection);
}
@@ -33,15 +33,31 @@ public class ClickHouseAutoCreateTable extends ClickHouseTableOperationsBase{
* @param columnToDataTypesMap
* @return CREATE TABLE query
*/
public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String tableName, Map<String, String> columnToDataTypesMap) {
public java.lang.String createTableSyntax(ArrayList<String> primaryKey, String tableName, Field[] fields, Map<String, String> columnToDataTypesMap) {
StringBuilder createTableSyntax = new StringBuilder();
createTableSyntax.append("CREATE TABLE").append(" ").append(tableName).append("(");
for(Map.Entry<String, String> entry: columnToDataTypesMap.entrySet()) {
createTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(",");
for(Field f: fields) {
String colName = f.name();
String dataType = columnToDataTypesMap.get(colName);
boolean isNull = false;
if(f.schema().isOptional() == true) {
isNull = true;
}
createTableSyntax.append("`").append(colName).append("`").append(" ").append(dataType);
if(isNull) {
createTableSyntax.append(" NULL");
} else {
createTableSyntax.append(" NOT NULL");
}
createTableSyntax.append(",");
}
// for(Map.Entry<String, String> entry: columnToDataTypesMap.entrySet()) {
// createTableSyntax.append("`").append(entry.getKey()).append("`").append(" ").append(entry.getValue()).append(",");
// }
//createTableSyntax.deleteCharAt(createTableSyntax.lastIndexOf(","));
// Append sign and version columns
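The loop above maps Kafka Connect optionality onto ClickHouse nullability: optional fields become NULL columns, required fields become NOT NULL. Below is a self-contained sketch of that column-definition step; the class and method names are hypothetical.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ColumnDdlSketch {
    // Emit NULL for optional Kafka Connect fields and NOT NULL otherwise;
    // data types come from a precomputed column-name -> ClickHouse-type map.
    static String columnDefinitions(Iterable<Field> fields, Map<String, String> typeMap) {
        StringBuilder cols = new StringBuilder();
        for (Field f : fields) {
            if (cols.length() > 0) {
                cols.append(",");
            }
            cols.append("`").append(f.name()).append("` ")
                .append(typeMap.get(f.name()))
                .append(f.schema().isOptional() ? " NULL" : " NOT NULL");
        }
        return cols.toString();
    }

    public static void main(String[] args) {
        Schema s = SchemaBuilder.struct()
                .field("customer_id", Schema.INT32_SCHEMA)
                .field("address", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Map<String, String> types = new LinkedHashMap<>();
        types.put("customer_id", "Int32");
        types.put("address", "String");
        // Prints: `customer_id` Int32 NOT NULL,`address` String NULL
        System.out.println(columnDefinitions(s.fields(), types));
    }
}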

View File

@@ -86,11 +86,14 @@ public class ClickHouseBatchRunnable implements Runnable {
synchronized (this.records) {
partitionToOffsetMap = writer.insert(entry.getValue());
}
DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(dbHostName, port, database, "topic_offset_metadata", userName, password, this.config);
try {
dbKafkaOffsetWriter.insertTopicOffsetMetadata(partitionToOffsetMap);
} catch (SQLException e) {
log.error("Error persisting offsets to CH", e);
if(this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET)) {
log.info("***** KAFKA OFFSET MANAGEMENT ENABLED *****");
DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(dbHostName, port, database, "topic_offset_metadata", userName, password, this.config);
try {
dbKafkaOffsetWriter.insertTopicOffsetMetadata(partitionToOffsetMap);
} catch (SQLException e) {
log.error("Error persisting offsets to CH", e);
}
}
context.stop();
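The offset writer is now gated on a boolean flag read via getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET). The sketch below shows how such a flag is typically declared with Kafka's ConfigDef; the property key "enable.kafka.offset", the default of false, and the class name are assumptions, since the real constant lives in ClickHouseSinkConnectorConfigVariables.

import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class OffsetFlagConfigSketch extends AbstractConfig {
    // Hypothetical key name; the real constant lives in ClickHouseSinkConnectorConfigVariables.
    public static final String ENABLE_KAFKA_OFFSET = "enable.kafka.offset";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(ENABLE_KAFKA_OFFSET,
                    ConfigDef.Type.BOOLEAN,
                    false, // defaulting to off keeps offset persistence opt-in
                    ConfigDef.Importance.LOW,
                    "Store topic/partition/offset metadata in ClickHouse when true.");

    public OffsetFlagConfigSketch(Map<String, String> originals) {
        super(CONFIG_DEF, originals);
    }

    public static void main(String[] args) {
        OffsetFlagConfigSketch config =
                new OffsetFlagConfigSketch(Map.of(ENABLE_KAFKA_OFFSET, "true"));
        if (config.getBoolean(ENABLE_KAFKA_OFFSET)) {
            System.out.println("Kafka offset management enabled");
        }
    }
}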

View File

@@ -25,12 +25,12 @@ public class ClickHouseAutoCreateTableTest {
ClickHouseConnection conn;
@Before
public void initialize() throws SQLException {
this.columnToDataTypesMap = new HashMap<>();
this.columnToDataTypesMap = getExpectedColumnToDataTypesMap();
this.columnToDataTypesMap.put("customer_id", "Int32");
this.columnToDataTypesMap.put("address", "String");
this.columnToDataTypesMap.put("first_name", "String");
this.columnToDataTypesMap.put("amount", "Int32");
// this.columnToDataTypesMap.put("customer_id", "Int32");
// this.columnToDataTypesMap.put("address", "String");
// this.columnToDataTypesMap.put("first_name", "String");
// this.columnToDataTypesMap.put("amount", "Int32");
String hostName = "localhost";
Integer port = 8123;
@@ -100,11 +100,11 @@ public class ClickHouseAutoCreateTableTest {
@Test
public void testCreateTableSyntax() {
ArrayList<String> primaryKeys = new ArrayList<>();
primaryKeys.add("customer_id");
primaryKeys.add("customerName");
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
String query = act.createTableSyntax(primaryKeys, "auto_create_table", this.columnToDataTypesMap);
String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
String expectedQuery = "CREATE TABLE auto_create_table(`amount` Int32,`address` String,`customer_id` Int32,`first_name` String,`sign` Int8,`ver` UInt64) ENGINE = ReplacingMergeTree(ver) PRIMARY KEY(customer_id) ORDER BY(customer_id)";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
@@ -118,7 +118,7 @@ public class ClickHouseAutoCreateTableTest {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
String query = act.createTableSyntax(primaryKeys, "auto_create_table", this.columnToDataTypesMap);
String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
String expectedQuery = "CREATE TABLE auto_create_table(`amount` Int32,`address` String,`customer_id` Int32,`first_name` String,`sign` Int8,`ver` UInt64) ENGINE = ReplacingMergeTree(ver) PRIMARY KEY(customer_id,customer_name) ORDER BY(customer_id,customer_name)";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
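Both tests now pass a createFields() helper into createTableSyntax(). That helper is not shown in this diff; the snippet below is a hypothetical reconstruction whose only requirement is that the field names line up with the keys of getExpectedColumnToDataTypesMap() used in initialize().

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class CreateFieldsSketch {
    // Hypothetical reconstruction of the createFields() test helper; the real one may differ,
    // but it must produce Field objects whose names match the column-to-type map in the test.
    static Field[] createFields() {
        Schema schema = SchemaBuilder.struct()
                .field("customer_id", Schema.INT32_SCHEMA)
                .field("address", Schema.STRING_SCHEMA)
                .field("first_name", Schema.STRING_SCHEMA)
                .field("amount", Schema.INT32_SCHEMA)
                .build();
        return schema.fields().toArray(new Field[0]);
    }

    public static void main(String[] args) {
        for (Field f : createFields()) {
            System.out.println(f.name() + " -> " + f.schema().type());
        }
    }
}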

View File

@@ -1,3 +1,5 @@
clickhouse-driver==0.2.3
mysql-connector-python
faker==13.3.5
testcontainers
testcontainers[clickhouse]