Fixed unit tests and pom file for maven surefile plugin

This commit is contained in:
Kanthi Subramanian 2022-09-08 13:56:21 -04:00
parent fd3038e33a
commit 8d7e85243e
18 changed files with 6 additions and 1267 deletions

11
pom.xml
View File

@ -19,10 +19,6 @@
<organization>Altinity</organization> <organization>Altinity</organization>
<organizationUrl>https://altinity.com</organizationUrl> <organizationUrl>https://altinity.com</organizationUrl>
</developer> </developer>
<developer>
<name>Vladislav Klimenko</name>
<email>sunsingerus@gmail.com</email>
</developer>
</developers> </developers>
<licenses> <licenses>
@ -154,8 +150,11 @@
<excludes> <excludes>
<exclude>**/*IT.java</exclude> <exclude>**/*IT.java</exclude>
</excludes> </excludes>
<includes>
<include>**/*Test.java</include>
</includes>
</configuration> </configuration>
<executions> <!-- <executions>
<execution> <execution>
<id>integration-test</id> <id>integration-test</id>
<goals> <goals>
@ -171,7 +170,7 @@
</includes> </includes>
</configuration> </configuration>
</execution> </execution>
</executions> </executions> -->
</plugin> </plugin>
<plugin> <plugin>

View File

@ -62,7 +62,7 @@ public class BaseDbWriter {
* Function that uses the DatabaseMetaData JDBC functionality * Function that uses the DatabaseMetaData JDBC functionality
* to get the column name and column data type as key/value pair. * to get the column name and column data type as key/value pair.
*/ */
protected Map<String, String> getColumnsDataTypesForTable(String tableName) { public Map<String, String> getColumnsDataTypesForTable(String tableName) {
LinkedHashMap<String, String> result = new LinkedHashMap<>(); LinkedHashMap<String, String> result = new LinkedHashMap<>();
try { try {

View File

@ -1,4 +0,0 @@
package com.altinity.clickhouse.sink.connector;
public class ClickHouseSinkConnectorTest {
}

View File

@ -1,54 +0,0 @@
package com.altinity.clickhouse.sink.connector;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import java.util.concurrent.atomic.AtomicLong;
public class ClickHouseSinkTaskTest {
private static AtomicLong spoofedRecordOffset = new AtomicLong();
/**
* Function to create spoofed sink record.
* @param topic
* @param keyField
* @param key
* @param valueField
* @param value
* @param timestampType
* @param timestamp
* @return
*/
public static SinkRecord spoofSinkRecord(String topic, String keyField, String key,
String valueField, String value,
TimestampType timestampType, Long timestamp) {
Schema basicKeySchema = null;
Struct basicKey = null;
if (keyField != null) {
basicKeySchema = SchemaBuilder
.struct()
.field(keyField, Schema.STRING_SCHEMA)
.build();
basicKey = new Struct(basicKeySchema);
basicKey.put(keyField, key);
}
Schema basicValueSchema = null;
Struct basicValue = null;
if (valueField != null) {
basicValueSchema = SchemaBuilder
.struct()
.field(valueField, Schema.STRING_SCHEMA)
.build();
basicValue = new Struct(basicValueSchema);
basicValue.put(valueField, value);
}
return new SinkRecord(topic, 0, basicKeySchema, basicKey,
basicValueSchema, basicValue, spoofedRecordOffset.getAndIncrement(), timestamp, timestampType);
}
}

View File

@ -1,35 +0,0 @@
package com.altinity.clickhouse.sink.connector;
import com.altinity.clickhouse.sink.connector.common.Utils;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class UtilsTest {
@Test
public void testParseTopicToTableMap() throws Exception {
String topicsToTable = "cluster1.topic1:table1, cluster2.topic2:table2";
Map<String, String> result = Utils.parseTopicToTableMap(topicsToTable);
Map<String, String> expectedHashMap = new HashMap<String, String>();
expectedHashMap.put("cluster1.topic1", "table1");
expectedHashMap.put("cluster2.topic2", "table2");
Assert.assertEquals(result, expectedHashMap);
}
@Test
public void testGetTableNameFromTopic() {
String topicName = "SERVER5432.test.employees";
String tableName = Utils.getTableNameFromTopic(topicName);
Assert.assertEquals(tableName, "employees");
}
}

View File

@ -1,62 +0,0 @@
package com.altinity.clickhouse.sink.connector.converters;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
public class ClickHouseConverterTest {
@Test
public void testConvert() {
ClickHouseConverter converter = new ClickHouseConverter();
SinkRecord record = spoofSinkRecord("test", "key", "k", "value", "v",
TimestampType.NO_TIMESTAMP_TYPE, null);
converter.convert(record);
}
/**
* Utility method for spoofing SinkRecords that should be passed to SinkTask.put()
* @param topic The topic of the record.
* @param keyField The field name for the record key; may be null.
* @param key The content of the record key; may be null.
* @param valueField The field name for the record value; may be null
* @param value The content of the record value; may be null
* @param timestampType The type of timestamp embedded in the message
* @param timestamp The timestamp in milliseconds
* @return The spoofed SinkRecord.
*/
public static SinkRecord spoofSinkRecord(String topic, String keyField, String key,
String valueField, String value,
TimestampType timestampType, Long timestamp) {
Schema basicKeySchema = null;
Struct basicKey = null;
if (keyField != null) {
basicKeySchema = SchemaBuilder
.struct()
.field(keyField, Schema.STRING_SCHEMA)
.build();
basicKey = new Struct(basicKeySchema);
basicKey.put(keyField, key);
}
Schema basicValueSchema = null;
Struct basicValue = null;
if (valueField != null) {
basicValueSchema = SchemaBuilder
.struct()
.field(valueField, Schema.STRING_SCHEMA)
.build();
basicValue = new Struct(basicValueSchema);
basicValue.put(valueField, value);
}
return new SinkRecord(topic, 0, basicKeySchema, basicKey,
basicValueSchema, basicValue, 0, timestamp, timestampType);
}
}

View File

@ -1,31 +0,0 @@
package com.altinity.clickhouse.sink.connector.converters;
import com.clickhouse.client.ClickHouseDataType;
import io.debezium.time.Date;
import io.debezium.time.Time;
import org.apache.kafka.connect.data.Schema;
import org.junit.Assert;
import org.junit.Test;
public class ClickHouseDataTypeMapperTest {
@Test
public void getClickHouseDataType() {
ClickHouseDataType chDataType = ClickHouseDataTypeMapper.getClickHouseDataType(Schema.Type.INT16, null);
Assert.assertTrue(chDataType.name().equalsIgnoreCase("INT16"));
chDataType = ClickHouseDataTypeMapper.getClickHouseDataType(Schema.Type.INT32, null);
Assert.assertTrue(chDataType.name().equalsIgnoreCase("INT32"));
chDataType = ClickHouseDataTypeMapper.getClickHouseDataType(Schema.BYTES_SCHEMA.type(), null);
Assert.assertTrue(chDataType.name().equalsIgnoreCase("String"));
chDataType = ClickHouseDataTypeMapper.getClickHouseDataType(Schema.INT32_SCHEMA.type(), Time.SCHEMA_NAME);
Assert.assertTrue(chDataType.name().equalsIgnoreCase("String"));
chDataType = ClickHouseDataTypeMapper.getClickHouseDataType(Schema.INT32_SCHEMA.type(), Date.SCHEMA_NAME);
Assert.assertTrue(chDataType.name().equalsIgnoreCase("Date32"));
}
}

View File

@ -1,85 +0,0 @@
package com.altinity.clickhouse.sink.connector.converters;
import com.altinity.clickhouse.sink.connector.metadata.DataTypeRange;
import org.junit.Assert;
import org.junit.Test;
public class DebeziumConverterTest {
@Test
public void testMicroTimeConverter() {
Object timeInMicroSeconds = 3723000000L;
String formattedTime = DebeziumConverter.MicroTimeConverter.convert(timeInMicroSeconds);
Assert.assertTrue(formattedTime.equalsIgnoreCase("19:02:03"));
}
@Test
public void testTimestampConverter() {
Object timestampEpoch = 1640995260000L;
String formattedTimestamp = DebeziumConverter.TimestampConverter.convert(timestampEpoch, false);
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("2021-12-31T18:01:00"));
}
@Test
public void testTimestampConverterMinRange() {
Object timestampEpoch = -2166681362000L;
String formattedTimestamp = DebeziumConverter.TimestampConverter.convert(timestampEpoch, false);
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATETIME));
}
@Test
public void testTimestampConverterMaxRange() {
Object timestampEpoch = 4807440238000L;
String formattedTimestamp = DebeziumConverter.TimestampConverter.convert(timestampEpoch, false);
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATETIME));
}
@Test
public void testDateConverter() {
Integer date = 3652;
java.sql.Date formattedDate = DebeziumConverter.DateConverter.convert(date);
Assert.assertTrue(formattedDate.toString().equalsIgnoreCase("1979-12-31"));
}
@Test
public void testDateConverterMinRange() {
Integer date = -144450000;
java.sql.Date formattedDate = DebeziumConverter.DateConverter.convert(date);
Assert.assertTrue(formattedDate.toString().equalsIgnoreCase(DataTypeRange.CLICKHOUSE_MIN_SUPPORTED_DATE));
}
@Test
public void testDateConverterMaxRange() {
Integer date = 450000;
java.sql.Date formattedDate = DebeziumConverter.DateConverter.convert(date);
Assert.assertTrue(formattedDate.toString().equalsIgnoreCase(DataTypeRange.CLICKHOUSE_MAX_SUPPORTED_DATE));
}
@Test
public void testZonedTimestampConverter() {
String formattedTimestamp = DebeziumConverter.ZonedTimestampConverter.convert("2021-12-31T19:01:00Z");
Assert.assertTrue(formattedTimestamp.equalsIgnoreCase("2021-12-31 19:01:00"));
}
@Test
public void testMicroTimestampConverter() {
String convertedString = DebeziumConverter.MicroTimestampConverter.convert(-248313600000000L);
Assert.assertTrue(convertedString.equalsIgnoreCase("1962-02-18 00:00:00"));
}
}

