Added logic to auto create tables - map kafka connect type to CH typesS

This commit is contained in:
Kanthi Subramanian 2022-06-24 16:44:54 -04:00
parent 831fb3332f
commit c3e01a8c81
2 changed files with 48 additions and 14 deletions

View File

@ -0,0 +1,20 @@
package com.altinity.clickhouse.sink.connector.converters;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
public class ClickHouseAutoCreateTable {
public void createClickHouseTableSyntax(Field[] fields) {
for(Field f: fields) {
String colName = f.name();
Schema.Type type = f.schema().type();
String schemaName = f.schema().name();
// Input:
}
}
}

View File

@ -3,6 +3,9 @@ package com.altinity.clickhouse.sink.connector.converters;
import com.clickhouse.client.ClickHouseDataType;
import org.apache.kafka.connect.data.Schema;
import org.apache.commons.lang3.tuple.MutablePair;
import java.util.HashMap;
import java.util.Map;
@ -12,29 +15,40 @@ import java.util.Map;
*
*/
public class ClickHouseDataTypeMapper {
static Map<Schema.Type, ClickHouseDataType> dataTypesMap;
static Map<MutablePair<Schema.Type, String>, ClickHouseDataType> dataTypesMap;
static {
dataTypesMap = new HashMap<>();
dataTypesMap.put(Schema.INT16_SCHEMA.type(), ClickHouseDataType.Int16);
dataTypesMap.put(Schema.INT8_SCHEMA.type(), ClickHouseDataType.Int8);
dataTypesMap.put(Schema.INT32_SCHEMA.type(), ClickHouseDataType.Int32);
dataTypesMap.put(Schema.INT64_SCHEMA.type(), ClickHouseDataType.Int64);
dataTypesMap.put(new MutablePair(Schema.INT16_SCHEMA.type(), null), ClickHouseDataType.Int16);
dataTypesMap.put(new MutablePair(Schema.INT8_SCHEMA.type(), null), ClickHouseDataType.Int8);
dataTypesMap.put(new MutablePair(Schema.INT32_SCHEMA.type(), null), ClickHouseDataType.Int32);
dataTypesMap.put(new MutablePair(Schema.INT64_SCHEMA.type(), null), ClickHouseDataType.Int64);
dataTypesMap.put(Schema.FLOAT32_SCHEMA.type(), ClickHouseDataType.Float32);
dataTypesMap.put(Schema.FLOAT64_SCHEMA.type(), ClickHouseDataType.Float64);
dataTypesMap.put(Schema.STRING_SCHEMA.type(), ClickHouseDataType.String);
// BLOB -> String
dataTypesMap.put(Schema.BYTES_SCHEMA.type(), ClickHouseDataType.String);
// dataTypesMap.put(Schema.FLOAT32_SCHEMA.type(), ClickHouseDataType.Float32);
// dataTypesMap.put(Schema.FLOAT64_SCHEMA.type(), ClickHouseDataType.Float64);
// dataTypesMap.put(Schema.STRING_SCHEMA.type(), ClickHouseDataType.String);
// // BLOB -> String
// dataTypesMap.put(Schema.BYTES_SCHEMA.type(), ClickHouseDataType.String);
}
static ClickHouseDataType getClickHouseDataType(Schema.Type kafkaConnectType, String schemaName) {
ClickHouseDataType matchingDataType = null;
for(Map.Entry<MutablePair<Schema.Type, String>, ClickHouseDataType> entry: dataTypesMap.entrySet()) {
// return dataTypesMap.get(kafkaConnectType);
MutablePair mp = entry.getKey();
if(kafkaConnectType == mp.left && schemaName == mp.right) {
// Founding matching type.
matchingDataType = entry.getValue();
}
static ClickHouseDataType getClickHouseDataType(Schema.Type kafkaConnectType) {
return dataTypesMap.get(kafkaConnectType);
}
return matchingDataType;
}