diff --git a/README.md b/README.md index 25a2f37..fc606e8 100644 --- a/README.md +++ b/README.md @@ -21,5 +21,4 @@ The connector is tested with the following converters - [Local Setup - Docker Compose](doc/setup.md) - [Kubernetes Setup](doc/k8s_pipeline_setup.md) - [Sink Configuration](doc/sink_configuration.md) -- [Testing](doc/TESTING.md) -- [Glossary](doc/glossary.md) +- [Testing](doc/TESTING.md) \ No newline at end of file diff --git a/deploy/sql/init_clickhouse.sql b/deploy/sql/init_clickhouse.sql index 5b4dde6..c2d7620 100644 --- a/deploy/sql/init_clickhouse.sql +++ b/deploy/sql/init_clickhouse.sql @@ -97,19 +97,7 @@ PRIMARY KEY id; CREATE TABLE topic_offset_metadata( `_topic` String, `_partition` UInt64, -`_offset` UInt64 +`_offset` SimpleAggregateFunction(max, UInt64) ) -ENGINE = MergeTree -PRIMARY KEY _topic; - -CREATE TABLE topic_offset_metadata_view -( -`_topic` String, -`_partition` UInt64, -`_offset` UInt64 -) Engine=MergeTree -PRIMARY KEY _topic; - - -CREATE MATERIALIZED VIEW mv_topic_offset_metadata_view to topic_offset_metadata_view -AS select _topic, _partition, MAX(_offset) as _offset from topic_offset_metadata tom group by _topic, _partition; \ No newline at end of file +ENGINE = AggregatingMergeTree +ORDER BY (_topic, _partition); \ No newline at end of file diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index d398aac..668a0f8 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -275,7 +275,7 @@ public class ClickHouseSinkConnectorConfig extends AbstractConfig { .define( ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET, Type.BOOLEAN, - true, + false, Importance.HIGH, "If enabled, topic offsets are stored in CH, if false topic offsets are managed in 
kafka topics", CONFIG_GROUP_CONNECTOR_CONFIG, diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java index 3d889e9..0e2ae41 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkTask.java @@ -151,6 +151,14 @@ public class ClickHouseSinkTask extends SinkTask { return committedOffsets; } +/** + @Override + public void flush(Map currentOffsets) { + // No-op. The connector is managing the offsets. + if(!this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.ENABLE_KAFKA_OFFSET)) { + return; + } + }**/ @Override public String version() { diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java index cc9b48e..6c4f108 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/db/DbKafkaOffsetWriter.java @@ -77,7 +77,7 @@ public class DbKafkaOffsetWriter { Map result = new HashMap(); Statement stmt = this.writer.get().getConnection().createStatement(); - ResultSet rs = stmt.executeQuery("select * from mv_topic_offset_metadata_view"); + ResultSet rs = stmt.executeQuery("select * from topic_offset_metadata"); while (rs.next()) { String topicName = rs.getString(KafkaMetaData.TOPIC.getColumn()); diff --git a/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java b/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java index fd35f0d..3a1af90 100644 --- a/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java +++ b/src/test/com/altinity/clickhouse/sink/connector/db/DbWriterTest.java @@ -194,7 +194,8 @@ public class DbWriterTest { Map> queryToRecordsMap = new HashMap>(); - Map result = 
dbWriter.groupQueryWithRecords(getSampleRecords(), queryToRecordsMap); + Map result = dbWriter.groupQueryWithRecords(getSampleRecords() + , queryToRecordsMap); Assert.assertTrue(result.isEmpty() == false);