Added support for rename multiple tables and ALTER table old_name to new_name, Added integration tests for Postgres.

This commit is contained in:
Kanthi Subramanian 2023-04-18 16:18:14 -04:00
parent 95fba26249
commit a943d5a311
23 changed files with 1030 additions and 233 deletions

View File

@ -125,22 +125,19 @@ cd helm
helm install clickhouse-debezium-embedded . helm install clickhouse-debezium-embedded .
``` ```
### Getting started (Docker-compose) ### Getting started
**Start application**
```
cd docker
docker-compose up
```
### Building from sources
Build the JAR file Build the JAR file
`mvn clean install` `mvn clean install`
Build docker images and start docker compose
`cd docker` \
`docker-compose build`
**MySQL need to be started before starting the application**\
**Start MySQL docker** \
`docker-compose -f docker-compose-mysql.yml up`
**Start application**\
`docker-compose up`
#### DDL Support: #### DDL Support:
@ -165,6 +162,11 @@ ALTER TABLE
| ALTER COLUMN col_name ADD DEFAULT | Not supported by grammar | | ALTER COLUMN col_name ADD DEFAULT | Not supported by grammar |
| ALTER COLUMN col_name ADD DROP DEFAULT | Not supported by grammar | | ALTER COLUMN col_name ADD DROP DEFAULT | Not supported by grammar |
| ADD PRIMARY KEY | Cannot modify primary key in CH | | ADD PRIMARY KEY | Cannot modify primary key in CH |
### Not supported:
| MySQL | ClickHouse |
|--------------------------------------------------------|-----------------------------------------------------------------|
| ADD INDEX | Secondary indexes in CH, what about type and index granularity? | | ADD INDEX | Secondary indexes in CH, what about type and index granularity? |
| ADD CONSTRAINT (CHECK) | | | ADD CONSTRAINT (CHECK) | |
| ADD CONSTRAINT | Add constraint with Primary key(Not supported) | | ADD CONSTRAINT | Add constraint with Primary key(Not supported) |
@ -174,12 +176,12 @@ ALTER TABLE
## TABLE operations ## TABLE operations
| MySQL | ClickHouse | | MySQL | ClickHouse |
|----------------------|------------| |------------------------------------------|-------------------------------------|
| RENAME TABLE name_1 to name_2 | | | RENAME TABLE name_1 to name_2 | |
| TRUNCATE TABLE | | | TRUNCATE TABLE | |
| DROP TABLE name_1 | | | DROP TABLE name_1 | |
| DROP TABLE name_1, name_2 | | | DROP TABLE name_1, name_2 | |
| | | | ALTER TABLE table_name to new_table_name | RENAME table_name to new_table_name |
## DATABASE operations ## DATABASE operations
| MySQL | ClickHouse | | MySQL | ClickHouse |

View File

@ -27,7 +27,7 @@
<!-- Users and ACL. --> <!-- Users and ACL. -->
<users> <users>
<!-- If user name was not specified, 'default' user is used. --> <!-- If user name was not specified, 'default' user is used. -->
<ch_user> <default>
<!-- See also the files in users.d directory where the password can be overridden. <!-- See also the files in users.d directory where the password can be overridden.
Password could be specified in plaintext or in SHA256 (in hex format). Password could be specified in plaintext or in SHA256 (in hex format).
@ -63,7 +63,7 @@
Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha1sum | tr -d '-' | xxd -r -p | sha1sum | tr -d '-' Execute: PASSWORD=$(base64 < /dev/urandom | head -c8); echo "$PASSWORD"; echo -n "$PASSWORD" | sha1sum | tr -d '-' | xxd -r -p | sha1sum | tr -d '-'
In first line will be password and in second - corresponding double SHA1. In first line will be password and in second - corresponding double SHA1.
--> -->
<password>root</password> <!-- <password>root</password> -->
<!-- List of networks with open access. <!-- List of networks with open access.
@ -97,7 +97,7 @@
<!-- User can create other users and grant rights to them. --> <!-- User can create other users and grant rights to them. -->
<!-- <access_management>1</access_management> --> <!-- <access_management>1</access_management> -->
</ch_user> </default>
</users> </users>
<!-- Quotas. --> <!-- Quotas. -->

View File

@ -888,7 +888,7 @@
"options": { "options": {
"barRadius": 0, "barRadius": 0,
"barWidth": 0.97, "barWidth": 0.97,
"fullHighlight": false, "fullHighlight": true,
"groupWidth": 0.7, "groupWidth": 0.7,
"legend": { "legend": {
"calcs": [], "calcs": [],
@ -897,14 +897,15 @@
"showLegend": true "showLegend": true
}, },
"orientation": "auto", "orientation": "auto",
"showValue": "auto", "showValue": "always",
"stacking": "none", "stacking": "none",
"tooltip": { "tooltip": {
"mode": "single", "mode": "multi",
"sort": "none" "sort": "none"
}, },
"xField": "Time",
"xTickLabelRotation": 0, "xTickLabelRotation": 0,
"xTickLabelSpacing": 0 "xTickLabelSpacing": -200
}, },
"pluginVersion": "9.4.7", "pluginVersion": "9.4.7",
"targets": [ "targets": [
@ -915,18 +916,18 @@
}, },
"dateTimeType": "DATETIME", "dateTimeType": "DATETIME",
"extrapolate": true, "extrapolate": true,
"format": "table", "format": "time_series",
"formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t", "formattedQuery": "SELECT $timeSeries as t, count() FROM $table WHERE $timeFilter GROUP BY t ORDER BY t",
"hide": false, "hide": false,
"intervalFactor": 1, "intervalFactor": 1,
"query": " select sum(tmp.rows), tmp.event_time from ( select rows,event_time ,event_type from system.part_log pl where event_type='NewPart' order by event_time desc) tmp where tmp.event_time >= toDateTime($from) and tmp.event_time <= toDateTime($to) group by tmp.event_time", "query": "SELECT toStartOfInterval(event_time, toIntervalSecond(1)) AS _time_dec,\nsum(rows) as rows, table\nFROM system.part_log\nWHERE (event_type = 'NewPart') AND event_time >= toDateTime($from) and event_time < toDateTime($to)\nGROUP BY _time_dec, table",
"rawQuery": "select sum(tmp.rows), tmp.event_time from ( select rows,event_time ,event_type from system.part_log pl where event_type='NewPart' order by event_time desc) tmp where tmp.event_time >= toDateTime(1681141796) and tmp.event_time <= toDateTime(1681163396) group by tmp.event_time", "rawQuery": "SELECT toStartOfInterval(event_time, toIntervalSecond(1)) AS _time_dec,\nsum(rows) as rows, table\nFROM system.part_log\nWHERE (event_type = 'NewPart') AND event_time >= toDateTime(1681314113) and event_time < toDateTime(1681335713)\nGROUP BY _time_dec, table",
"refId": "A", "refId": "A",
"round": "0s", "round": "0s",
"skip_comments": true "skip_comments": true
} }
], ],
"title": "New Parts rate", "title": "Rows inserted ",
"type": "barchart" "type": "barchart"
}, },
{ {
@ -1332,8 +1333,7 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green", "color": "green"
"value": null
}, },
{ {
"color": "red", "color": "red",
@ -1423,8 +1423,7 @@
"mode": "absolute", "mode": "absolute",
"steps": [ "steps": [
{ {
"color": "green", "color": "green"
"value": null
}, },
{ {
"color": "red", "color": "red",
@ -1783,7 +1782,7 @@
"type": "barchart" "type": "barchart"
} }
], ],
"refresh": "", "refresh": false,
"revision": 1, "revision": 1,
"schemaVersion": 38, "schemaVersion": 38,
"style": "dark", "style": "dark",
@ -1792,7 +1791,7 @@
"list": [] "list": []
}, },
"time": { "time": {
"from": "now-6h", "from": "now-15m",
"to": "now" "to": "now"
}, },
"timepicker": {}, "timepicker": {},

