Merge remote-tracking branch 'altinity/mysql_json_converter' into main

This commit is contained in:
Vladislav Klimenko 2022-04-01 23:14:44 +03:00
commit 20ba705676
23 changed files with 300426 additions and 15 deletions

20
doc/Milestones.md Normal file

@@ -0,0 +1,20 @@
MySQL Tests
- Common data types test
- Batch Insert from MySQL
- JSON support
General
- Fix logging framework: migrate the slf4j/log4j setup to Log4j 2
- Unit tests to support different CDC events
- Focus only on the JSON converter.
March 30 Week - Milestones
- Add a ThreadPool to buffer records
- Add logic to flush buffered records once a limit is reached (see the sketch at the end of this file)
- Change from clickhouse-client to clickhouse-jdbc to use PreparedStatement
- Fix Altinity co
Questions:
- Focus on Inserts
- Support Updates and Deletes - CollapsingMergeTree and ReplacingMergeTree(???)
-
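A rough sketch of the buffering and flush-on-limit items from the March 30 milestones (all class and method names below are illustrative, not taken from this repository): records are collected in an in-memory buffer and handed to a background thread as a single batch once a configurable limit is reached; the flush action could then build one PreparedStatement batch the way DbWriter does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only: buffer incoming records and flush them as a batch
// from a single background thread once the buffer reaches a size limit.
public class BufferedBatchWriter<T> {
    private final List<T> buffer = new ArrayList<>();
    private final int flushLimit;
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();
    private final java.util.function.Consumer<List<T>> flushAction;

    public BufferedBatchWriter(int flushLimit, java.util.function.Consumer<List<T>> flushAction) {
        this.flushLimit = flushLimit;
        this.flushAction = flushAction;
    }

    public synchronized void add(T record) {
        buffer.add(record);
        if (buffer.size() >= flushLimit) {
            // Hand the full batch to the background thread and start a new buffer.
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            flusher.submit(() -> flushAction.accept(batch));
        }
    }

    public void shutdown() {
        flusher.shutdown();
    }
}
```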

24
doc/SETUP.md Normal file

@@ -0,0 +1,24 @@
For local setup, run Docker Compose from the docker
directory. It will start:
- MySQL
- RedPanda (Kafka)
- ClickHouse
ToDo: Create a Kafka Connect image with the MySQL connector.
`cd docker`
`docker-compose up`
Create the JAR file by running the following command and copy it to the libs directory of Kafka.
`mvn install`
Copy the MySQL libs from container/libs to the Kafka libs directory.
Start the Kafka Connect process and pass the properties files
for both the MySQL and ClickHouse connectors.
`./connect-standalone.sh ../config/connect-standalone.properties
../../kafka-connect-clickhouse/kcch-connector/src/main/config/mysql-debezium.properties
../../kafka-connect-clickhouse/kcch-connector/src/main/config/clickhouse-sink.properties`
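To sanity-check the stack after `docker-compose up`, a minimal probe along these lines can be run with the clickhouse-jdbc driver declared in pom.xml; the URL, user, and password are assumptions taken from docker/docker-compose.yml and may need adjusting for other setups.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Minimal connectivity check against the ClickHouse container started by docker-compose.
public class ClickHouseProbe {
    public static void main(String[] args) throws Exception {
        // Assumed values: HTTP port 8123, user/password/database from docker-compose.yml.
        String url = "jdbc:ch://localhost:8123/test";
        try (Connection conn = DriverManager.getConnection(url, "admin", "root");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT version()")) {
            if (rs.next()) {
                System.out.println("Connected to ClickHouse " + rs.getString(1));
            }
        }
    }
}
```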

35
docker/docker-compose.yml Normal file

@@ -0,0 +1,35 @@
version: "2.3"
# Ubuntu , set this for redpanda to start
# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm
# Clickhouse Table Schema
# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;
services:
kafka:
image: vectorized/redpanda
ports:
- "8081:8081"
- "8082:8082"
- "9091:9091"
- "9092:9092"
clickhouse:
image: yandex/clickhouse-server
ports:
- "8123:8123"
ulimits:
nofile:
soft: "262144"
hard: "262144"
environment:
- CLICKHOUSE_USER=admin
- CLICKHOUSE_PASSWORD=root
- CLICKHOUSE_DB=test
mysql:
image: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: test
ports:
- "3306:3306"

300024
docker/load_employees.dump Normal file

File diff suppressed because it is too large.

11
docker/mysql_db.sql Normal file

@@ -0,0 +1,11 @@
CREATE TABLE employees (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no)
);
SOURCE load_employees.dump

15
pom.xml

@@ -389,6 +389,11 @@
<version>5.5.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>1.8.1.Final</version>
</dependency>
<!--JDBC driver for building connection with Clickhouse-->
<!-- <dependency>
@@ -399,8 +404,16 @@
<dependency>
<groupId>com.clickhouse</groupId>
<!-- or clickhouse-grpc-client if you prefer gRPC -->
<artifactId>clickhouse-http-client</artifactId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch7</version>
<!-- below is only needed when all you want is a shaded jar -->
<classifier>http</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

View File

@@ -0,0 +1,10 @@
name=clickhouseconnector
connector.class=com.kafka.connect.clickhouse.ClickHouseSinkConnector
clickhouse.server.url=http://localhost:5322
clickhouse.server.user=admin
clickhouse.server.pass=password
clickhouse.server.database=default
topics=local_mysql3234.test.test_hello2
clickhouse.topic2table.map=topic1:table1,topic2:table2
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
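The `clickhouse.topic2table.map` value above is a comma-separated list of `topic:table` pairs. The connector's own parsing code is not part of this diff, so the following is only an illustrative sketch of how such a value could be turned into a map.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative parser for a "topic1:table1,topic2:table2" style property value.
public class TopicToTableMapParser {

    public static Map<String, String> parse(String value) {
        Map<String, String> result = new HashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return result;
        }
        for (String pair : value.split(",")) {
            String[] parts = pair.trim().split(":", 2);
            if (parts.length == 2) {
                result.put(parts[0].trim(), parts[1].trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the two mappings, e.g. {topic1=table1, topic2=table2}
        System.out.println(parse("topic1:table1,topic2:table2"));
    }
}
```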

View File

@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

View File

@@ -0,0 +1,16 @@
connector.class=io.debezium.connector.mysql.MySqlConnector
snapshot.locking.mode=minimal
database.user=root
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=test_history2
database.server.name=local_mysql3234
heartbeat.interval.ms=5000
database.port=3306
database.whitelist=test
database.hostname=localhost
database.password=root
database.history.kafka.recovery.poll.interval.ms=5000
name=mysql_conn_new323
snapshot.mode=initial
snapshot.delay.ms=10000
include.schema.changes=true
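The Debezium source connector configured above publishes change events wrapped in Debezium's standard envelope, with "before" and "after" row images plus metadata such as "op" and "source". A sink-side sketch of extracting the after image, which is what DbWriter.insert(table, afterValue, fields) in this change consumes, could look like the following; class and method names are illustrative, since the actual converter class is not shown in this diff.

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collections;
import java.util.List;

// Illustrative sketch: pulling the "after" row image out of a Debezium change event.
public class DebeziumEnvelopeReader {

    // Debezium envelopes carry "before", "after", "source", "op" and "ts_ms" fields.
    public static Struct afterImage(SinkRecord record) {
        Struct value = (Struct) record.value();
        return value == null ? null : value.getStruct("after");
    }

    public static List<Field> afterFields(SinkRecord record) {
        Struct after = afterImage(record);
        return after == null ? Collections.emptyList() : after.schema().fields();
    }
}
```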

View File

@@ -0,0 +1,4 @@
clickhouse.server.url='http://localhost:5322'
clickhouse.server.user='admin'
clickhouse.server.pass='password'
clickhouse.server.database='default'

View File

@@ -1,29 +1,158 @@
package com.altinity.clickhouse.sink.connector.db;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import io.debezium.time.Time;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import java.text.MessageFormat;
import io.debezium.time.Date;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Properties;
/**
* Class that abstracts all functionality
* related to interacting with Clickhouse DB.
*/
public class DbWriter {
ClickHouseNode server;
//ClickHouseNode server;
ClickHouseConnection conn;
/**
* Constructor to create Clickhouse DB connection.
*/
public DbWriter() {
// Keep a singleton of the connection to clickhouse.
this.server = ClickHouseNode.of("localhost", ClickHouseProtocol.HTTP, 8123, "my_db");
//ToDo: Read from Config
String url = "jdbc:ch://localhost/test";
String clientName = "Agent_1";
String userName = "admin";
String password = "root";
this.createConnection(url, clientName, userName, password);
}
/**
* Function to create Connection using the JDBC Driver
* @param url url with the JDBC format jdbc:ch://localhost/test
* @param clientName Client Name
* @param userName UserName
* @param password Password
*/
public void createConnection(String url, String clientName, String userName, String password) {
try {
Properties properties = new Properties();
properties.setProperty("client_name", clientName);
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
this.conn = dataSource.getConnection(userName, password);
} catch(Exception e) {
System.out.println("Error creating SQL connection" + e);
}
}
/**
* Function to retrieve Clickhouse http client Connection
* @return ClickHouseNode pointing at the local HTTP endpoint
*/
private ClickHouseNode getHttpConnection() {
ClickHouseCredentials credentials = ClickHouseCredentials.fromUserAndPassword("admin", "root");
return ClickHouseNode.builder().credentials(credentials).database("test").port(8123).host("localhost").build();
}
public void insert(String table, List<String> rows, List<String> values) {
String insertQuery = MessageFormat.format("insert into {0} {1} values({2})",
table, rows.toArray(), values.toArray());
if(this.server != null) {
ClickHouseClient.send(this.server, insertQuery);
} else {
// Error: no ClickHouse server node is configured.
}
}
/**
* Formatter for Raw Insert SQL query with placeholders for values
* with this format: insert into <tablename> values(?, ?, ?)
* @param tableName Table Name
* @param numFields Number of fields with placeholders
* @return Insert query string with value placeholders
*/
public String getInsertQuery(String tableName, int numFields) {
StringBuffer insertQuery = new StringBuffer().append("insert into ")
.append(tableName).append(" values(");
for(int i = 0; i < numFields; i++) {
insertQuery.append("?");
if (i == numFields - 1) {
insertQuery.append(")");
} else {
insertQuery.append(",");
}
}
return insertQuery.toString();
}
/**
* Function where the Kafka connect data types
* are mapped to Clickhouse data types and a batch insert is performed.
* @param table Table Name
* @param afterValue After value (for inserts, the before image is always empty)
* @param fields Kafka connect fields
*/
public void insert(String table, Struct afterValue, List<Field> fields){
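// Note: the table name is hardcoded below and overrides the caller-supplied value.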
table = "test";
String insertQueryTemplate = this.getInsertQuery(table, fields.size());
try (PreparedStatement ps = this.conn.prepareStatement(insertQueryTemplate)) {
int index = 1;
for(Field f: fields) {
Schema.Type fieldType = f.schema().type();
Object value = afterValue.get(f);
// Text columns
if(fieldType == Schema.Type.STRING) {
ps.setString(index, (String) value);
}
else if(fieldType == Schema.INT8_SCHEMA.type() ||
fieldType == Schema.INT16_SCHEMA.type() ||
fieldType == Schema.INT32_SCHEMA.type()) {
if(Date.SCHEMA_NAME.equals(f.schema().name())) {
// Date field arrives as INT32 (days since epoch) with schema name set to io.debezium.time.Date
ps.setDate(index, java.sql.Date.valueOf(java.time.LocalDate.ofEpochDay((Integer) value)));
} else {
ps.setInt(index, (Integer) value);
}
} else if(fieldType == Schema.FLOAT32_SCHEMA.type()) {
ps.setFloat(index, (Float) value);
} else if(fieldType == Schema.FLOAT64_SCHEMA.type()) {
// FLOAT64 values arrive as Double, not Float
ps.setDouble(index, (Double) value);
} else if(fieldType == Schema.BOOLEAN_SCHEMA.type()) {
ps.setBoolean(index, (Boolean) value);
}
index++;
}
ps.addBatch(); // append parameters to the query
ps.executeBatch(); // issue the composed query: insert into mytable values(...)(...)...(...)
} catch(Exception e) {
System.out.println("insert Batch exception" + e);
}
}
/**
* Function to insert records using Http Connection.
*/
public void insertUsingHttpConnection() {
// table = "test_hello2";
// String insertQuery = MessageFormat.format("insert into {0} {1} values({2})",
// table, "(id, message)", "1, 'hello'");
//// if(this.server != null) {
//// CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(this.server, insertQuery);
//// try {
//// future.get();
//// } catch (InterruptedException e) {
//// e.printStackTrace();
//// } catch (ExecutionException e) {
//// e.printStackTrace();
//// }
//// } else {
//// // Error .
//// }
}
}

View File

@@ -0,0 +1,4 @@
package com.altinity.clickhouse.sink.connector;
public class ClickHouseSinkConnectorTest {
}

View File

@@ -0,0 +1,62 @@
package com.altinity.clickhouse.sink.connector.converters;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
public class ClickHouseConverterTest {
@Test
public void testConvert() {
ClickHouseConverter converter = new ClickHouseConverter();
SinkRecord record = spoofSinkRecord("test", "key", "k", "value", "v",
TimestampType.NO_TIMESTAMP_TYPE, null);
converter.convert(record);
}
/**
* Utility method for spoofing SinkRecords that should be passed to SinkTask.put()
* @param topic The topic of the record.
* @param keyField The field name for the record key; may be null.
* @param key The content of the record key; may be null.
* @param valueField The field name for the record value; may be null
* @param value The content of the record value; may be null
* @param timestampType The type of timestamp embedded in the message
* @param timestamp The timestamp in milliseconds
* @return The spoofed SinkRecord.
*/
public static SinkRecord spoofSinkRecord(String topic, String keyField, String key,
String valueField, String value,
TimestampType timestampType, Long timestamp) {
Schema basicKeySchema = null;
Struct basicKey = null;
if (keyField != null) {
basicKeySchema = SchemaBuilder
.struct()
.field(keyField, Schema.STRING_SCHEMA)
.build();
basicKey = new Struct(basicKeySchema);
basicKey.put(keyField, key);
}
Schema basicValueSchema = null;
Struct basicValue = null;
if (valueField != null) {
basicValueSchema = SchemaBuilder
.struct()
.field(valueField, Schema.STRING_SCHEMA)
.build();
basicValue = new Struct(basicValueSchema);
basicValue.put(valueField, value);
}
return new SinkRecord(topic, 0, basicKeySchema, basicKey,
basicValueSchema, basicValue, 0, timestamp, timestampType);
}
}

View File

@@ -0,0 +1,18 @@
package com.altinity.clickhouse.sink.connector.db;
import org.junit.Assert;
import org.junit.Test;
public class DbWriterTest {
@Test
public void testInsertQuery() {
DbWriter writer = new DbWriter();
String query = writer.getInsertQuery("products", 4);
Assert.assertEquals("insert into products values(?,?,?,?)", query);
}
}