Changed offset management from MV to AggregatingMergeTree

This commit is contained in:
Kanthi Subramanian 2022-06-15 21:36:19 -04:00
parent 4f31af4fe3
commit 6cdd5a957e
6 changed files with 16 additions and 20 deletions

View File

@@ -21,5 +21,4 @@ The connector is tested with the following converters
 - [Local Setup - Docker Compose](doc/setup.md)
 - [Kubernetes Setup](doc/k8s_pipeline_setup.md)
 - [Sink Configuration](doc/sink_configuration.md)
-- [Testing](doc/TESTING.md)
-- [Glossary](doc/glossary.md)
+- [Testing](doc/TESTING.md)

View File

@@ -97,19 +97,7 @@ PRIMARY KEY id;
 CREATE TABLE topic_offset_metadata(
 `_topic` String,
 `_partition` UInt64,
-`_offset` UInt64
+`_offset` SimpleAggregateFunction(max, UInt64)
 )
-ENGINE = MergeTree
-PRIMARY KEY _topic;
-CREATE TABLE topic_offset_metadata_view
-(
-`_topic` String,
-`_partition` UInt64,
-`_offset` UInt64
-) Engine=MergeTree
-PRIMARY KEY _topic;
-CREATE MATERIALIZED VIEW mv_topic_offset_metadata_view to topic_offset_metadata_view
-AS select _topic, _partition, MAX(_offset) as _offset from topic_offset_metadata tom group by _topic, _partition;
+ENGINE = AggregatingMergeTree
+ORDER BY (_topic, _partition);

View File

@@ -275,7 +275,7 @@ public class ClickHouseSinkConnectorConfig extends AbstractConfig {
 .define(
 ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET,
 Type.BOOLEAN,
-true,
+false,
 Importance.HIGH,
 "If enabled, topic offsets are stored in CH, if false topic offsets are managed in kafka topics",
 CONFIG_GROUP_CONNECTOR_CONFIG,

View File

@@ -151,6 +151,14 @@ public class ClickHouseSinkTask extends SinkTask {
 return committedOffsets;
 }
+/**
+@Override
+public void flush(Map&lt;TopicPartition, OffsetAndMetadata&gt; currentOffsets) {
+// No-op. The connector is managing the offsets.
+if(!this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET)) {
+return currentOffsets;
+}
+}**/
 @Override
 public String version() {

View File

@@ -77,7 +77,7 @@ public class DbKafkaOffsetWriter {
 Map&lt;TopicPartition, Long&gt; result = new HashMap&lt;TopicPartition, Long&gt;();
 Statement stmt = this.writer.get().getConnection().createStatement();
-ResultSet rs = stmt.executeQuery("select * from mv_topic_offset_metadata_view");
+ResultSet rs = stmt.executeQuery("select * from topic_offset_metadata");
 while (rs.next()) {
 String topicName = rs.getString(KafkaMetaData.TOPIC.getColumn());
View File

@@ -194,7 +194,8 @@ public class DbWriterTest {
 Map&lt;String, List&lt;ClickHouseStruct&gt;&gt; queryToRecordsMap = new HashMap&lt;String, List&lt;ClickHouseStruct&gt;&gt;();
-Map&lt;TopicPartition, Long&gt; result = dbWriter.groupQueryWithRecords(getSampleRecords(), queryToRecordsMap);
+Map&lt;TopicPartition, Long&gt; result = dbWriter.groupQueryWithRecords(getSampleRecords()
+, queryToRecordsMap);
 Assert.assertTrue(result.isEmpty() == false);