Modified pom.xml in lightweight to use the sink-connector source tree as an additional source directory.

This commit is contained in:
Kanthi Subramanian 2023-04-05 15:57:32 -04:00
parent d6274f2206
commit f88df8db7c
8 changed files with 347 additions and 63 deletions

View File

@@ -8,7 +8,7 @@ on:
jobs:
build:
runs-on: ubuntu-latest
runs-on: [self-hosted, linux, x64]
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11

View File

@@ -3,4 +3,4 @@
docker login registry.gitlab.com
docker build . -t clickhouse_debezium_embedded
docker tag clickhouse_debezium_embedded registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded
docker push registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded
docker push registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded

View File

@@ -15,6 +15,6 @@ export clickhouse.server.url="clickhouse"
export clickhouse.server.user="root"
export clickhouse.server.pass="root"
export clickhouse.server.port="8123"
export thread.pool.size="20"
export thread.pool.size="10"
export provide.transaction.metadata="false"
export disable.ddl="false"

View File

@@ -140,17 +140,6 @@
<version>1.33</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
@@ -206,14 +195,14 @@
</dependency>
<dependency>
<!-- <dependency>
<groupId>com.altinity.clickhouse.sink.connector</groupId>
<artifactId>sink</artifactId>
<version>1.0</version>
<scope>system</scope>
<!-- <classifier>sources</classifier> -->
<classifier>sources</classifier>
<systemPath>${project.basedir}/src/main/resources/clickhouse-kafka-sink-connector-0.0.1.jar</systemPath>
</dependency>
</dependency> -->
<!--junit for unit test-->
<dependency>
<groupId>org.junit</groupId>
@@ -266,6 +255,268 @@
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-fips</artifactId>
<version>1.0.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.8.1</version>
<!-- <scope>provided</scope> -->
<exclusions>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.6</version>
</dependency>
<!-- <dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>6.2.0</version>
<exclusions>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
</exclusion>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.5.1</version>
<exclusions>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
</exclusion>
</exclusions>
</dependency> -->
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-connect-avro-converter -->
<!-- <dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>5.5.1</version>
<scope>test</scope>
</dependency> -->
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>2.1.0.Alpha1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!--JDBC driver for building connection with Clickhouse-->
<!-- <dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2</version>
</dependency> -->
<dependency>
<groupId>com.clickhouse</groupId>
<!-- or clickhouse-grpc-client if you prefer gRPC -->
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch11</version>
<!-- below is only needed when all you want is a shaded jar -->
<classifier>http</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Micrometer and support to prometheus -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>1.8.5</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.8.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.dropwizard.metrics/metrics-core -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.3</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.2.3</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>4.2.3</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<!--junit for unit test-->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.8.1</version>
<scope>test</scope>
</dependency>
<!--Mockito for unit test-->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.21.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.6</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-rules</artifactId>
<version>1.19.0</version>
<scope>test</scope>
</dependency>
<!--Kafka JSON converter for SMT unit test-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>3.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>3.1.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>clickhouse</artifactId>
<version>1.17.6</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
@@ -276,7 +527,25 @@
</extension>
</extensions>
<plugins>
<plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>../sink-connector/src/main</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<!-- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
@@ -298,7 +567,7 @@
</goals>
</execution>
</executions>
</plugin>
</plugin> -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>

View File

