diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/Utils.java b/src/main/java/com/altinity/clickhouse/sink/connector/Utils.java index 06b4c22..8011157 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/Utils.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/Utils.java @@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; -class Utils { +public class Utils { private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class); public static final String TASK_ID = "task_id"; @@ -81,6 +81,24 @@ class Utils { return topic2Table; } + /** + * Function to get Table name from kafka connect topic + * @param topicName + * @return Table Name + */ + public static String getTableNameFromTopic(String topicName) { + String tableName = null; + + + // topic names is of the following format. + // hostname.dbName.tableName + String[] splitName = topicName.split("\\."); + if(splitName.length == 3) { + tableName = splitName[2]; + } + + return tableName; + } /** * Function to valid table name passed in settings * //ToDO: Implement the function. diff --git a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java index 7697bfe..b7f3e67 100644 --- a/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java +++ b/src/main/java/com/altinity/clickhouse/sink/connector/executor/ClickHouseBatchRunnable.java @@ -3,6 +3,7 @@ package com.altinity.clickhouse.sink.connector.executor; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig; import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables; import com.altinity.clickhouse.sink.connector.Metrics; +import com.altinity.clickhouse.sink.connector.Utils; import com.altinity.clickhouse.sink.connector.db.DbWriter; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.codahale.metrics.Timer; @@ -51,7 +52,13 @@ public class ClickHouseBatchRunnable implements Runnable { for (Map.Entry> entry : this.records.entrySet()) { String topicName = entry.getKey(); + + //The user parameter will override the topic mapping to table. String tableName = this.topic2TableMap.get(topicName); + if(tableName == null) { + tableName = Utils.getTableNameFromTopic(topicName); + } + // Initialize Timer to track time taken to transform and insert to Clickhouse. Timer timer = Metrics.timer("Bulk Insert: " + blockUuid + " Size:" + records.size()); Timer.Context context = timer.time(); diff --git a/src/test/com/altinity/clickhouse/sink/connector/UtilsTest.java b/src/test/com/altinity/clickhouse/sink/connector/UtilsTest.java index 53afa5b..157877b 100644 --- a/src/test/com/altinity/clickhouse/sink/connector/UtilsTest.java +++ b/src/test/com/altinity/clickhouse/sink/connector/UtilsTest.java @@ -22,5 +22,13 @@ public class UtilsTest { } + @Test + public void testGetTableNameFromTopic() { + + String topicName = "SERVER5432.test.employees"; + String tableName = Utils.getTableNameFromTopic(topicName); + + Assert.assertEquals(tableName, "employees"); + } } diff --git a/tests/clickhouse_connection.py b/tests/clickhouse_connection.py index 5d219fb..a11b73e 100644 --- a/tests/clickhouse_connection.py +++ b/tests/clickhouse_connection.py @@ -1,12 +1,20 @@ +from clickhouse_driver import Client class ClickHouseConnection: - def __init__(self): - pass + def __init__(self, host_name, username, password, database): + self.host_name = host_name + self.username = username + self.password = password + self.database = database def create_connection(self): - pass + self.client = Client(self.host_name, + user=self.username, + password=self.password, + database=self.database) - def execute_sql(self): - pass + def execute_sql(self, query): + result = self.client.execute(query) + return result \ No newline at end of file diff --git a/tests/requirements.txt b/tests/requirements.txt index a738b6d..42cc596 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,2 +1,3 @@ +clickhouse-driver==0.2.3 mysql-connector==2.2.9 faker==13.3.5 \ No newline at end of file diff --git a/tests/test_datatypes.py b/tests/test_datatypes.py index 1945cde..2b94b47 100644 --- a/tests/test_datatypes.py +++ b/tests/test_datatypes.py @@ -1,6 +1,7 @@ import unittest from datetime import date +from tests.clickhouse_connection import ClickHouseConnection from tests.mysql_connection import MySqlConnection from fake_data import FakeData @@ -67,8 +68,12 @@ class MyTestCase(unittest.TestCase): fake_row_ch_invalid_date_range = FakeData.get_fake_employees_row_with_out_of_range_datetime(122324, date(9999, 12, 30), date(9999, 12, 31)) conn.execute_sql(sql_query, fake_row_ch_invalid_date_range) + clickhouse_conn = ClickHouseConnection(host_name='localhost', username='root', password='root', database='test') + clickhouse_conn.create_connection() + result = clickhouse_conn.execute_sql('select * from products') - conn.close + print(result) + conn.close() def generate_products_fake_records(self):