View File

@ -1,40 +0,0 @@
package com.altinity.clickhouse.sink.connector.db;
import org.junit.Assert;
import org.junit.Test;
public class DBMetadataTest {
@Test
public void testGetSignColumnForCollapsingMergeTree() {
DBMetadata metadata = new DBMetadata();
String createTableDML = "CollapsingMergeTree(signNumberCol) PRIMARY KEY productCode ORDER BY productCode SETTINGS index_granularity = 8192";
String signColumn = metadata.getSignColumnForCollapsingMergeTree(createTableDML);
Assert.assertTrue(signColumn.equalsIgnoreCase("signNumberCol"));
}
@Test
public void testDefaultGetSignColumnForCollapsingMergeTree() {
DBMetadata metadata = new DBMetadata();
String createTableDML = "ReplacingMergeTree() PRIMARY KEY productCode ORDER BY productCode SETTINGS index_granularity = 8192";
String signColumn = metadata.getSignColumnForCollapsingMergeTree(createTableDML);
Assert.assertTrue(signColumn.equalsIgnoreCase("sign"));
}
@Test
public void testGetVersionColumnForReplacingMergeTree() {
DBMetadata metadata = new DBMetadata();
String createTableDML = "ReplacingMergeTree(versionNo) PRIMARY KEY productCode ORDER BY productCode SETTINGS index_granularity = 8192";
String signColumn = metadata.getVersionColumnForReplacingMergeTree(createTableDML);
Assert.assertTrue(signColumn.equalsIgnoreCase("versionNo"));
}
}

