Fixed DBKafkaOffsetWriterTest using TestContainers. Added logic to create kafka offset table.

This commit is contained in:
Kanthi Subramanian 2022-11-28 14:17:38 -05:00
parent 114a99b5a6
commit b6f3aba78d
6 changed files with 112 additions and 57 deletions

30
pom.xml
View File

@ -147,7 +147,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
<configuration>
<excludedGroups>IntegrationTest</excludedGroups>
<!-- <excludedGroups>IntegrationTest</excludedGroups> -->
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
@ -521,5 +521,33 @@
<version>3.1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
</dependencies>
<profiles>
<profile>
<id>allTests</id>
<properties>
<groups>IntegrationTest</groups>
</properties>
</profile>
</profiles>
</project>

View File

@ -22,4 +22,6 @@ public class ClickHouseDbConstants {
public static final String ORDER_BY = "ORDER BY";
public static final String ORDER_BY_TUPLE = "ORDER BY tuple()";
public static final String OFFSET_TABLE_CREATE_SQL = "CREATE TABLE topic_offset_metadata(`_topic` String, `_partition` UInt64,`_offset` SimpleAggregateFunction(max, UInt64))ENGINE = AggregatingMergeTree ORDER BY (_topic, _partition)";
}

View File

@ -33,11 +33,24 @@ public class DbKafkaOffsetWriter extends BaseDbWriter {
super(hostName, port, database, userName, password, config);
createOffsetTable();
this.columnNamesToDataTypesMap = this.getColumnsDataTypesForTable(tableName);
this.query = new QueryFormatter().getInsertQueryUsingInputFunction(tableName, columnNamesToDataTypesMap);
}
/**
* Function to create kafka offset table.
*/
public void createOffsetTable() {
try {
PreparedStatement ps = this.getConnection().prepareStatement(ClickHouseDbConstants.OFFSET_TABLE_CREATE_SQL);
ps.execute();
} catch(SQLException se) {
log.error("Error creating Kafka offset table");
}
}
/**
* @param topicPartitionToOffsetMap

View File

@ -159,10 +159,22 @@ public class QueryFormatter {
colNamesToDataTypes.append(entry.getKey()).append(" ").append(entry.getValue()).append(",");
}
//Remove terminating comma
colNamesDelimited.deleteCharAt(colNamesDelimited.lastIndexOf(","));
colNamesToDataTypes.deleteCharAt(colNamesToDataTypes.lastIndexOf(","));
if(colNamesDelimited.length() != 0) {
//Remove terminating comma
int indexOfComma = colNamesDelimited.lastIndexOf(",");
if (indexOfComma != -1) {
colNamesDelimited.deleteCharAt(indexOfComma);
}
}
if(colNamesToDataTypes.length() != 0) {
int indexOfComma = colNamesToDataTypes.lastIndexOf(",");
if(indexOfComma != -1) {
colNamesToDataTypes.deleteCharAt(indexOfComma);
}
}
return String.format("insert into %s select %s from input('%s')", tableName, colNamesDelimited, colNamesToDataTypes);
}
}

View File

@ -9,23 +9,33 @@ import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Testcontainers
public class DbKafkaOffsetWriterTest {
@Container
private ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.withInitScript("./init_clickhouse.sql");
@Test
@Tag("IntegrationTest")
public void testInsertTopicOffsetMetadata() throws SQLException {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
String database = "default";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
@ -38,26 +48,6 @@ public class DbKafkaOffsetWriterTest {
Map<TopicPartition, Long> result = writer.groupQueryWithRecords(com.clickhouse.sink.connector.db.DbWriterTest.getSampleRecords(), queryToRecordsMap);
dbKafkaOffsetWriter.insertTopicOffsetMetadata(result);
}
@Test
@Tag("IntegrationTest")
public void testGetStoredOffsets() throws SQLException {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(dbHostName, port, database, "topic_offset_metadata", userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()));
Map<TopicPartition, Long> offsetsMap = dbKafkaOffsetWriter.getStoredOffsets();
Assert.assertTrue(offsetsMap.isEmpty() == false);

View File

@ -17,23 +17,36 @@ import org.junit.Assert;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@Testcontainers
public class DbWriterTest {
static DbWriter writer;
// will be started before and stopped after each test method
@Container
private ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.withInitScript("./init_clickhouse.sql");
//.withDatabaseName("test")
// .withUsername("test")
// .withPassword("test")
// .withExposedPorts(8123);
@BeforeAll
public static void init() {
String hostName = "remoteClickHouse";
Integer port = 8000;
String database = "employees";
String userName = "test";
String password = "test";
Integer port = 8123;
String database = "default";
String userName = "default";
String password = "";
String tableName = "employees";
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
@ -81,34 +94,35 @@ public class DbWriterTest {
@Tag("IntegrationTest")
public void testGetColumnsDataTypesForTable() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
String database = "default";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
writer.getColumnsDataTypesForTable("employees");
Map<String, String> columnDataTypesMap = writer.getColumnsDataTypesForTable("employees");
Assert.assertTrue(columnDataTypesMap.isEmpty() == false);
}
@Test
@Tag("IntegrationTest")
public void testGetEngineType() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
String database = "default";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
MutablePair<DBMetadata.TABLE_ENGINE, String> result = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "employees");
Assert.assertTrue(result.getLeft() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE);
Assert.assertTrue(result.getRight().equalsIgnoreCase("ver"));
Assert.assertTrue(result.getRight().equalsIgnoreCase("_version"));
MutablePair<DBMetadata.TABLE_ENGINE, String> resultProducts = new DBMetadata().getTableEngineUsingShowTable(writer.getConnection(), "products");
Assert.assertTrue(resultProducts.getLeft() == DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE);
@ -118,32 +132,28 @@ public class DbWriterTest {
@Test
@Tag("IntegrationTest")
public void testGetEngineTypeUsingSystemTables() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String dbHostName = clickHouseContainer.getHost();
Integer port = clickHouseContainer.getFirstMappedPort();
String database = "default";
String userName = clickHouseContainer.getUsername();
String password = clickHouseContainer.getPassword();
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
MutablePair< DBMetadata.TABLE_ENGINE, String> result = new DBMetadata().getTableEngineUsingSystemTables(writer.getConnection(),
"test", "employees");
database, "employees");
Assert.assertTrue(result.getLeft() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE);
MutablePair<DBMetadata.TABLE_ENGINE, String> result_products = new DBMetadata().getTableEngineUsingSystemTables(writer.getConnection(),
"test", "products");
database, "products");
Assert.assertTrue(result_products.getLeft() == DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE);
// Table does not exist.
MutablePair<DBMetadata.TABLE_ENGINE, String> result_registration = new DBMetadata().getTableEngineUsingSystemTables(writer.getConnection(),
"test", "registration");
database, "registration");
Assert.assertNull(result_registration.getLeft());
MutablePair<DBMetadata.TABLE_ENGINE, String> result_t1 = new DBMetadata().getTableEngineUsingSystemTables(writer.getConnection(),
"test", "t1");
Assert.assertTrue(result_t1.getLeft() == DBMetadata.TABLE_ENGINE.MERGE_TREE);
}
public static ConcurrentLinkedQueue<ClickHouseStruct> getSampleRecords() {