diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java index 089785c..47ee8e7 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/converters/ClickHouseConverter.java @@ -267,14 +267,8 @@ public class ClickHouseConverter implements AbstractConverter { ClickHouseStruct chStruct = null; if (convertedValue.containsKey(sectionKey)) { Object beforeSection = convertedValue.get(SinkRecordColumns.BEFORE); - if(beforeSection != null) { - chStruct.setBeforeStruct((Struct) beforeSection); - } - Object afterSection = convertedValue.get(SinkRecordColumns.AFTER); - if(afterSection != null) { - chStruct.setAfterStruct((Struct) afterSection); - } + chStruct = new ClickHouseStruct(record.kafkaOffset(), record.topic(), (Struct) record.key(), record.kafkaPartition(), record.timestamp(), (Struct) beforeSection, (Struct) afterSection, diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 924b343..b61bbe4 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -4,6 +4,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.common.Metrics; import com.altinity.clickhouse.sink.connector.common.Utils; +import com.altinity.clickhouse.sink.connector.db.DbKafkaOffsetWriter; import com.altinity.clickhouse.sink.connector.db.DbWriter; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.codahale.metrics.Timer; @@ -11,6 +12,8 @@ import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.ref.WeakReference; +import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -81,6 +84,13 @@ public class ClickHouseBatchRunnable implements Runnable { DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, this.config); Map partitionToOffsetMap = writer.insert(entry.getValue()); + DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(new WeakReference<>(writer), + "topic_offset_metadata"); + try { + dbKafkaOffsetWriter.insertTopicOffsetMetadata(partitionToOffsetMap); + } catch (SQLException e) { + log.error("Error persisting offsets to CH", e); + } context.stop(); // Metrics.updateSinkRecordsCounter(blockUuid.toString(), taskId, topicName, tableName,