View File

@ -0,0 +1,79 @@
version: "3.4"
# Ubuntu , set this for redpanda to start
# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm
# Clickhouse Table Schema
# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;
services:
postgres:
image: debezium/postgres:15-alpine
restart: always
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=root
- POSTGRES_USER=root
- POSTGRES_DB=public
volumes:
- ../sql/init_postgres.sql:/docker-entrypoint-initdb.d/init_postgres.sql
command:
- "postgres"
- "-c"
- "wal_level=logical"
clickhouse:
# clickhouse-client --host=127.0.0.1 --port=9000 --user=root --password=root --database=test
container_name: clickhouse
image: clickhouse/clickhouse-server:latest
restart: "no"
ports:
- "8123:8123"
- "9000:9000"
environment:
- CLICKHOUSE_USER=root
- CLICKHOUSE_PASSWORD=root
- CLICKHOUSE_DB=test
- CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0
ulimits:
nofile:
soft: "262144"
hard: "262144"
volumes:
#- ../sql/init_ch.sql:/docker-entrypoint-initdb.d/init_clickhouse.sql
- ../clickhouse/users.xml:/etc/clickhouse-server/users.xml
depends_on:
zookeeper:
condition: service_healthy
zookeeper:
image: zookeeper:3.6.2
expose:
- "2181"
environment:
ZOO_TICK_TIME: 500
ZOO_MY_ID: 1
healthcheck:
test: echo stat | nc localhost 2181
interval: 3s
timeout: 2s
retries: 5
start_period: 2s
security_opt:
- label:disable
debezium-embedded:
image: registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest
# build:
# context: ../
restart: "no"
ports:
- "8083:8083"
# depends_on:
# - mysql-master
env_file:
- docker_postgres.env
extra_hosts:
- "host.docker.internal:host-gateway"

View File

@ -7,6 +7,8 @@ version: "3.4"
# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id; # create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;
services: services:
mysql-master: mysql-master:
container_name: mysql-master container_name: mysql-master
image: docker.io/bitnami/mysql:latest image: docker.io/bitnami/mysql:latest
@ -37,7 +39,7 @@ services:
- "8123:8123" - "8123:8123"
- "9000:9000" - "9000:9000"
environment: environment:
- CLICKHOUSE_USER=ch_user - CLICKHOUSE_USER=root
- CLICKHOUSE_PASSWORD=root - CLICKHOUSE_PASSWORD=root
- CLICKHOUSE_DB=test - CLICKHOUSE_DB=test
- CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0 - CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=0
@ -51,7 +53,7 @@ services:
depends_on: depends_on:
zookeeper: zookeeper:
condition: service_healthy condition: service_healthy
#
zookeeper: zookeeper:
image: zookeeper:3.6.2 image: zookeeper:3.6.2
expose: expose:
@ -83,7 +85,7 @@ services:
- "host.docker.internal:host-gateway" - "host.docker.internal:host-gateway"
#### MONITORING #### ### MONITORING ####
prometheus: prometheus:
container_name: prometheus container_name: prometheus
image: bitnami/prometheus:2.36.0 image: bitnami/prometheus:2.36.0
@ -112,4 +114,4 @@ services:
- GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource - GF_INSTALL_PLUGINS=vertamedia-clickhouse-datasource,grafana-clickhouse-datasource
depends_on: depends_on:
- prometheus - prometheus
### END OF MONITORING ### ## END OF MONITORING ###

View File

@ -12,7 +12,7 @@ export auto.create.tables=true
export replacingmergetree.delete.column="_sign" export replacingmergetree.delete.column="_sign"
export metrics.port=8083 export metrics.port=8083
export clickhouse.server.url="clickhouse" export clickhouse.server.url="clickhouse"
export clickhouse.server.user="ch_user" export clickhouse.server.user="root"
export clickhouse.server.pass="root" export clickhouse.server.pass="root"
export clickhouse.server.port="8123" export clickhouse.server.port="8123"
export thread.pool.size="10" export thread.pool.size="10"

View File

@ -2,15 +2,20 @@ export database.hostname="postgres"
export database.port="5432" export database.port="5432"
export database.user="root" export database.user="root"
export database.password="root" export database.password="root"
export snapshot.mode="schema_only" export snapshot.mode="initial"
export database.dbname="public"
export connector.class="io.debezium.connector.postgresql.PostgresConnector" export connector.class="io.debezium.connector.postgresql.PostgresConnector"
export plugin.name="pgoutput" export plugin.name="decoderbufs"
export table.include.list="public.tm" export table.include.list="public.tm"
export clickhouse.server.database="test"
export auto.create.tables=true export auto.create.tables=true
export replacingmergetree.delete.column="_sign" export replacingmergetree.delete.column="_sign"
export metrics.port=8083 export metrics.port=8087
export clickhouse.server.url="clickhouse" export clickhouse.server.url="clickhouse"
export clickhouse.server.user="root" export clickhouse.server.user="root"
export clickhouse.server.pass="root" export clickhouse.server.pass="root"
export clickhouse.server.port="8123" export clickhouse.server.port="8123"
export clickhouse.server.database="test"
export offset.storage="org.apache.kafka.connect.storage.FileOffsetBackingStore"
export offset.storage.file.filename="/tmp/offsets.dat"
export offset.flush.interval.ms="60000"
export disable.ddl="false"

View File