View File

@ -1,63 +0,0 @@
package com.altinity.clickhouse.sink.connector.db;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.common.TopicPartition;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.api.Tag;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DbKafkaOffsetWriterTest {
@Test
@Tag("IntegrationTest")
public void testInsertTopicOffsetMetadata() throws SQLException {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(dbHostName, port, database, "topic_offset_metadata", userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()));
Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap = new HashMap<>();
Map<TopicPartition, Long> result = writer.groupQueryWithRecords(new DbWriterTest().getSampleRecords(), queryToRecordsMap);
dbKafkaOffsetWriter.insertTopicOffsetMetadata(result);
}
@Test
@Tag("IntegrationTest")
public void testGetStoredOffsets() throws SQLException {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
DbKafkaOffsetWriter dbKafkaOffsetWriter = new DbKafkaOffsetWriter(dbHostName, port, database, "topic_offset_metadata", userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()));
Map<TopicPartition, Long> offsetsMap = dbKafkaOffsetWriter.getStoredOffsets();
Assert.assertTrue(offsetsMap.isEmpty() == false);
}
}

View File

@ -1,352 +0,0 @@
package com.altinity.clickhouse.sink.connector.db;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.clickhouse.client.data.ClickHouseArrayValue;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Tag;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
public class DbWriterTest {
DbWriter writer;
@Before
public void init() throws SQLException {
String hostName = "remoteClickHouse";
Integer port = 8000;
String database = "employees";
String userName = "test";
String password = "test";
String tableName = "employees";
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
this.writer = new DbWriter(hostName, port, tableName, database, userName, password, config, null);
}
public Struct getKafkaStruct() {
Schema kafkaConnectSchema = SchemaBuilder
.struct()
.field("first_name", Schema.STRING_SCHEMA)
.field("last_name", Schema.STRING_SCHEMA)
.field("quantity", Schema.INT32_SCHEMA)
.field("amount", Schema.FLOAT64_SCHEMA)
.field("employed", Schema.BOOLEAN_SCHEMA)
.build();
Struct kafkaConnectStruct = new Struct(kafkaConnectSchema);
kafkaConnectStruct.put("first_name", "John");
kafkaConnectStruct.put("last_name", "Doe");
kafkaConnectStruct.put("quantity", 100);
kafkaConnectStruct.put("amount", 23.223);
kafkaConnectStruct.put("employed", true);
return kafkaConnectStruct;
}
@Test
public void testGetConnectionUrl() {
String hostName = "remoteClickHouse";
Integer port = 8123;
String database = "employees";
String connectionUrl = writer.getConnectionString(hostName, port, database);
Assert.assertEquals(connectionUrl, "jdbc:clickhouse://remoteClickHouse:8123/employees");
}
@Test
public void testIsColumnTypeDate64() {
boolean result = DbWriter.isColumnDateTime64("Nullable(DateTime64(3))");
}
@Test
@Tag("IntegrationTest")
public void testGetColumnsDataTypesForTable() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
writer.getColumnsDataTypesForTable("employees");
}
@Test
@Tag("IntegrationTest")
public void testGetEngineType() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
MutablePair<DBMetadata.TABLE_ENGINE, String> result = new DBMetadata().getTableEngineUsingShowTable(writer.conn, "employees");
Assert.assertTrue(result.getLeft() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE);
Assert.assertTrue(result.getRight().equalsIgnoreCase("ver"));
MutablePair<DBMetadata.TABLE_ENGINE, String> resultProducts = new DBMetadata().getTableEngineUsingShowTable(writer.conn, "products");
Assert.assertTrue(resultProducts.getLeft() == DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE);
Assert.assertTrue(resultProducts.getRight().equalsIgnoreCase("sign"));
}
@Test
@Tag("IntegrationTest")
public void testGetEngineTypeUsingSystemTables() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
MutablePair< DBMetadata.TABLE_ENGINE, String> result = new DBMetadata().getTableEngineUsingSystemTables(writer.conn,
"test", "employees");
Assert.assertTrue(result.getLeft() == DBMetadata.TABLE_ENGINE.REPLACING_MERGE_TREE);
MutablePair<DBMetadata.TABLE_ENGINE, String> result_products = new DBMetadata().getTableEngineUsingSystemTables(writer.conn,
"test", "products");
Assert.assertTrue(result_products.getLeft() == DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE);
// Table does not exist.
MutablePair<DBMetadata.TABLE_ENGINE, String> result_registration = new DBMetadata().getTableEngineUsingSystemTables(writer.conn,
"test", "registration");
Assert.assertNull(result_registration.getLeft());
MutablePair<DBMetadata.TABLE_ENGINE, String> result_t1 = new DBMetadata().getTableEngineUsingSystemTables(writer.conn,
"test", "t1");
Assert.assertTrue(result_t1.getLeft() == DBMetadata.TABLE_ENGINE.MERGE_TREE);
}
public ConcurrentLinkedQueue<ClickHouseStruct> getSampleRecords() {
ConcurrentLinkedQueue<ClickHouseStruct> records = new ConcurrentLinkedQueue<ClickHouseStruct>();
ClickHouseStruct ch1 = new ClickHouseStruct(10, "topic_1", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch2 = new ClickHouseStruct(8, "topic_1", getKafkaStruct(), 2, System.currentTimeMillis() ,null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch3 = new ClickHouseStruct(1000, "topic_1", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch4 = new ClickHouseStruct(1020, "topic_1", getKafkaStruct(), 3, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch5 = new ClickHouseStruct(1400, "topic_2", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch6 = new ClickHouseStruct(1010, "topic_2", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch7 = new ClickHouseStruct(-1, "topic_2", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch8 = new ClickHouseStruct(210, "topic_2", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
records.add(ch1);
records.add(ch2);
records.add(ch3);
records.add(ch4);
records.add(ch5);
records.add(ch6);
records.add(ch7);
records.add(ch8);
return records;
}
@Test
public void testGroupRecords() {
String hostName = "remoteClickHouse";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
String connectionUrl = writer.getConnectionString(hostName, port, database);
Properties properties = new Properties();
properties.setProperty("client_name", "Test_1");
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null);
Map<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>> queryToRecordsMap = new HashMap<>();
Map<TopicPartition, Long> result = dbWriter.groupQueryWithRecords(getSampleRecords()
, queryToRecordsMap);
Assert.assertTrue(result.isEmpty() == false);
long topic_1_2_offset = result.get(new TopicPartition("topic_1", 2));
Assert.assertTrue(topic_1_2_offset == 1000);
long topic_1_3_offset = result.get(new TopicPartition("topic_1", 3));
Assert.assertTrue(topic_1_3_offset == 1020);
long topic_2_2_offset = result.get(new TopicPartition("topic_2", 2));
Assert.assertTrue(topic_2_2_offset == 1400);
}
@Test
@Tag("IntegrationTest")
public void testBatchArrays() {
String hostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "test_ch_jdbc_complex_2";
Properties properties = new Properties();
properties.setProperty("client_name", "Test_1");
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null);
String url = dbWriter.getConnectionString(hostName, port, database);
String insertQueryTemplate = "insert into test_ch_jdbc_complex_2(col1, col2, col3, col4, col5, col6) values(?, ?, ?, ?, ?, ?)";
try {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
ClickHouseConnection conn = dataSource.getConnection(userName, password);
PreparedStatement ps = conn.prepareStatement(insertQueryTemplate);
boolean[] boolArray = {true, false, true};
float[] floatArray = {0.012f, 0.1255f, 1.22323f};
ps.setObject(1, "test_string");
ps.setBoolean(2, true);
ps.setObject(3, ClickHouseArrayValue.of(new Object[] {Arrays.asList("one", "two", "three")}));
ps.setObject(4, ClickHouseArrayValue.ofEmpty().update(boolArray));
ps.setObject(5, ClickHouseArrayValue.ofEmpty().update(floatArray));
Map<String, Float> test_map = new HashMap<String, Float>();
test_map.put("2", 0.02f);
test_map.put("3", 0.02f);
ps.setObject(6, Collections.unmodifiableMap(test_map));
// ps.setObject(5, ClickHouseArrayValue.of(new Object[]
// {
// Arrays.asList(new Float(0.2), new Float(0.3))
// }));
ps.addBatch();
ps.executeBatch();
} catch(Exception e) {
System.out.println("Error connecting" + e);
}
}
@Test
@Tag("IntegrationTest")
public void testBatchInsert() {
String hostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
Properties properties = new Properties();
properties.setProperty("client_name", "Test_1");
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
DbWriter dbWriter = new DbWriter(hostName, port, database, tableName, userName, password, config, null);
String url = dbWriter.getConnectionString(hostName, port, database);
String insertQueryTemplate = "insert into employees values(?,?,?,?,?,?)";
try {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
ClickHouseConnection conn = dataSource.getConnection(userName, password);
PreparedStatement ps = conn.prepareStatement(insertQueryTemplate);
ps.setInt(1, 1);
ps.setDate(2, new java.sql.Date(10000));
ps.setString(3, "John1");
ps.setString(4, "Doe1");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
ps.setInt(1, 2);
ps.setDate(2, new java.sql.Date(10000));
ps.setString(3, "John2");
ps.setString(4, "Doe2");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
ps.setInt(1, 3);
ps.setDate(2, new java.sql.Date(10000));
ps.setInt(3, 22);
ps.setString(4, "012-03-01 08:32:53,431 WARN [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] SQL Error: 0, SQLState: 22001\n" +
"2012-03-01 08:32:53,431 ERROR [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"2012-03-01 08:32:53,556 WARN [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] SQL Error: 0, SQLState: 22001\n" +
"2012-03-01 08:32:53,556 ERROR [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] ERROR: value too long for type character varying(4000)\n" +
"2012-03-01 08:32:53,556 ERROR [2-BAM::Default Agent::Agent:pool-8-thread-1] [SessionImpl] Could not synchronize database state with session\n" +
"2012-03-01 08:32:53,556 INFO [2-BAM::Default Agent::Agent:pool-8-thread-1] [DefaultErrorHandler] Recording an error: Could not save the build results. Data could be in an inconsistent state. : TWTHREE-MAIN-JOB1 : Hibernate operation: Could not execute JDBC batch update; SQL []; Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.; nested exception is java.sql.BatchUpdateException: Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"2012-03-01 08:32:53,666 FATAL [2-BAM::Default Agent::Agent:pool-8-thread-1] [PlanStatePersisterImpl] Could not save the build results BuildResults: TWTHREE-MAIN-JOB1-659. Data could be in an inconsistent state.\n" +
"org.springframework.dao.DataIntegrityViolationException: Hibernate operation: Could not execute JDBC batch update; SQL []; Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"Caused by: java.sql.BatchUpdateException: Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"\tat org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2569)\n" +
"\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1796)\n" +
"\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:407)\n" +
"\tat org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2708)\n" +
"\tat com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:1723)\n" +
"\tat net.sf.hibernate.impl.BatchingBatcher.doExecuteBatch(BatchingBatcher.java:54)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.executeBatch(BatcherImpl.java:128)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.prepareStatement(BatcherImpl.java:61)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.prepareStatement(BatcherImpl.java:58)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.prepareBatchStatement(BatcherImpl.java:111)\n" +
"\tat net.sf.hibernate.persister.EntityPersister.insert(EntityPersister.java:454)\n" +
"\tat net.sf.hibernate.persister.EntityPersister.insert(EntityPersister.java:436)\n" +
"\tat net.sf.hibernate.impl.ScheduledInsertion.execute(ScheduledInsertion.java:37)\n" +
"\tat net.sf.hibernate.impl.SessionImpl.execute(SessionImpl.java:2447)\n" +
"...");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
ps.setInt(1, 2);
ps.setDate(2, new java.sql.Date(10000));
ps.setString(3, "John4");
ps.setString(4, "Doe4");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
int[] result = ps.executeBatch();
for(int i = 0; i < result.length; i++) {
System.out.println("Index:" + i + " Result:" + result[i]);
}
} catch (Exception e) {
System.out.println("Exception" + e);
}
}
}

View File

@ -1,108 +0,0 @@
package com.altinity.clickhouse.sink.connector.db;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class QueryFormatterTest {
static Map<String, String> columnNameToDataTypesMap = new HashMap<>();
static List<Field> fields = new ArrayList<>();
@BeforeClass
public static void initialize() {
columnNameToDataTypesMap.put("customerName", "String");
columnNameToDataTypesMap.put("occupation", "String");
columnNameToDataTypesMap.put("quantity", "UInt32");
columnNameToDataTypesMap.put("_topic", "String");
fields.add(new Field("customerName", 0, Schema.STRING_SCHEMA));
fields.add(new Field("occupation", 1, Schema.STRING_SCHEMA));
fields.add(new Field("quantity", 2, Schema.INT32_SCHEMA));
fields.add(new Field("amount", 3, Schema.FLOAT64_SCHEMA));
fields.add(new Field("employed", 4, Schema.BOOLEAN_SCHEMA));
}
@Test
public void testGetInsertQueryUsingInputFunctionWithKafkaMetaDataEnabled() {
QueryFormatter qf = new QueryFormatter();
String tableName = "products";
boolean includeKafkaMetaData = true;
boolean includeRawData = false;
DBMetadata.TABLE_ENGINE engine = DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE;
MutablePair<String, Map<String, Integer>> response = qf.getInsertQueryUsingInputFunction(tableName, fields, columnNameToDataTypesMap, includeKafkaMetaData, includeRawData,
null, null, null, null, engine);
String expectedQuery = "insert into products(customerName,occupation,quantity,_topic) select customerName,occupation,quantity,_topic " +
"from input('customerName String,occupation String,quantity UInt32,_topic String')";
Assert.assertTrue(response.left.equalsIgnoreCase(expectedQuery));
}
@Test
public void testGetInsertQueryUsingInputFunctionWithKafkaMetaDataDisabled() {
QueryFormatter qf = new QueryFormatter();
String tableName = "products";
boolean includeKafkaMetaData = false;
boolean includeRawData = false;
DBMetadata.TABLE_ENGINE engine = DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE;
MutablePair<String, Map<String, Integer>> response = qf.getInsertQueryUsingInputFunction(tableName, fields, columnNameToDataTypesMap,
includeKafkaMetaData, includeRawData, null, null, null, null, engine);
String expectedQuery = "insert into products(customerName,occupation,quantity) select customerName,occupation,quantity from input('customerName String,occupation " +
"String,quantity UInt32')";
Assert.assertTrue(response.left.equalsIgnoreCase(expectedQuery));
}
@Test
public void testGetInsertQueryUsingInputFunctionWithRawDataEnabledButRawColumnNotProvided() {
QueryFormatter qf = new QueryFormatter();
String tableName = "products";
boolean includeKafkaMetaData = false;
boolean includeRawData = true;
DBMetadata.TABLE_ENGINE engine = DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE;
MutablePair<String, Map<String, Integer>> response = qf.getInsertQueryUsingInputFunction(tableName, fields, columnNameToDataTypesMap,
includeKafkaMetaData, includeRawData, null, null, null, null, engine);
String expectedQuery = "insert into products(customerName,occupation,quantity) select customerName,occupation,quantity from input('customerName String,occupation " +
"String,quantity UInt32')";
Assert.assertTrue(response.left.equalsIgnoreCase(expectedQuery));
}
@Test
public void testGetInsertQueryUsingInputFunctionWithRawDataEnabledButRawColumnProvided() {
QueryFormatter qf = new QueryFormatter();
String tableName = "products";
boolean includeKafkaMetaData = false;
boolean includeRawData = true;
DBMetadata.TABLE_ENGINE engine = DBMetadata.TABLE_ENGINE.COLLAPSING_MERGE_TREE;
columnNameToDataTypesMap.put("raw_column", "String");
MutablePair<String, Map<String, Integer>> response = qf.getInsertQueryUsingInputFunction(tableName, fields, columnNameToDataTypesMap,
includeKafkaMetaData, includeRawData, "raw_column", null, null,
null, engine);
String expectedQuery = "insert into products(customerName,occupation,quantity,raw_column) select customerName,occupation,quantity,raw_column from input('customerName String,occupation String,quantity UInt32,raw_column String')";
Assert.assertTrue(response.left.equalsIgnoreCase(expectedQuery));
}
}

View File

@ -1,50 +0,0 @@
package com.altinity.clickhouse.sink.connector.db;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.altinity.clickhouse.sink.connector.metadata.TableMetaDataWriter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TableMetaDataWriterTest {
public static final Logger log = LoggerFactory.getLogger(TableMetaDataWriter.class);
@Test
public void testConvertRecordToJSON() {
Schema kafkaConnectSchema = SchemaBuilder
.struct()
.field("first_name", Schema.STRING_SCHEMA)
.field("last_name", Schema.STRING_SCHEMA)
.field("quantity", Schema.INT32_SCHEMA)
.field("amount", Schema.FLOAT64_SCHEMA)
.field("employed", Schema.BOOLEAN_SCHEMA)
.build();
Struct kafkaConnectStruct = new Struct(kafkaConnectSchema);
kafkaConnectStruct.put("first_name", "John");
kafkaConnectStruct.put("last_name", "Doe");
kafkaConnectStruct.put("quantity", 100);
kafkaConnectStruct.put("amount", 23.223);
kafkaConnectStruct.put("employed", true);
ClickHouseStruct s = new ClickHouseStruct(1, "test-topic", kafkaConnectStruct, 12,
122323L, null, kafkaConnectStruct, null, ClickHouseConverter.CDC_OPERATION.UPDATE);
String jsonString = null;
try {
jsonString = TableMetaDataWriter.convertRecordToJSON(s.getAfterStruct());
} catch(Exception e) {
log.error("Exception converting record to JSON" + e);
}
String expectedString = "{\"amount\":23.223,\"quantity\":100,\"last_name\":\"Doe\",\"first_name\":\"John\",\"employed\":true}";
Assert.assertNotNull(jsonString);
Assert.assertTrue(jsonString.equalsIgnoreCase(expectedString));
}
}

View File

@ -1,20 +0,0 @@
package com.altinity.clickhouse.sink.connector.db.operations;
import org.junit.Assert;
import org.junit.Test;
public class ClickHouseAlterTableTest extends ClickHouseAutoCreateTableTest {
@Test
public void createAlterTableSyntaxTest() {
ClickHouseAlterTable cat = new ClickHouseAlterTable();
String alterTableQuery = cat.createAlterTableSyntax("employees",
this.getExpectedColumnToDataTypesMap(), ClickHouseAlterTable.ALTER_TABLE_OPERATION.ADD);
String expectedQuery = "ALTER TABLE employees add column `amount` Float64,add column `occupation` String,add column `quantity` Int32,add column `amount_1` Float32,add column `customerName` String,add column `blob_storage` String,add column `employed` Bool";
Assert.assertTrue(alterTableQuery.equalsIgnoreCase(expectedQuery));
System.out.println("Alter table query" + alterTableQuery);
}
}

View File

@ -1,167 +0,0 @@
package com.altinity.clickhouse.sink.connector.db.operations;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.clickhouse.client.ClickHouseDataType;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Tag;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
public class ClickHouseAutoCreateTableTest {
Map<String, String> columnToDataTypesMap;
ClickHouseConnection conn;
@Before
public void initialize() throws SQLException {
this.columnToDataTypesMap = getExpectedColumnToDataTypesMap();
// this.columnToDataTypesMap.put("customer_id", "Int32");
// this.columnToDataTypesMap.put("address", "String");
// this.columnToDataTypesMap.put("first_name", "String");
// this.columnToDataTypesMap.put("amount", "Int32");
String hostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "auto_create_table";
ClickHouseSinkConnectorConfig config= new ClickHouseSinkConnectorConfig(new HashMap<>());
DbWriter writer = new DbWriter(hostName, port, database, tableName, userName, password, config, null);
this.conn = writer.getConnection();
}
protected Field[] createFields() {
ArrayList<Field> fields = new ArrayList<>();
fields.add(new Field("customerName", 0, Schema.STRING_SCHEMA));
fields.add(new Field("occupation", 1, Schema.STRING_SCHEMA));
fields.add(new Field("quantity", 2, Schema.INT32_SCHEMA));
fields.add(new Field("amount_1", 3, Schema.FLOAT32_SCHEMA));
fields.add(new Field("amount", 4, Schema.FLOAT64_SCHEMA));
fields.add(new Field("employed", 5, Schema.BOOLEAN_SCHEMA));
fields.add(new Field("blob_storage", 6, SchemaBuilder.type(Schema.BYTES_SCHEMA.type()).
name(Decimal.LOGICAL_NAME).build()));
Schema decimalSchema = SchemaBuilder.type(Schema.BYTES_SCHEMA.type()).parameter("scale", "10")
.parameter("connect.decimal.precision", "30")
.name(Decimal.LOGICAL_NAME).build();
fields.add(new Field("blob_storage_scale", 6, decimalSchema));
Field[] result = new Field[fields.size()];
fields.toArray(result);
return result;
}
protected Map<String, String> getExpectedColumnToDataTypesMap() {
Map<String, String> columnToDataTypesMap = new HashMap<>();
columnToDataTypesMap.put("customerName", ClickHouseDataType.String.name());
columnToDataTypesMap.put("occupation", ClickHouseDataType.String.name());
columnToDataTypesMap.put("quantity", ClickHouseDataType.Int32.name());
columnToDataTypesMap.put("amount_1", ClickHouseDataType.Float32.name());
columnToDataTypesMap.put("amount", ClickHouseDataType.Float64.name());
columnToDataTypesMap.put("employed", ClickHouseDataType.Bool.name());
columnToDataTypesMap.put("blob_storage", ClickHouseDataType.String.name());
return columnToDataTypesMap;
}
@Test
public void getColumnNameToCHDataTypeMappingTest() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
Field[] fields = createFields();
Map<String, String> colNameToDataTypeMap = act.getColumnNameToCHDataTypeMapping(fields);
Map<String, String> expectedColNameToDataTypeMap = getExpectedColumnToDataTypesMap();
Assert.assertTrue(colNameToDataTypeMap.equals(expectedColNameToDataTypeMap));
Assert.assertFalse(colNameToDataTypeMap.isEmpty());
}
@Test
public void testCreateTableSyntax() {
ArrayList<String> primaryKeys = new ArrayList<>();
primaryKeys.add("customerName");
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
String expectedQuery = "CREATE TABLE auto_create_table(`amount` Int32,`address` String,`customer_id` Int32,`first_name` String,`sign` Int8,`ver` UInt64) ENGINE = ReplacingMergeTree(ver) PRIMARY KEY(customer_id) ORDER BY(customer_id)";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
}
@Test
public void testCreateTableEmptyPrimaryKey() {
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
String query = act.createTableSyntax(null, "auto_create_table", createFields(), this.columnToDataTypesMap);
String expectedQuery = "CREATE TABLE auto_create_table(`amount` Int32,`address` String,`customer_id` Int32,`first_name` String,`sign` Int8,`ver` UInt64) ENGINE = ReplacingMergeTree(ver) ORDER BY(ver)";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
}
@Test
public void testCreateTableMultiplePrimaryKeys() {
ArrayList<String> primaryKeys = new ArrayList<>();
primaryKeys.add("customer_id");
primaryKeys.add("customer_name");
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
String query = act.createTableSyntax(primaryKeys, "auto_create_table", createFields(), this.columnToDataTypesMap);
String expectedQuery = "CREATE TABLE auto_create_table(`amount` Int32,`address` String,`customer_id` Int32,`first_name` String,`sign` Int8,`ver` UInt64) ENGINE = ReplacingMergeTree(ver) PRIMARY KEY(customer_id,customer_name) ORDER BY(customer_id,customer_name)";
Assert.assertTrue(query.equalsIgnoreCase(expectedQuery));
System.out.println(query);
}
@Test
@Tag("IntegrationTest")
public void testCreateNewTable() {
String dbHostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
String tableName = "employees";
DbWriter writer = new DbWriter(dbHostName, port, database, tableName, userName, password,
new ClickHouseSinkConnectorConfig(new HashMap<>()), null);
ClickHouseAutoCreateTable act = new ClickHouseAutoCreateTable();
ArrayList<String> primaryKeys = new ArrayList<>();
primaryKeys.add("customerName");
try {
act.createNewTable(primaryKeys, "auto_create_table", this.createFields(), writer.getConnection());
} catch(SQLException se) {
Assert.assertTrue(false);
}
}
}

View File

@ -1,59 +0,0 @@
package com.altinity.clickhouse.sink.connector.deduplicator;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkTaskTest;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class DeDuplicatorTest {
@Test
public void testIsNew() {
Map<String, String> properties = new HashMap<String, String>();
properties.put(ClickHouseSinkConnectorConfigVariables.DEDUPLICATION_POLICY, "new");
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(properties);
DeDuplicator dedupe = new DeDuplicator(config);
String topic = "products";
String keyField = "productId";
String key = "11";
String valueField = "amount";
String value = "2000";
Long timestamp1 = System.currentTimeMillis();
// Same key
SinkRecord recordOne = ClickHouseSinkTaskTest.spoofSinkRecord(topic, keyField, key, valueField, value,
TimestampType.NO_TIMESTAMP_TYPE, timestamp1);
boolean result1 = dedupe.isNew(topic, recordOne);
Assert.assertTrue(result1 == true);
long timestamp2 = System.currentTimeMillis();
SinkRecord recordTwo = ClickHouseSinkTaskTest.spoofSinkRecord(topic, keyField, key, valueField, value,
TimestampType.NO_TIMESTAMP_TYPE, timestamp2);
boolean result2 = dedupe.isNew(topic, recordTwo);
Assert.assertTrue(result2 == false);
// End: Send key
// Different key.
SinkRecord recordDifferentKey = ClickHouseSinkTaskTest.spoofSinkRecord(topic, keyField, "22", valueField, value,
TimestampType.NO_TIMESTAMP_TYPE, timestamp2);
boolean resultDifferentKey = dedupe.isNew(topic, recordDifferentKey);
Assert.assertTrue(resultDifferentKey == false);
// Test Different Topic.
boolean resultDifferentTopic = dedupe.isNew("employees", recordTwo);
Assert.assertTrue(resultDifferentTopic == true);
}
}

View File

@ -1,89 +0,0 @@
package com.altinity.clickhouse.sink.connector.executor;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class ClickHouseBatchRunnableTest {
ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>>
records = new ConcurrentHashMap<>();
Map<String, String> topic2TableMap = new HashMap<>();
@Before
public void initTest() {
ClickHouseStruct ch1 = new ClickHouseStruct(10, "SERVER5432.test.customers", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch2 = new ClickHouseStruct(8, "SERVER5432.test.customers", getKafkaStruct(), 2, System.currentTimeMillis() ,null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch3 = new ClickHouseStruct(1000, "SERVER5432.test.customers", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch4 = new ClickHouseStruct(1020, "SERVER5432.test.products", getKafkaStruct(), 3, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch5 = new ClickHouseStruct(1400, "SERVER5432.test.products", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ClickHouseStruct ch6 = new ClickHouseStruct(1010, "SERVER5432.test.products", getKafkaStruct(), 2, System.currentTimeMillis(), null, getKafkaStruct(), null, ClickHouseConverter.CDC_OPERATION.CREATE);
ConcurrentLinkedQueue<ClickHouseStruct> customersQueue = new ConcurrentLinkedQueue<>();
customersQueue.add(ch1);
customersQueue.add(ch2);
customersQueue.add(ch3);
ConcurrentLinkedQueue<ClickHouseStruct> productsQueue = new ConcurrentLinkedQueue<>();
productsQueue.add(ch1);
productsQueue.add(ch2);
productsQueue.add(ch3);
records.put("SERVER5432.test.customers", customersQueue);
records.put("SERVER5432.test.products", productsQueue);
this.topic2TableMap.put("SERVER5432.test.customers", "customers");
this.topic2TableMap.put("SERVER5432.test.products", "products");
this.topic2TableMap.put("SERVER5432.test.employees", "employees");
}
public Struct getKafkaStruct() {
Schema kafkaConnectSchema = SchemaBuilder
.struct()
.field("first_name", Schema.STRING_SCHEMA)
.field("last_name", Schema.STRING_SCHEMA)
.field("quantity", Schema.INT32_SCHEMA)
.field("amount", Schema.FLOAT64_SCHEMA)
.field("employed", Schema.BOOLEAN_SCHEMA)
.build();
Struct kafkaConnectStruct = new Struct(kafkaConnectSchema);
kafkaConnectStruct.put("first_name", "John");
kafkaConnectStruct.put("last_name", "Doe");
kafkaConnectStruct.put("quantity", 100);
kafkaConnectStruct.put("amount", 23.223);
kafkaConnectStruct.put("employed", true);
return kafkaConnectStruct;
}
@Test
public void testGetTableNameFromTopic() {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(new HashMap<String, String>());
ClickHouseBatchRunnable run = new ClickHouseBatchRunnable(this.records, config, this.topic2TableMap);
String tableName = run.getTableFromTopic("SERVER5432.test.customers");
Assert.assertTrue(tableName.equalsIgnoreCase("customers"));
}
}

View File

@ -1,41 +0,0 @@
package com.altinity.clickhouse.sink.connector.model;
import com.altinity.clickhouse.sink.connector.converters.ClickHouseConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class ClickHouseStructTest {
ClickHouseStruct st;
@Before
public void initialize() {
}
@Test
public void testGtid() {
Map<String, Object> metaData = new HashMap<String, Object>();
metaData.put("gtid", "0010-122323-0232323:512345");
String keyField = "customer";
Schema basicKeySchema = SchemaBuilder
.struct()
.field(keyField, Schema.STRING_SCHEMA)
.build();
st = new ClickHouseStruct(10, "topic_1", new Struct(basicKeySchema), 100,
12322323L, new Struct(basicKeySchema), new Struct(basicKeySchema),
metaData, ClickHouseConverter.CDC_OPERATION.CREATE);
Assert.assertTrue(st.getGtid() == 512345);
}
}