Fixed convertRecord() so that SinkRecord values are converted to Struct objects. Fixed the Debezium and sink connector configuration variables.

Kanthi Subramanian 2022-04-06 09:52:58 -04:00
parent c979f93b8e
commit 740071a842
6 changed files with 28 additions and 9 deletions
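For context, the convertRecord() fix follows the pattern visible in the ClickHouseConverter diff below: with JsonConverter and schemas enabled, the Kafka Connect SinkRecord value arrives as a Struct (the Debezium change-event envelope), the "after" sub-Struct is extracted, and its fields are flattened into parallel column/value lists. A minimal sketch of that pattern; the class and method names here are hypothetical simplifications, not the project's actual API:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class AfterImageSketch {
    // Flatten the Debezium "after" image of a SinkRecord into
    // column-name and value lists, as the converter diff below does.
    static void extractAfter(SinkRecord record) {
        // With JsonConverter and schemas enabled, value() is a Struct envelope.
        Struct envelope = (Struct) record.value();
        Struct afterRecord = (Struct) envelope.get("after");
        if (afterRecord == null) {
            return; // e.g. a delete event carries no "after" image
        }
        List<String> cols = new ArrayList<String>();
        List<Object> values = new ArrayList<Object>();
        for (Field field : afterRecord.schema().fields()) {
            cols.add(field.name());
            values.add(afterRecord.get(field));
        }
        // cols/values would then feed the ClickHouse insert statement.
    }
}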

View File

@@ -14,7 +14,7 @@ MYSQL_PASSWORD="root"
# Comma-separated list of regular expressions that match the databases for which to capture changes
MYSQL_DBS="test"
# Comma-separated list of regular expressions that match fully-qualified table identifiers of tables
-MYSQL_TABLES=""
+MYSQL_TABLES="employees"
#KAFKA_BOOTSTRAP_SERVERS="one-node-cluster-0.one-node-cluster.redpanda.svc.cluster.local:9092"
KAFKA_BOOTSTRAP_SERVERS="kafka:9092"
KAFKA_TOPIC="schema-changes.test_db"
@@ -26,21 +26,29 @@ DATABASE_SERVER_ID="5432"
# Alphanumeric characters, hyphens, dots and underscores only.
DATABASE_SERVER_NAME="SERVER5432"
#"database.include.list": "${MYSQL_DBS}",
#"table.include.list": "${MYSQL_TABLES}",
cat <<EOF | curl --request POST --url "${CONNECT_URL}" --header 'Content-Type: application/json' --data @-
{
"name": "${CONNECTOR_NAME}",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
"snapshot.delay.ms": 10000,
"include.schema.changes":"true",
"database.hostname": "${MYSQL_HOST}",
"database.port": "${MYSQL_PORT}",
"database.user": "${MYSQL_USER}",
"database.password": "${MYSQL_PASSWORD}",
"database.server.id": "${DATABASE_SERVER_ID}",
"database.server.name": "${DATABASE_SERVER_NAME}",
"database.include.list": "${MYSQL_DBS}",
"table.include.list": "${MYSQL_TABLES}",
"database.whitelist": "${MYSQL_DBS}",
"database.history.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
"database.history.kafka.topic": "${KAFKA_TOPIC}"
}

View File

@@ -0,0 +1,4 @@
+#!/bin/sh
+echo "Deleting Source Connector"
+curl -X DELETE -H "Accept:application/json" localhost:8083/connectors/test-connector 2>/dev/null | jq .

View File

@@ -27,6 +27,8 @@ services:
- MYSQL_DATABASE=test
volumes:
- ../sql/init_mysql.sql:/docker-entrypoint-initdb.d/init_mysql.sql
+security_opt:
+- seccomp:unconfined
#zookeeper:
# container_name: zookeeper
# image: zookeeper

View File

@@ -28,7 +28,9 @@ cat <<EOF | curl --request POST --url "${CONNECT_URL}" --header 'Content-Type: application/json' --data @-
"clickhouse.server.pass": "${CLICKHOUSE_PASSWORD}",
"clickhouse.server.database": "${CLICKHOUSE_DATABASE}",
"clickhouse.server.port": ${CLICKHOUSE_PORT},
"clickhouse.table.name": "${CLICKHOUSE_TABLE}"
"clickhouse.table.name": "${CLICKHOUSE_TABLE}",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
EOF

View File

@@ -93,6 +93,9 @@ public class ClickHouseSinkTask extends SinkTask {
@Override
public void stop() {
log.info("stop({})", this.id);
+if(this.executor != null) {
+this.executor.shutdown();
+}
}
@Override
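The added guard only calls shutdown(), which stops the executor from accepting new work but does not wait for tasks already in flight. A hedged variant (hypothetical, not part of this commit) that also drains in-flight batches before forcing termination, using only standard java.util.concurrent calls:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {
    static void stopQuietly(ExecutorService executor) {
        if (executor == null) {
            return;
        }
        executor.shutdown(); // stop accepting new work
        try {
            // Give in-flight batches a bounded window to finish.
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // force-cancel stragglers
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }
}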

View File

@@ -173,8 +173,8 @@ public class ClickHouseConverter implements AbstractConverter {
// Check "after" value represented by this record.
if (convertedValue.containsKey("after")) {
-Struct afterValue = (Struct) convertedValue.get("after");
-List<Field> fields = afterValue.schema().fields();
+afterRecord = (Struct) convertedValue.get("after");
+List<Field> fields = afterRecord.schema().fields();
List<String> cols = new ArrayList<String>();
List<Object> values = new ArrayList<Object>();
@@ -182,10 +182,10 @@ public class ClickHouseConverter implements AbstractConverter {
for (Field field : fields) {
log.info("Key" + field.name());
log.info("Value" + afterValue.get(field));
log.info("Value" + afterRecord.get(field));
cols.add(field.name());
-values.add(afterValue.get(field));
+values.add(afterRecord.get(field));
}
}