@ -95,7 +95,11 @@
<groupId>io.debezium</groupId> <groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId> <artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version> <version>${version.debezium}</version>
<scope>test</scope> </dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.debezium</groupId> <groupId>io.debezium</groupId>
@ -187,12 +191,7 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.postgresql/postgresql --> <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.3</version>
<scope>test</scope>
</dependency>
<!-- <dependency> <!-- <dependency>
@ -367,12 +366,7 @@
<scope>test</scope> <scope>test</scope>
</dependency> --> </dependency> -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.apache.commons</groupId>
@ -522,6 +516,21 @@
</extension> </extension>
</extensions> </extensions>
<plugins> <plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<!-- <includeArtifactIds>io.debezium.ddl.parser</includeArtifactIds> -->
</configuration>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId> <artifactId>build-helper-maven-plugin</artifactId>
@ -540,6 +549,30 @@
</execution> </execution>
</executions> </executions>
</plugin> </plugin>
<!-- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin> -->
<!-- <plugin> <!-- <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId> <artifactId>maven-install-plugin</artifactId>
@ -563,6 +596,37 @@
</execution> </execution>
</executions> </executions>
</plugin> --> </plugin> -->
<!-- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
</configuration>
</execution>
</executions>
</plugin> -->
<!-- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication</mainClass>
</manifest>
</archive>
</configuration>
</plugin> -->
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId> <artifactId>maven-shade-plugin</artifactId>
@ -577,6 +641,12 @@
<transformers> <transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication</mainClass> <mainClass>com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication</mainClass>
<!-- <manifestEntries>
<mainClass>com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication</mainClass>
<Class-Path>lib/*.jar</Class-Path>
</manifestEntries> -->
</transformer> </transformer>
</transformers> </transformers>
<artifactSet> <artifactSet>

View File

@ -0,0 +1,69 @@
CREATE TABLE "tm" (
id uuid NOT NULL PRIMARY KEY,
secid uuid,
acc_id uuid,
ccatz character varying,
tcred boolean DEFAULT false ,
am numeric(21,5) ,
set_date timestamp with time zone ,
created timestamp with time zone,
updated timestamp with time zone,
events_id uuid,
events_transaction_id uuid ,
events_status character varying,
events_payment_snapshot jsonb,
events_created timestamp with time zone,
vid uuid,
vtid uuid ,
vstatus character varying ,
vamount numeric(21,5) ,
vcreated timestamp with time zone,
vbilling_currency character varying
);
INSERT INTO public.tm VALUES (
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f71',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
'IDR',
't',
200000.00000,
'2022-10-16 16:53:15.01957',
'2022-10-16 16:53:15.01957',
'2022-10-16 16:53:15.01957',
'b4763f4a-2e3f-41ae-9715-4ab113e2f53c',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
NULL,
'{"Hello":"World"}',
'2022-10-16 16:53:15.01957',
NULL,
NULL,
NULL,
NULL,
NULL,
NULL
);
insert into public.tm values(
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f73',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
'IDR',
't',
200000.00000,
'2022-10-16 16:53:15.01957',
'2022-10-16 16:53:15.01957',
'2022-10-16 16:53:15.01957',
'b4763f4a-2e3f-41ae-9715-4ab113e2f53c',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
NULL,
NULL,
'2022-10-16 16:53:15.01957',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f71',
'9cb52b2a-8ef2-4987-8856-c79a1b2c2f72',
'COMPLETED',
200000.00000,
'2022-10-16 16:53:15.01957',
'IDR'
);

View File

@ -13,6 +13,9 @@ import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchExecutor;
import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable; import com.altinity.clickhouse.sink.connector.executor.ClickHouseBatchRunnable;
import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct; import com.altinity.clickhouse.sink.connector.model.ClickHouseStruct;
import com.altinity.clickhouse.sink.connector.model.DBCredentials; import com.altinity.clickhouse.sink.connector.model.DBCredentials;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.embedded.Connect; import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent; import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine; import io.debezium.engine.DebeziumEngine;
@ -23,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
@ -165,7 +169,7 @@ public class DebeziumChangeEventCapture {
} }
} }
public static int MAX_RETRIES = 5; public static int MAX_RETRIES = 25;
public static int SLEEP_TIME = 10000; public static int SLEEP_TIME = 10000;
public int numRetries = 0; public int numRetries = 0;
@ -186,7 +190,7 @@ public class DebeziumChangeEventCapture {
public void handle(boolean b, String s, Throwable throwable) { public void handle(boolean b, String s, Throwable throwable) {
if (b == false) { if (b == false) {
log.error("Error starting connector" + throwable); log.error("Error starting connector" + throwable + " Message:" + s);
log.error("Retrying - try number:" + numRetries); log.error("Retrying - try number:" + numRetries);
if (numRetries++ <= MAX_RETRIES) { if (numRetries++ <= MAX_RETRIES) {
try { try {
@ -225,8 +229,9 @@ public class DebeziumChangeEventCapture {
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService, public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService) throws IOException { DDLParserService ddlParserService) throws IOException {
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props));
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props));
trySomething(Configuration.from(props));
Metrics.initialize(props.getProperty(ClickHouseSinkConnectorConfigVariables.ENABLE_METRICS.toString()), Metrics.initialize(props.getProperty(ClickHouseSinkConnectorConfigVariables.ENABLE_METRICS.toString()),
props.getProperty(ClickHouseSinkConnectorConfigVariables.METRICS_ENDPOINT_PORT.toString())); props.getProperty(ClickHouseSinkConnectorConfigVariables.METRICS_ENDPOINT_PORT.toString()));
@ -286,4 +291,21 @@ public class DebeziumChangeEventCapture {
this.records.put(topicName, structs); this.records.put(topicName, structs);
} }
} }
private void trySomething(Configuration config){
PostgresConnectorConfig postgresConfig = new PostgresConnectorConfig(config);
// ConfigValue hostnameValue = (ConfigValue) configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name());
PostgresConnection connection = new PostgresConnection(postgresConfig.getJdbcConfig(), "Debezium Validate Connection");
try {
try {
connection.connection(false);
connection.execute(new String[]{"SELECT version()"});
} catch (SQLException e) {
throw new RuntimeException(e);
}
} catch(Exception e) {
}
}
} }

View File

@ -3,6 +3,8 @@ package com.altinity.clickhouse.debezium.embedded.ddl.parser;
public class Constants { public class Constants {
public static final String ALTER_TABLE = "ALTER TABLE %s"; public static final String ALTER_TABLE = "ALTER TABLE %s";
public static final String ALTER_RENAME_TABLE = "RENAME TABLE %s TO %s";
public static final String CREATE_TABLE = "CREATE TABLE"; public static final String CREATE_TABLE = "CREATE TABLE";
public static final String NULLABLE = "Nullable"; public static final String NULLABLE = "Nullable";
@ -29,7 +31,7 @@ public class Constants {
public static final String IF_EXISTS = "if exists "; public static final String IF_EXISTS = "if exists ";
public static final String IF_NOT_EXISTS = "if not exists "; public static final String IF_NOT_EXISTS = "if not exists ";
public static final String RENAME_TABLE = "RENAME TABLE %s to %s"; public static final String RENAME_TABLE = "RENAME TABLE";
public static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s"; public static final String TRUNCATE_TABLE = "TRUNCATE TABLE %s";
public static final String DROP_TABLE = "DROP TABLE"; public static final String DROP_TABLE = "DROP TABLE";

View File

@ -788,6 +788,16 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
} }
// @Override
// public void enterClusteringKeyColumnConstraint(MySqlParser.ClusteringKeyColumnConstraintContext clusteringKeyColumnConstraintContext) {
//
// }
//
// @Override
// public void exitClusteringKeyColumnConstraint(MySqlParser.ClusteringKeyColumnConstraintContext clusteringKeyColumnConstraintContext) {
//
// }
@Override @Override
public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraintContext uniqueKeyColumnConstraintContext) { public void enterUniqueKeyColumnConstraint(MySqlParser.UniqueKeyColumnConstraintContext uniqueKeyColumnConstraintContext) {
@ -918,6 +928,16 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
} }
// @Override
// public void enterClusteringKeyTableConstraint(MySqlParser.ClusteringKeyTableConstraintContext clusteringKeyTableConstraintContext) {
//
// }
//
// @Override
// public void exitClusteringKeyTableConstraint(MySqlParser.ClusteringKeyTableConstraintContext clusteringKeyTableConstraintContext) {
//
// }
@Override @Override
public void enterReferenceDefinition(MySqlParser.ReferenceDefinitionContext referenceDefinitionContext) { public void enterReferenceDefinition(MySqlParser.ReferenceDefinitionContext referenceDefinitionContext) {
@ -1830,10 +1850,23 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
if (((TerminalNodeImpl) tree).symbol.getType() == MySqlParser.COMMA) { if (((TerminalNodeImpl) tree).symbol.getType() == MySqlParser.COMMA) {
this.query.append(","); this.query.append(",");
} }
} else if(tree instanceof MySqlParser.AlterByRenameContext) {
parseAlterTableByRename(tableName, (MySqlParser.AlterByRenameContext) tree);
} }
} }
} }
private void parseAlterTableByRename(String originalTableName, MySqlParser.AlterByRenameContext tree) {
String newTableName = null;
for(ParseTree alterByRenameChildren: tree.children) {
if(alterByRenameChildren instanceof MySqlParser.UidContext) {
newTableName = alterByRenameChildren.getText();
}
}
this.query.delete(0, this.query.toString().length()).append(String.format(Constants.ALTER_RENAME_TABLE, originalTableName, newTableName));
}
@Override @Override
public void exitAlterTable(MySqlParser.AlterTableContext alterTableContext) { public void exitAlterTable(MySqlParser.AlterTableContext alterTableContext) {
} }
@ -1970,6 +2003,16 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
log.info("Exit check table constraint"); log.info("Exit check table constraint");
} }
// @Override
// public void enterAlterByAlterCheckTableConstraint(MySqlParser.AlterByAlterCheckTableConstraintContext alterByAlterCheckTableConstraintContext) {
//
// }
//
// @Override
// public void exitAlterByAlterCheckTableConstraint(MySqlParser.AlterByAlterCheckTableConstraintContext alterByAlterCheckTableConstraintContext) {
//
// }
@Override @Override
public void enterAlterBySetAlgorithm(MySqlParser.AlterBySetAlgorithmContext alterBySetAlgorithmContext) { public void enterAlterBySetAlgorithm(MySqlParser.AlterBySetAlgorithmContext alterBySetAlgorithmContext) {
@ -2086,6 +2129,16 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
} }
// @Override
// public void enterAlterByAlterColumnDefault(MySqlParser.AlterByAlterColumnDefaultContext alterByAlterColumnDefaultContext) {
//
// }
//
// @Override
// public void exitAlterByAlterColumnDefault(MySqlParser.AlterByAlterColumnDefaultContext alterByAlterColumnDefaultContext) {
//
// }
@Override @Override
public void enterAlterByAlterIndexVisibility(MySqlParser.AlterByAlterIndexVisibilityContext alterByAlterIndexVisibilityContext) { public void enterAlterByAlterIndexVisibility(MySqlParser.AlterByAlterIndexVisibilityContext alterByAlterIndexVisibilityContext) {
@ -2524,6 +2577,7 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
@Override @Override
public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext) { public void enterRenameTable(MySqlParser.RenameTableContext renameTableContext) {
log.info("Rename table enter"); log.info("Rename table enter");
this.query.append(Constants.RENAME_TABLE).append(" ");
String originalTableName = null; String originalTableName = null;
String newTableName = null; String newTableName = null;
for (ParseTree child : renameTableContext.children) { for (ParseTree child : renameTableContext.children) {
@ -2533,14 +2587,19 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
if (renameTableContextChildren.size() >= 3) { if (renameTableContextChildren.size() >= 3) {
originalTableName = renameTableContextChildren.get(0).getText(); originalTableName = renameTableContextChildren.get(0).getText();
newTableName = renameTableContextChildren.get(2).getText(); newTableName = renameTableContextChildren.get(2).getText();
this.query.append(originalTableName).append(" to ").append(newTableName);
}
} else if(child instanceof TerminalNodeImpl) {
if (((TerminalNodeImpl) child).symbol.getType() == MySqlParser.COMMA) {
this.query.append(",");
} }
} }
} }
//
if (originalTableName != null && originalTableName.isEmpty() == false && newTableName != null && // if (originalTableName != null && originalTableName.isEmpty() == false && newTableName != null &&
newTableName.isEmpty() == false) { // newTableName.isEmpty() == false) {
this.query.append(String.format(Constants.RENAME_TABLE, originalTableName, newTableName)); // this.query.append(String.format(Constants.RENAME_TABLE, originalTableName, newTableName));
} // }
} }
@Override @Override
@ -5473,6 +5532,16 @@ public class MySqlDDLParserListenerImpl implements MySqlParserListener {
} }
// @Override
// public void enterUuidDataType(MySqlParser.UuidDataTypeContext uuidDataTypeContext) {
//
// }
//
// @Override
// public void exitUuidDataType(MySqlParser.UuidDataTypeContext uuidDataTypeContext) {
//
// }
@Override @Override
public void enterCollectionOptions(MySqlParser.CollectionOptionsContext collectionOptionsContext) { public void enterCollectionOptions(MySqlParser.CollectionOptionsContext collectionOptionsContext) {

View File

@ -1,167 +1,167 @@
//package com.altinity.clickhouse.debezium.embedded; package com.altinity.clickhouse.debezium.embedded;
//
//import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture; import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
//import com.altinity.clickhouse.debezium.embedded.config.EnvironmentConfigurationService; import com.altinity.clickhouse.debezium.embedded.config.EnvironmentConfigurationService;
//import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService; import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
//import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService; import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
//import com.mongodb.MongoException; import com.mongodb.MongoException;
//import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClient;
//import com.mongodb.client.MongoClients; import com.mongodb.client.MongoClients;
//import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCollection;
//import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoDatabase;
//import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.result.InsertOneResult;
//import org.bson.Document; import org.bson.Document;
//import org.bson.types.ObjectId; import org.bson.types.ObjectId;
//import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
//import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
//import org.testcontainers.containers.ClickHouseContainer; import org.testcontainers.containers.ClickHouseContainer;
//import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.containers.MongoDBContainer;
//import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Container;
//import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.junit.jupiter.Testcontainers;
//import org.testcontainers.utility.MountableFile; import org.testcontainers.utility.MountableFile;
//
//import java.nio.file.Files; import java.nio.file.Files;
//import java.nio.file.Path; import java.nio.file.Path;
//import java.util.Properties; import java.util.Properties;
//import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors; import java.util.concurrent.Executors;
//
//@Testcontainers @Testcontainers
//public class ClickHouseDebeziumEmbeddedMongoIT { public class ClickHouseDebeziumEmbeddedMongoIT {
//
// @Container @Container
// public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest") public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
// .withInitScript("init_clickhouse.sql") .withInitScript("init_clickhouse.sql")
// .withExposedPorts(8123); .withExposedPorts(8123);
//
// //https://github.com/testcontainers/testcontainers-java/issues/3066 //https://github.com/testcontainers/testcontainers-java/issues/3066
// @Container @Container
// public static MongoDBContainer mongoContainer = new MongoDBContainer("mongo:latest") public static MongoDBContainer mongoContainer = new MongoDBContainer("mongo:latest")
//// .withEnv("MONGO_INITDB_ROOT_USERNAME", "project") // .withEnv("MONGO_INITDB_ROOT_USERNAME", "project")
//// .withEnv("MONGO_INITDB_ROOT_PASSWORD", "project") // .withEnv("MONGO_INITDB_ROOT_PASSWORD", "project")
// .withEnv("MONGO_INITDB_DATABASE", "project") .withEnv("MONGO_INITDB_DATABASE", "project")
// .withCopyFileToContainer(MountableFile.forClasspathResource("mongo-init.js"), .withCopyFileToContainer(MountableFile.forClasspathResource("mongo-init.js"),
// "/docker-entrypoint-initdb.d/mongo-init.js"); "/docker-entrypoint-initdb.d/mongo-init.js");
// // .waitingFor(Wait.forLogMessage("(?i).*waiting for connections.*", 2)) // .waitingFor(Wait.forLogMessage("(?i).*waiting for connections.*", 2))
// // .withStartupTimeout(Duration.ofSeconds(10)); // .withStartupTimeout(Duration.ofSeconds(10));
//
//// .withInitScript("init_postgres.sql") // .withInitScript("init_postgres.sql")
//// .withDatabaseName("public") // .withDatabaseName("public")
//// .withUsername("root") // .withUsername("root")
//// .withPassword("adminpass") // .withPassword("adminpass")
//// .withExposedPorts(5432) // .withExposedPorts(5432)
//// .withCommand("postgres -c wal_level=logical"); // .withCommand("postgres -c wal_level=logical");
//
// @Test @Test
// @Disabled @Disabled
// public void testDataTypesDB() throws Exception { public void testDataTypesDB() throws Exception {
// Start the debezium embedded application.
Properties defaultProps = (new EnvironmentConfigurationService()).parse();
System.out.println("MYSQL HOST" + mongoContainer.getHost());
System.out.println("Connection string" + mongoContainer.getConnectionString());
defaultProps.setProperty("mongodb.connection.string", mongoContainer.getConnectionString());
defaultProps.setProperty("mongodb.members.auto.discover", "true");
defaultProps.setProperty("topic.prefix", "mongo-ch");
defaultProps.setProperty("collection.include.list", "project.items");
defaultProps.setProperty("snapshot.include.collection.list", "project.items");
defaultProps.setProperty("database.include.list", "project");
defaultProps.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
defaultProps.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter");
defaultProps.setProperty("value.converter.schemas.enable", "true");
//defaultProps.setProperty("mongodb.hosts", mongoContainer.getHost() + ":" + mongoContainer.getFirstMappedPort());
// defaultProps.setProperty("topic.prefix", mongoContainer.getC());
//System.out.println("JDBC URL" + mySqlContainer.getJdbcUrl());
// defaultProps.setProperty("database.hostname", mongoContainer.getHost());
// defaultProps.setProperty("database.port", String.valueOf(mongoContainer.getFirstMappedPort()));
defaultProps.setProperty("database.dbname", "project");
defaultProps.setProperty("database.user", "project");
defaultProps.setProperty("database.password", "project");
// defaultProps.setProperty("database.include.list", "public");
defaultProps.setProperty("snapshot.mode", "initial");
defaultProps.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
//defaultProps.setProperty("plugin.name", "pgoutput");
//defaultProps.setProperty("table.include.list", "public.tm");
defaultProps.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
//String tempOffsetPath = "/tmp/2/offsets" + System.currentTimeMillis() + ".dat";
Path tmpFilePath = Files.createTempFile("offsets", ".dat");
if (tmpFilePath != null) {
System.out.println("TEMP FILE PATH" + tmpFilePath);
}
Files.deleteIfExists(tmpFilePath);
defaultProps.setProperty("offset.storage.file.filename", tmpFilePath.toString());
defaultProps.setProperty("offset.flush.interval.ms", "60000");
defaultProps.setProperty("auto.create.tables", "true");
defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
defaultProps.setProperty("clickhouse.server.user", "default");
defaultProps.setProperty("clickhouse.server.pass", "");
defaultProps.setProperty("clickhouse.server.database", "project");
defaultProps.setProperty("replacingmergetree.delete.column", "_sign");
defaultProps.setProperty("metrics.port", "8087");
defaultProps.setProperty("database.allowPublicKeyRetrieval", "true");
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
new DebeziumChangeEventCapture().setup(defaultProps, new SourceRecordParserService(),
new MySQLDDLParserService());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread.sleep(15000);
insertNewDocument();
Thread.sleep(60000);
// BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
// "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
// Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
// Assert.assertTrue(tmColumns.size() == 22);
// Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
// Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
// //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
// Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));
// //
// //
// // Start the debezium embedded application. // int tmCount = 0;
// // ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
// Properties defaultProps = (new EnvironmentConfigurationService()).parse(); // while(chRs.next()) {
// System.out.println("MYSQL HOST" + mongoContainer.getHost()); // tmCount = chRs.getInt(1);
// System.out.println("Connection string" + mongoContainer.getConnectionString());
// defaultProps.setProperty("mongodb.connection.string", mongoContainer.getConnectionString());
// defaultProps.setProperty("mongodb.members.auto.discover", "true");
// defaultProps.setProperty("topic.prefix", "mongo-ch");
// defaultProps.setProperty("collection.include.list", "project.items");
// defaultProps.setProperty("snapshot.include.collection.list", "project.items");
// defaultProps.setProperty("database.include.list", "project");
// defaultProps.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
//
// defaultProps.setProperty("value.converter", "org.apache.kafka.connect.storage.StringConverter");
// defaultProps.setProperty("value.converter.schemas.enable", "true");
//
// //defaultProps.setProperty("mongodb.hosts", mongoContainer.getHost() + ":" + mongoContainer.getFirstMappedPort());
// // defaultProps.setProperty("topic.prefix", mongoContainer.getC());
// //System.out.println("JDBC URL" + mySqlContainer.getJdbcUrl());
//// defaultProps.setProperty("database.hostname", mongoContainer.getHost());
//// defaultProps.setProperty("database.port", String.valueOf(mongoContainer.getFirstMappedPort()));
// defaultProps.setProperty("database.dbname", "project");
// defaultProps.setProperty("database.user", "project");
// defaultProps.setProperty("database.password", "project");
//
// // defaultProps.setProperty("database.include.list", "public");
// defaultProps.setProperty("snapshot.mode", "initial");
// defaultProps.setProperty("connector.class", "io.debezium.connector.mongodb.MongoDbConnector");
// //defaultProps.setProperty("plugin.name", "pgoutput");
// //defaultProps.setProperty("table.include.list", "public.tm");
//
//
// defaultProps.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
//
// //String tempOffsetPath = "/tmp/2/offsets" + System.currentTimeMillis() + ".dat";
// Path tmpFilePath = Files.createTempFile("offsets", ".dat");
//
// if (tmpFilePath != null) {
// System.out.println("TEMP FILE PATH" + tmpFilePath);
// }
//
// Files.deleteIfExists(tmpFilePath);
// defaultProps.setProperty("offset.storage.file.filename", tmpFilePath.toString());
// defaultProps.setProperty("offset.flush.interval.ms", "60000");
// defaultProps.setProperty("auto.create.tables", "true");
// defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
// defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
// defaultProps.setProperty("clickhouse.server.user", "default");
// defaultProps.setProperty("clickhouse.server.pass", "");
// defaultProps.setProperty("clickhouse.server.database", "project");
// defaultProps.setProperty("replacingmergetree.delete.column", "_sign");
// defaultProps.setProperty("metrics.port", "8087");
// defaultProps.setProperty("database.allowPublicKeyRetrieval", "true");
//
// ExecutorService executorService = Executors.newFixedThreadPool(1);
// executorService.execute(() -> {
// try {
// new DebeziumChangeEventCapture().setup(defaultProps, new SourceRecordParserService(),
// new MySQLDDLParserService());
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
// Thread.sleep(15000);
//
// insertNewDocument();
// Thread.sleep(60000);
//
//// BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
//// "public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
//// Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
//// Assert.assertTrue(tmColumns.size() == 22);
//// Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
//// Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
//// //Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
//// Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));
////
////
//// int tmCount = 0;
//// ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
//// while(chRs.next()) {
//// tmCount = chRs.getInt(1);
//// }
//
// // Assert.assertTrue(tmCount == 2);
//
// executorService.shutdown();
// Files.deleteIfExists(tmpFilePath);
//
//
// }
//
// private void insertNewDocument() {
// try (MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString())) {
// MongoDatabase database = mongoClient.getDatabase("project");
// MongoCollection<Document> collection = database.getCollection("items");
// try {
// InsertOneResult result = collection.insertOne(new Document()
// .append("uuid", new ObjectId())
// .append("price", 44)
// .append("name", "Record one"));
// System.out.println("Success! Inserted document id: " + result.getInsertedId());
// } catch (MongoException me) {
// System.err.println("Unable to insert due to an error: " + me);
// }
// }
// }
// } // }
// Assert.assertTrue(tmCount == 2);
executorService.shutdown();
Files.deleteIfExists(tmpFilePath);
}
private void insertNewDocument() {
try (MongoClient mongoClient = MongoClients.create(mongoContainer.getConnectionString())) {
MongoDatabase database = mongoClient.getDatabase("project");
MongoCollection<Document> collection = database.getCollection("items");
try {
InsertOneResult result = collection.insertOne(new Document()
.append("uuid", new ObjectId())
.append("price", 44)
.append("name", "Record one"));
System.out.println("Success! Inserted document id: " + result.getInsertedId());
} catch (MongoException me) {
System.err.println("Unable to insert due to an error: " + me);
}
}
}
}

View File

@ -0,0 +1,119 @@
//package com.altinity.clickhouse.debezium.embedded;
//
//import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
//import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
//import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
//import org.apache.log4j.BasicConfigurator;
//import org.junit.Assert;
//import org.junit.jupiter.api.BeforeEach;
//import org.junit.jupiter.api.Test;
//import org.testcontainers.containers.ClickHouseContainer;
//import org.testcontainers.containers.MySQLContainer;
//import org.testcontainers.containers.PostgreSQLContainer;
//import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
//import org.testcontainers.junit.jupiter.Container;
//import org.testcontainers.utility.DockerImageName;
//
//import java.sql.Connection;
//import java.util.Map;
//import java.util.concurrent.ExecutorService;
//import java.util.concurrent.Executors;
//import java.util.concurrent.atomic.AtomicReference;
//
//public class ClickHouseDebeziumEmbeddedMySqlDockerIT {
//
// @Container
// public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
// .withInitScript("init_clickhouse.sql")
// .withExposedPorts(8123).withNetworkAliases("clickhouse").withAccessToHost(true);
//
// @Container
// public static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>("postgres:latest")
// .withInitScript("init_postgres.sql")
// .withDatabaseName("public")
// .withUsername("root")
// .withPassword("root")
// .withExposedPorts(5432)
// .withCommand("postgres -c wal_level=logical")
// .withNetworkAliases("postgres").withAccessToHost(true);
//
//
// @BeforeEach
// public void startContainers() throws InterruptedException {
// mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
// .asCompatibleSubstituteFor("mysql"))
// .withDatabaseName("employees").withUsername("root").withPassword("adminpass")
// .withInitScript("alter_ddl_add_column.sql")
// .withExtraHost("mysql-server", "0.0.0.0")
// .waitingFor(new HttpWaitStrategy().forPort(3306));
//
// BasicConfigurator.configure();
// mySqlContainer.start();
// Thread.sleep(15000);
// }
//
// @Test
// public void testAddColumn() throws Exception {
//
// AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
//
// ExecutorService executorService = Executors.newFixedThreadPool(1);
// executorService.execute(() -> {
// try {
// engine.set(new DebeziumChangeEventCapture());
// engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
// new MySQLDDLParserService());
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// });
//
// Thread.sleep(10000);
//
// Connection conn = connectToMySQL();
// // alter table ship_class change column class_name class_name_new int;
// // alter table ship_class change column tonange tonange_new decimal(10,10);
//
// conn.prepareStatement("alter table ship_class add column ship_spec varchar(150) first, add somecol int after start_build, algorithm=instant;").execute();
// conn.prepareStatement("alter table ship_class ADD newcol bool null DEFAULT 0;").execute();
// conn.prepareStatement("alter table ship_class add column customer_address varchar(100) not null, add column customer_name varchar(20) null;").execute();
// conn.prepareStatement("alter table add_test add column col8 varchar(255) first;").execute();
// conn.prepareStatement("alter table add_test add column col99 int default 1 after col8;").execute();
//
// conn.prepareStatement("alter table add_test modify column col99 tinyint;").execute();
// conn.prepareStatement("alter table add_test add column col22 varchar(255);").execute();
// conn.prepareStatement("alter table add_test add column col4 varchar(255);").execute();
// conn.prepareStatement("alter table add_test rename column col99 to col101;").execute();
// conn.prepareStatement(" alter table add_test drop column col101;").execute();
//
// Thread.sleep(15000);
//
//
// com.altinity.clickhouse.sink.connector.db.BaseDbWriter writer = new com.altinity.clickhouse.sink.connector.db.BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
// "employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
//
// Map<String, String> shipClassColumns = writer.getColumnsDataTypesForTable("ship_class");
// Map<String, String> addTestColumns = writer.getColumnsDataTypesForTable("add_test");
//
// // Validate all ship_class columns.
// Assert.assertTrue(shipClassColumns.get("ship_spec").equalsIgnoreCase("Nullable(String)"));
// Assert.assertTrue(shipClassColumns.get("somecol").equalsIgnoreCase("Nullable(Int32)"));
// Assert.assertTrue(shipClassColumns.get("newcol").equalsIgnoreCase("Nullable(Bool)"));
// Assert.assertTrue(shipClassColumns.get("customer_address").equalsIgnoreCase("String"));
// Assert.assertTrue(shipClassColumns.get("customer_name").equalsIgnoreCase("Nullable(String)"));
//
// // Validate all add_test columns.
// Assert.assertTrue(addTestColumns.get("col8").equalsIgnoreCase("Nullable(String)"));
// Assert.assertTrue(addTestColumns.get("col2").equalsIgnoreCase("Nullable(Int32)"));
// Assert.assertTrue(addTestColumns.get("col3").equalsIgnoreCase("Nullable(Int32)"));
//
// if(engine.get() != null) {
// engine.get().stop();
// }
// // Files.deleteIfExists(tmpFilePath);
// executorService.shutdown();
//
//
//
// }
//}

View File

@ -0,0 +1,127 @@
package com.altinity.clickhouse.debezium.embedded;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
public class ClickHouseDebeziumEmbeddedPostgresDecoderBufsDockerIT {
@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.withInitScript("init_clickhouse.sql")
.withExposedPorts(8123).withNetworkAliases("clickhouse").withAccessToHost(true);
public static DockerImageName myImage = DockerImageName.parse("debezium/postgres:15-alpine").asCompatibleSubstituteFor("postgres");
@Container
public static PostgreSQLContainer postgreSQLContainer = (PostgreSQLContainer) new PostgreSQLContainer(myImage)
.withInitScript("init_postgres.sql")
.withDatabaseName("public")
.withUsername("root")
.withPassword("root")
.withExposedPorts(5432)
.withCommand("postgres -c wal_level=logical")
.withNetworkAliases("postgres").withAccessToHost(true);
public Map<String, String> getDefaultProperties() throws IOException {
Map<String, String> properties = new HashMap<String, String>();
properties.put("database.hostname", "postgres");
properties.put("database.port", "5432");
properties.put("database.dbname", "public");
properties.put("database.user", "root");
properties.put("database.password", "root");
properties.put("snapshot.mode", "initial");
properties.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
properties.put("plugin.name", "decoderbufs");
properties.put("plugin.path", "/");
properties.put("table.include.list", "public.tm");
properties.put("topic.prefix","test-server");
properties.put("slot.max.retries", "6");
properties.put("slot.retry.delay.ms", "5000");
properties.put("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
//String tempOffsetPath = "/tmp/2/offsets" + System.currentTimeMillis() + ".dat";
Path tmpFilePath = Files.createTempFile("offsets", ".dat");
if (tmpFilePath != null) {
System.out.println("TEMP FILE PATH" + tmpFilePath);
}
Files.deleteIfExists(tmpFilePath);
properties.put("offset.storage.file.filename", tmpFilePath.toString());
properties.put("offset.flush.interval.ms", "60000");
properties.put("auto.create.tables", "true");
properties.put("clickhouse.server.url", "clickhouse");
properties.put("clickhouse.server.port", "8123");
properties.put("clickhouse.server.user", "default");
properties.put("clickhouse.server.pass", "");
properties.put("clickhouse.server.database", "public");
properties.put("replacingmergetree.delete.column", "_sign");
properties.put("metrics.port", "8087");
properties.put("database.allowPublicKeyRetrieval", "true");
return properties;
}
@Test
public void testDataTypesDB() throws Exception {
Network network = Network.newNetwork();
postgreSQLContainer.withNetwork(network).start();
clickHouseContainer.withNetwork(network).start();
Thread.sleep(10000);
Testcontainers.exposeHostPorts(postgreSQLContainer.getFirstMappedPort());
GenericContainer sinkConnectorLightWeightContainer = new
GenericContainer("registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest")
.withEnv(getDefaultProperties()).withNetwork(network);
sinkConnectorLightWeightContainer.start();
Thread.sleep(50000);
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
//Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));
int tmCount = 0;
ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
while(chRs.next()) {
tmCount = chRs.getInt(1);
}
Assert.assertTrue(tmCount == 1);
// if(engine.get() != null) {
// engine.get().stop();
// }
// executorService.shutdown();
// Files.deleteIfExists(tmpFilePath);
}
}

View File

@ -0,0 +1,124 @@
package com.altinity.clickhouse.debezium.embedded;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
public class ClickHouseDebeziumEmbeddedPostgresDockerIT {
@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
.withInitScript("init_clickhouse.sql")
.withExposedPorts(8123).withNetworkAliases("clickhouse").withAccessToHost(true);
@Container
public static PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>("postgres:latest")
.withInitScript("init_postgres.sql")
.withDatabaseName("public")
.withUsername("root")
.withPassword("root")
.withExposedPorts(5432)
.withCommand("postgres -c wal_level=logical")
.withNetworkAliases("postgres").withAccessToHost(true);
public Map<String, String> getDefaultProperties() throws IOException {
Map<String, String> properties = new HashMap<String, String>();
properties.put("database.hostname", "postgres");
properties.put("database.port", "5432");
properties.put("database.dbname", "public");
properties.put("database.user", "root");
properties.put("database.password", "root");
properties.put("snapshot.mode", "initial");
properties.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
properties.put("plugin.name", "pgoutput");
properties.put("plugin.path", "/");
properties.put("table.include.list", "public.tm");
properties.put("topic.prefix","test-server");
properties.put("slot.max.retries", "6");
properties.put("slot.retry.delay.ms", "5000");
properties.put("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
//String tempOffsetPath = "/tmp/2/offsets" + System.currentTimeMillis() + ".dat";
Path tmpFilePath = Files.createTempFile("offsets", ".dat");
if (tmpFilePath != null) {
System.out.println("TEMP FILE PATH" + tmpFilePath);
}
Files.deleteIfExists(tmpFilePath);
properties.put("offset.storage.file.filename", tmpFilePath.toString());
properties.put("offset.flush.interval.ms", "60000");
properties.put("auto.create.tables", "true");
properties.put("clickhouse.server.url", "clickhouse");
properties.put("clickhouse.server.port", "8123");
properties.put("clickhouse.server.user", "default");
properties.put("clickhouse.server.pass", "");
properties.put("clickhouse.server.database", "public");
properties.put("replacingmergetree.delete.column", "_sign");
properties.put("metrics.port", "8087");
properties.put("database.allowPublicKeyRetrieval", "true");
return properties;
}
@Test
public void testDataTypesDB() throws Exception {
Network network = Network.newNetwork();
postgreSQLContainer.withNetwork(network).start();
clickHouseContainer.withNetwork(network).start();
Thread.sleep(10000);
Testcontainers.exposeHostPorts(postgreSQLContainer.getFirstMappedPort());
GenericContainer sinkConnectorLightWeightContainer = new
GenericContainer("registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:latest")
.withEnv(getDefaultProperties()).withNetwork(network);
sinkConnectorLightWeightContainer.start();
Thread.sleep(50000);
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
//Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));
int tmCount = 0;
ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
while(chRs.next()) {
tmCount = chRs.getInt(1);
}
Assert.assertTrue(tmCount == 1);
// if(engine.get() != null) {
// engine.get().stop();
// }
// executorService.shutdown();
// Files.deleteIfExists(tmpFilePath);
}
}

View File

@ -0,0 +1,86 @@
package com.altinity.clickhouse.debezium.embedded.ddl.parser;
import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.apache.log4j.BasicConfigurator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import java.sql.Connection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@Testcontainers
public class ClickHouseDebeziumEmbeddedDDLTableOperationsIT extends ClickHouseDebeziumEmbeddedDDLBaseIT {
@BeforeEach
public void startContainers() throws InterruptedException {
mySqlContainer = new MySQLContainer<>(DockerImageName.parse("docker.io/bitnami/mysql:latest")
.asCompatibleSubstituteFor("mysql"))
.withDatabaseName("employees").withUsername("root").withPassword("adminpass")
.withInitScript("data_types.sql")
.withExtraHost("mysql-server", "0.0.0.0")
.waitingFor(new HttpWaitStrategy().forPort(3306));
BasicConfigurator.configure();
mySqlContainer.start();
clickHouseContainer.start();
Thread.sleep(15000);
}
@Test
public void testTableOperations() throws Exception {
AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {
Properties props = getDebeziumProperties();
props.setProperty("database.include.list", "datatypes");
props.setProperty("clickhouse.server.database", "datatypes");
engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
new MySQLDDLParserService());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
Thread.sleep(10000);
Connection conn = connectToMySQL();
conn.prepareStatement("RENAME TABLE ship_class to ship_class_new, add_test to add_test_new").execute();
conn.prepareStatement("RENAME TABLE ship_class_new to ship_class_new2").execute();
conn.prepareStatement("ALTER TABLE ship_class_new2 to ship_class_new3").execute();
conn.prepareStatement("ALTER TABLE ship_class_new2 to ship_class_new3").execute();
conn.prepareStatement("create table new_table(col1 varchar(255), col2 int, col3 int)").execute();
Thread.sleep(10000);
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"employees", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null);
conn.prepareStatement("create table new_table_copy like new_table");
if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();
}
}

View File

@ -424,6 +424,28 @@ public class MySqlDDLParserListenerImplTest {
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql)); Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase(sql));
} }
@Test
public void renameMultipleTables() {
StringBuffer clickHouseQuery = new StringBuffer();
String sql = "rename /* gh-ost */ table `trade_prod`.`enriched_trade` to `trade_prod`.`_enriched_trade_del`, `trade_prod`.`_enriched_trade_gho` to `trade_prod`.`enriched_trade`\n";
MySQLDDLParserService mySQLDDLParserService2 = new MySQLDDLParserService();
mySQLDDLParserService2.parseSql(sql, "", clickHouseQuery);
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("RENAME TABLE `trade_prod`.`enriched_trade` to `trade_prod`.`_enriched_trade_del`,`trade_prod`.`_enriched_trade_gho` to `trade_prod`.`enriched_trade`"));
}
@Test
public void alterTableRenameTable() {
StringBuffer clickHouseQuery = new StringBuffer();
String sql = "ALTER TABLE test_table rename to test_table_new";
MySQLDDLParserService mySQLDDLParserService2 = new MySQLDDLParserService();
mySQLDDLParserService2.parseSql(sql, "", clickHouseQuery);
Assert.assertTrue(clickHouseQuery.toString().equalsIgnoreCase("RENAME TABLE test_table to test_table_new"));
}
// @Test // @Test
// public void testDropDatabase() { // public void testDropDatabase() {
// StringBuffer clickHouseQuery = new StringBuffer(); // StringBuffer clickHouseQuery = new StringBuffer();