Added logic to automatically derive the table name from the topic name if no mapping is provided in the configuration. Added a Python test that queries ClickHouse using the clickhouse-driver Python library.

This commit is contained in:
Kanthi Subramanian 2022-04-27 17:16:37 -04:00
parent 671a0f6f84
commit b3c967ae91
6 changed files with 54 additions and 7 deletions

View File

@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
class Utils {
public class Utils {
private static final Logger LOGGER = LoggerFactory.getLogger(Utils.class);
public static final String TASK_ID = "task_id";
@ -81,6 +81,24 @@ class Utils {
return topic2Table;
}
/**
 * Derives the table name from a Kafka Connect topic name.
 *
 * <p>Topic names are expected to have exactly three dot-separated parts,
 * {@code hostname.dbName.tableName}; the third part is returned.
 *
 * @param topicName topic name to parse; may be {@code null}
 * @return the table name, or {@code null} when {@code topicName} is {@code null}
 *         or does not split into exactly three parts
 */
public static String getTableNameFromTopic(String topicName) {
    // A missing topic name yields "no table" instead of a NullPointerException.
    if (topicName == null) {
        return null;
    }
    // Topic names are of the following format:
    // hostname.dbName.tableName
    String[] splitName = topicName.split("\\.");
    return splitName.length == 3 ? splitName[2] : null;
}
/**
* Function to validate the table name passed in settings.
* //TODO: Implement the function.

View File

@ -3,6 +3,7 @@ package com.altinity.clickhouse.sink.connector.executor;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.Metrics;
import com.altinity.clickhouse.sink.connector.Utils;
import com.altinity.clickhouse.sink.connector.db.DbWriter;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.codahale.metrics.Timer;
@ -51,7 +52,13 @@ public class ClickHouseBatchRunnable implements Runnable {
for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : this.records.entrySet()) {
String topicName = entry.getKey();
//The user parameter will override the topic mapping to table.
String tableName = this.topic2TableMap.get(topicName);
if(tableName == null) {
tableName = Utils.getTableNameFromTopic(topicName);
}
// Initialize Timer to track time taken to transform and insert to Clickhouse.
Timer timer = Metrics.timer("Bulk Insert: " + blockUuid + " Size:" + records.size());
Timer.Context context = timer.time();

View File

@ -22,5 +22,13 @@ public class UtilsTest {
}
/**
 * Checks that the third dot-separated segment of a topic name is returned as the table name.
 */
@Test
public void testGetTableNameFromTopic() {
    // Topic follows the hostname.dbName.tableName layout.
    String actualTable = Utils.getTableNameFromTopic("SERVER5432.test.employees");
    // NOTE(review): arguments are passed as (actual, expected); confirm this matches the
    // parameter order of the Assert class imported by this test.
    Assert.assertEquals(actualTable, "employees");
}
}

View File

@ -1,12 +1,20 @@
from clickhouse_driver import Client
class ClickHouseConnection:
    """Holds connection settings and a ``Client`` used to run SQL queries.

    Call ``create_connection()`` once before ``execute_sql()``.
    """

    def __init__(self, host_name, username, password, database):
        """Store the connection settings; no client is created yet.

        :param host_name: host passed as the first argument to ``Client``
        :param username: value passed to ``Client`` as ``user``
        :param password: value passed to ``Client`` as ``password``
        :param database: value passed to ``Client`` as ``database``
        """
        self.host_name = host_name
        self.username = username
        self.password = password
        self.database = database
        # Populated by create_connection(); stays None until then.
        self.client = None

    def create_connection(self):
        """Instantiate ``Client`` from the stored settings and keep it on ``self.client``."""
        self.client = Client(self.host_name,
                             user=self.username,
                             password=self.password,
                             database=self.database)

    def execute_sql(self, query):
        """Run ``query`` through the client and return whatever ``execute`` returns.

        :param query: SQL text handed to ``self.client.execute``
        :raises RuntimeError: if ``create_connection()`` has not been called
        """
        if self.client is None:
            # Fail with an explicit message rather than an AttributeError on a missing client.
            raise RuntimeError("create_connection() must be called before execute_sql()")
        return self.client.execute(query)

View File

@ -1,2 +1,3 @@
clickhouse-driver==0.2.3
mysql-connector==2.2.9
faker==13.3.5

View File

@ -1,6 +1,7 @@
import unittest
from datetime import date
from tests.clickhouse_connection import ClickHouseConnection
from tests.mysql_connection import MySqlConnection
from fake_data import FakeData
@ -67,8 +68,12 @@ class MyTestCase(unittest.TestCase):
fake_row_ch_invalid_date_range = FakeData.get_fake_employees_row_with_out_of_range_datetime(122324, date(9999, 12, 30), date(9999, 12, 31))
conn.execute_sql(sql_query, fake_row_ch_invalid_date_range)
clickhouse_conn = ClickHouseConnection(host_name='localhost', username='root', password='root', database='test')
clickhouse_conn.create_connection()
result = clickhouse_conn.execute_sql('select * from products')
conn.close
print(result)
conn.close()
def generate_products_fake_records(self):