@@ -40,10 +40,55 @@ public class DebeziumChangeEventCapture {
private ClickHouseBatchExecutor executor;
private ClickHouseBatchRunnable runnable;
// Records grouped by Topic Name
private ConcurrentHashMap<String, ConcurrentLinkedQueue<ClickHouseStruct>> records;
private void performDDLOperation(String DDL, ClickHouseSinkConnectorConfig config) {
StringBuffer clickHouseQuery = new StringBuffer();
MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService();
mySQLDDLParserService.parseSql(DDL, "", clickHouseQuery);
ClickHouseAlterTable cat = new ClickHouseAlterTable();
DBCredentials dbCredentials = parseDBConfiguration(config);
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
dbCredentials.getPassword(), config);
long currentTime = System.currentTimeMillis();
boolean ddlProcessingResult = true;
Metrics.updateDdlMetrics(DDL, currentTime, 0, ddlProcessingResult);
try {
String formattedQuery = clickHouseQuery.toString().replaceAll(",$", "");
if (formattedQuery != null && formattedQuery.isEmpty() == false) {
if (formattedQuery.contains("\n")) {
String[] queries = formattedQuery.split("\n");
for (String query : queries) {
if (query != null && query.isEmpty() == false) {
log.info("ClickHouse DDL: " + query);
cat.runQuery(query, writer.getConnection());
}
}
} else {
log.info("ClickHouse DDL: " + formattedQuery);
cat.runQuery(formattedQuery, writer.getConnection());
}
} else {
log.error("DDL translation failed: " + DDL);
}
} catch (Exception e) {
log.error("Error running DDL Query: " + e);
ddlProcessingResult = false;
//throw new RuntimeException(e);
}
long elapsedTime = System.currentTimeMillis() - currentTime;
Metrics.updateDdlMetrics(DDL, currentTime, elapsedTime, ddlProcessingResult);
}
/**
* Function to process every change event record
* as received from Debezium
@@ -86,45 +131,14 @@ public class DebeziumChangeEventCapture {
if (DDL != null && DDL.isEmpty() == false)
//&& ((DDL.toUpperCase().contains("ALTER TABLE") || DDL.toUpperCase().contains("RENAME TABLE"))))
{
StringBuffer clickHouseQuery = new StringBuffer();
MySQLDDLParserService mySQLDDLParserService = new MySQLDDLParserService();
mySQLDDLParserService.parseSql(DDL, "", clickHouseQuery);
ClickHouseAlterTable cat = new ClickHouseAlterTable();
log.info("***** DDL received, Flush all existing records");
this.executor.shutdown();
this.executor.awaitTermination(60, TimeUnit.SECONDS);
DBCredentials dbCredentials = parseDBConfiguration(config);
BaseDbWriter writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
dbCredentials.getPassword(), config);
long currentTime = System.currentTimeMillis();
boolean ddlProcessingResult = true;
Metrics.updateDdlMetrics(DDL, currentTime, 0, ddlProcessingResult);
try {
String formattedQuery = clickHouseQuery.toString().replaceAll(",$", "");
if (formattedQuery != null && formattedQuery.isEmpty() == false) {
if (formattedQuery.contains("\n")) {
String[] queries = formattedQuery.split("\n");
for (String query : queries) {
if (query != null && query.isEmpty() == false) {
log.info("ClickHouse DDL: " + query);
cat.runQuery(query, writer.getConnection());
}
}
} else {
log.info("ClickHouse DDL: " + formattedQuery);
cat.runQuery(formattedQuery, writer.getConnection());
}
} else {
log.error("DDL translation failed: " + DDL);
}
} catch (Exception e) {
log.error("Error running DDL Query: " + e);
ddlProcessingResult = false;
//throw new RuntimeException(e);
}
long elapsedTime = System.currentTimeMillis() - currentTime;
Metrics.updateDdlMetrics(DDL, currentTime, elapsedTime, ddlProcessingResult);
performDDLOperation(DDL, config);
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));
this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}
} else {
@@ -168,7 +182,8 @@ public class DebeziumChangeEventCapture {
@Override
public void handle(boolean b, String s, Throwable throwable) {
if (b == false) {
log.error("Error starting connector" + throwable.toString());
log.error("Error starting connector" + throwable);
log.error("Retrying - try number:" + numRetries);
if (numRetries++ <= MAX_RETRIES) {
try {
@@ -230,9 +245,9 @@
private void setupProcessingThread(ClickHouseSinkConnectorConfig config, DDLParserService ddlParserService) {
// Setup separate thread to read messages from shared buffer.
this.records = new ConcurrentHashMap<>();
ClickHouseBatchRunnable runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
this.runnable = new ClickHouseBatchRunnable(this.records, config, new HashMap());
this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));
this.executor.scheduleAtFixedRate(runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
}
/**

View File

@@ -12,11 +12,11 @@ docker exec -it mysql-master mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON sbt
####
#for sysbench_test in bulk_insert oltp_insert oltp_delete oltp_update_index oltp_update_non_index
for sysbench_test in oltp_update_index
#for sysbench_test in oltp_insert_truncate
#for sysbench_test in oltp_insert
for sysbench_test in oltp_insert_truncate
do
echo "*** Setup connectors"
deploy/./configure_sysbench.sh
../../deploy/./configure_sysbench.sh
echo "*** Running Sysbench tests ****"
./run_sysbench_tests.sh -t $sysbench_test
result=$(./compare_mysql_ch.sh $sysbench_test)

View File

@@ -24,7 +24,7 @@ supported_test_names+=('oltp_update_non_index')
supported_test_names+=('oltp_insert_truncate')
### Sysbench configuration
num_threads=16
num_threads=50
time=500 # IN Seconds
mysql_host=127.0.0.1