Fixed NullPointerException in ClickHouseConverter: setBeforeStruct/setAfterStruct were called on a ClickHouseStruct reference that was still null; the struct is now constructed directly from the before/after sections.

This commit is contained in:
Kanthi Subramanian 2022-06-14 10:29:28 -04:00
parent 89da1a391d
commit de7f50e14f
2 changed files with 11 additions and 7 deletions

View File

@ -267,14 +267,8 @@ public class ClickHouseConverter implements AbstractConverter {
ClickHouseStruct chStruct = null;
if (convertedValue.containsKey(sectionKey)) {
Object beforeSection = convertedValue.get(SinkRecordColumns.BEFORE);
if(beforeSection != null) {
chStruct.setBeforeStruct((Struct) beforeSection);
}
Object afterSection = convertedValue.get(SinkRecordColumns.AFTER);
if(afterSection != null) {
chStruct.setAfterStruct((Struct) afterSection);
}
chStruct = new ClickHouseStruct(record.kafkaOffset(),
record.topic(), (Struct) record.key(), record.kafkaPartition(),
record.timestamp(), (Struct) beforeSection, (Struct) afterSection,

View File

@ -4,6 +4,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.common.Metrics;
import com.altinity.clickhouse.sink.connector.common.Utils;
import com.altinity.clickhouse.sink.connector.db.DbKafkaOffsetWriter;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.codahale.metrics.Timer;
@ -11,6 +12,8 @@ import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.ref.WeakReference;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@ -81,6 +84,13 @@ public class ClickHouseBatchRunnable implements Runnable {
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password, this.config);
Map<TopicPartition, Long> partitionToOffsetMap = writer.insert(entry.getValue());
DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(new WeakReference<>(writer),
"topic_offset_metadata");
try {
dbKafkaOffsetWriter.insertTopicOffsetMetadata(partitionToOffsetMap);
} catch (SQLException e) {
log.error("Error persisting offsets to CH", e);
}
context.stop();
// Metrics.updateSinkRecordsCounter(blockUuid.toString(), taskId, topicName, tableName,