mirror of
https://github.com/valitydev/clickhouse-sink-connector.git
synced 2024-11-06 02:25:18 +00:00
Merge pull request #269 from Altinity/252-upgrade-all-integration-tests-to-include-debezium-storage-and-yaml-file
Added logic to use yml file for Integration tests.
This commit is contained in:
commit
288359aab6
@ -10,10 +10,11 @@ import java.util.Properties;
|
||||
|
||||
public class ConfigLoader {
|
||||
|
||||
public Properties load(String fileName) {
|
||||
public Properties load(String resourceFileName) {
|
||||
InputStream fis = this.getClass()
|
||||
.getClassLoader()
|
||||
.getResourceAsStream(fileName);
|
||||
.getResourceAsStream(resourceFileName);
|
||||
|
||||
Map<String, Object> yamlFile = new Yaml().load(fis);
|
||||
|
||||
|
||||
|
@ -43,6 +43,7 @@ public class ClickHouseDebeziumEmbeddedDDLAddColumnIT extends ClickHouseDebezium
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(1);
|
||||
executorService.execute(() -> {
|
||||
try {
|
||||
|
||||
engine.set(new DebeziumChangeEventCapture());
|
||||
engine.get().setup(getDebeziumProperties(), new SourceRecordParserService(),
|
||||
new MySQLDDLParserService());
|
||||
@ -69,7 +70,7 @@ public class ClickHouseDebeziumEmbeddedDDLAddColumnIT extends ClickHouseDebezium
|
||||
conn.prepareStatement("alter table add_test rename column col99 to col101;").execute();
|
||||
conn.prepareStatement(" alter table add_test drop column col101;").execute();
|
||||
|
||||
Thread.sleep(15000);
|
||||
Thread.sleep(25000);
|
||||
|
||||
|
||||
BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
|
||||
@ -81,7 +82,7 @@ public class ClickHouseDebeziumEmbeddedDDLAddColumnIT extends ClickHouseDebezium
|
||||
// Validate all ship_class columns.
|
||||
Assert.assertTrue(shipClassColumns.get("ship_spec").equalsIgnoreCase("Nullable(String)"));
|
||||
Assert.assertTrue(shipClassColumns.get("somecol").equalsIgnoreCase("Nullable(Int32)"));
|
||||
Assert.assertTrue(shipClassColumns.get("newcol").equalsIgnoreCase("Nullable(Bool)"));
|
||||
Assert.assertTrue(shipClassColumns.get("newcol").equalsIgnoreCase("Nullable(Int16)"));
|
||||
Assert.assertTrue(shipClassColumns.get("customer_address").equalsIgnoreCase("String"));
|
||||
Assert.assertTrue(shipClassColumns.get("customer_name").equalsIgnoreCase("Nullable(String)"));
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.altinity.clickhouse.debezium.embedded.ddl.parser;
|
||||
|
||||
import com.altinity.clickhouse.debezium.embedded.common.PropertiesHelper;
|
||||
import com.altinity.clickhouse.debezium.embedded.config.ConfigLoader;
|
||||
import com.altinity.clickhouse.debezium.embedded.config.EnvironmentConfigurationService;
|
||||
import org.apache.log4j.BasicConfigurator;
|
||||
@ -10,6 +11,7 @@ import org.testcontainers.containers.MySQLContainer;
|
||||
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
import org.testcontainers.shaded.com.fasterxml.jackson.databind.annotation.JsonAppend;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
import java.nio.file.Files;
|
||||
@ -25,7 +27,7 @@ public class ClickHouseDebeziumEmbeddedDDLBaseIT {
|
||||
|
||||
@Container
|
||||
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:latest")
|
||||
.withInitScript("init_clickhouse.sql")
|
||||
.withInitScript("init_clickhouse_it.sql")
|
||||
.withExposedPorts(8123);
|
||||
|
||||
|
||||
@ -73,66 +75,32 @@ public class ClickHouseDebeziumEmbeddedDDLBaseIT {
|
||||
|
||||
protected Properties getDebeziumProperties() throws Exception {
|
||||
|
||||
Properties props = new ConfigLoader().load("config.yml");
|
||||
//Properties props = getDebeziumProperties();
|
||||
props.setProperty("database.hostname", mySqlContainer.getHost());
|
||||
props.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
|
||||
props.setProperty("database.include.list", "employees");
|
||||
props.setProperty("clickhouse.server.database", "employees");
|
||||
props.setProperty("offset.storage.jdbc.url", clickHouseContainer.getJdbcUrl());
|
||||
props.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
|
||||
props.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
|
||||
props.setProperty("schema.history.internal.jdbc.url", clickHouseContainer.getJdbcUrl());
|
||||
props.setProperty("snapshot.mode", "initial");
|
||||
|
||||
// Properties fileProps = new ConfigLoader().load("config.yml");
|
||||
// fileProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
|
||||
// fileProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
|
||||
// fileProps.setProperty("database.password", "adminpass");
|
||||
//
|
||||
// fileProps.setProperty("database.hostname", mySqlContainer.getHost());
|
||||
// fileProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
|
||||
|
||||
return props;
|
||||
|
||||
// Start the debezium embedded application.
|
||||
|
||||
Properties defaultProps = (new EnvironmentConfigurationService()).parse();
|
||||
Properties defaultProps = new Properties();
|
||||
Properties defaultProperties = PropertiesHelper.getProperties("config.properties");
|
||||
|
||||
defaultProps.putAll(defaultProperties);
|
||||
Properties fileProps = new ConfigLoader().load("config.yml");
|
||||
defaultProps.putAll(fileProps);
|
||||
|
||||
defaultProps.setProperty("database.hostname", mySqlContainer.getHost());
|
||||
defaultProps.setProperty("database.port", String.valueOf(mySqlContainer.getFirstMappedPort()));
|
||||
defaultProps.setProperty("database.user", "root");
|
||||
defaultProps.setProperty("database.password", "adminpass");
|
||||
|
||||
defaultProps.setProperty("database.include.list", "employees");
|
||||
defaultProps.setProperty("snapshot.mode", "initial");
|
||||
|
||||
|
||||
defaultProps.setProperty("snapshot.mode", "initial");
|
||||
defaultProps.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
|
||||
defaultProps.setProperty("name", "sink-connector-1");
|
||||
defaultProps.setProperty("include.schema.change", "true");
|
||||
defaultProps.setProperty("include.schema.comments", "true");
|
||||
|
||||
defaultProps.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
|
||||
defaultProps.setProperty("provide.transaction.metadata", "true");
|
||||
//String tempOffsetPath = "/tmp/2/offsets" + System.currentTimeMillis() + ".dat";
|
||||
Path tmpFilePath = Files.createTempFile("offsets", ".dat");
|
||||
Files.deleteIfExists(tmpFilePath);
|
||||
defaultProps.setProperty("offset.storage.file.filename", tmpFilePath.toAbsolutePath().toString());
|
||||
defaultProps.setProperty("offset.flush.interval.ms", "60000");
|
||||
|
||||
defaultProps.setProperty("offset.storage.offset.storage.jdbc.offset.table.name", "altinity_sink_connector.replica_source_info");
|
||||
defaultProps.setProperty("auto.create.tables", "true");
|
||||
defaultProps.setProperty("clickhouse.server.url", clickHouseContainer.getHost());
|
||||
defaultProps.setProperty("clickhouse.server.port", String.valueOf(clickHouseContainer.getFirstMappedPort()));
|
||||
defaultProps.setProperty("clickhouse.server.user", "default");
|
||||
defaultProps.setProperty("clickhouse.server.pass", "");
|
||||
defaultProps.setProperty("clickhouse.server.database", "employees");
|
||||
defaultProps.setProperty("replacingmergetree.delete.column", "_sign");
|
||||
defaultProps.setProperty("metrics.port", "8088");
|
||||
defaultProps.setProperty("thread.pool.size", "1");
|
||||
defaultProps.setProperty("database.allowPublicKeyRetrieval", "true");
|
||||
defaultProps.setProperty("metrics.enable", "false");
|
||||
|
||||
defaultProps.setProperty("offset.storage.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
|
||||
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
|
||||
|
||||
defaultProps.setProperty("schema.history.internal.jdbc.url", String.format("jdbc:clickhouse://%s:%s",
|
||||
clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort()));
|
||||
|
||||
|
||||
return defaultProps;
|
||||
|
||||
|
@ -0,0 +1,45 @@
|
||||
name= "engine"
|
||||
#offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
|
||||
#offset.storage.file.filename=/tmp/offsets.dat
|
||||
|
||||
database.server.name= "clickhouse-ddl"
|
||||
database.server.id= 976
|
||||
#database.history= "io.debezium.relational.history.FileDatabaseHistory"
|
||||
#database.history.file.filename=/tmp/dbhistory.dat
|
||||
#connector.class= io.debezium.connector.mysql.MySqlConnector
|
||||
converter.schemas.enable= "true"
|
||||
schemas.enable= true
|
||||
topic.prefix=embeddedconnector
|
||||
offset.storage="io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
|
||||
offset.storage.offset.storage.jdbc.offset.table.name= "default.replica_source_info"
|
||||
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123"
|
||||
offset.storage.jdbc.user: "root"
|
||||
offset.storage.jdbc.password: "root"
|
||||
offset.storage.offset.storage.jdbc.offset.table.ddl: "CREATE TABLE %s
|
||||
(
|
||||
`id` String,
|
||||
`offset_key` String,
|
||||
`offset_val` String,
|
||||
`record_insert_ts` DateTime,
|
||||
`record_insert_seq` UInt64,
|
||||
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
|
||||
)
|
||||
ENGINE = ReplacingMergeTree(_version)
|
||||
ORDER BY id
|
||||
SETTINGS index_granularity = 8198"
|
||||
offset.storage.offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
|
||||
schema.history.internal: io.debezium.storage.jdbc.history.JdbcSchemaHistory
|
||||
schema.history.internal.jdbc.url: "jdbc:clickhouse://clickhouse:8123"
|
||||
schema.history.internal.jdbc.user: "root"
|
||||
schema.history.internal.jdbc.password: "root"
|
||||
schema.history.internal.schema.history.table.ddl: "CREATE TABLE %s
|
||||
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL,
|
||||
`record_insert_seq` INTEGER NOT NULL) Engine=ReplacingMergeTree(record_insert_seq) order by id"
|
||||
|
||||
schema.history.internal.schema.history.table.name: "default.replicate_schema_history"
|
||||
|
||||
auto.create.tables= false
|
||||
replacingmergetree.delete.column=_sign
|
||||
metrics.enable= true
|
||||
metrics.port= 8083
|
||||
snapshot.mode= "initial"
|
@ -1,21 +1,19 @@
|
||||
name: clickhouse-connector-test
|
||||
name: "company-1"
|
||||
database.hostname: "mysql-master"
|
||||
database.port: "3306"
|
||||
database.user: "root"
|
||||
database.password: "adminpass"
|
||||
database.password: "root"
|
||||
database.server.name: "ER54"
|
||||
topic.prefix: "altinity_sink_connector"
|
||||
database.server.id: "1"
|
||||
database.include.list: sbtest
|
||||
database.include.list: employees
|
||||
#table.include.list=sbtest1
|
||||
clickhouse.server.url: "clickhouse"
|
||||
clickhouse.server.user: "default"
|
||||
clickhouse.server.pass: ""
|
||||
clickhouse.server.pass: "root"
|
||||
clickhouse.server.port: "8123"
|
||||
clickhouse.server.database: "test"
|
||||
database.allowPublicKeyRetrieval: "true"
|
||||
snapshot.mode: "schema_only"
|
||||
offset.flush.interval.ms: 5000
|
||||
snapshot.mode: "initial"
|
||||
offset.flush.interval.ms: "5000"
|
||||
connector.class: "io.debezium.connector.mysql.MySqlConnector"
|
||||
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"
|
||||
offset.storage.offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"
|
||||
@ -43,4 +41,5 @@ schema.history.internal.jdbc.schema.history.table.ddl: "CREATE TABLE if not exis
|
||||
(`id` VARCHAR(36) NOT NULL, `history_data` VARCHAR(65000), `history_data_seq` INTEGER, `record_insert_ts` TIMESTAMP NOT NULL, `record_insert_seq` INTEGER NOT NULL) ENGINE=ReplacingMergeTree(record_insert_seq) order by id"
|
||||
|
||||
schema.history.internal.jdbc.schema.history.table.name: "altinity_sink_connector.replicate_schema_history"
|
||||
enable.snapshot.ddl: "false"
|
||||
enable.snapshot.ddl: "true"
|
||||
auto.create.tables: "true"
|
@ -0,0 +1,44 @@
|
||||
CREATE database datatypes;
|
||||
CREATE database employees;
|
||||
CREATE database public;
|
||||
CREATE database project;
|
||||
|
||||
CREATE TABLE project.items
|
||||
(
|
||||
`price` Int64,
|
||||
`name` String,
|
||||
`_id` String,
|
||||
`uuid` String,
|
||||
`_sign` Int8,
|
||||
`_version` UInt64
|
||||
)
|
||||
ENGINE = ReplacingMergeTree(_version)
|
||||
ORDER BY _id;
|
||||
|
||||
|
||||
CREATE TABLE public.protocol_test
|
||||
(
|
||||
`id` Int64,
|
||||
`consultation_id` Int64,
|
||||
`recomendation` Nullable(String),
|
||||
`create_date` DateTime64(6),
|
||||
`_sign` Int8,
|
||||
`_version` UInt64
|
||||
)
|
||||
ENGINE = ReplacingMergeTree(_version)
|
||||
ORDER BY id;
|
||||
|
||||
CREATE DATABASE altinity_sink_connector;
|
||||
|
||||
CREATE TABLE altinity_sink_connector.replica_source_info
|
||||
(
|
||||
`id` String,
|
||||
`offset_key` String,
|
||||
`offset_val` String,
|
||||
`record_insert_ts` DateTime,
|
||||
`record_insert_seq` UInt64,
|
||||
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
|
||||
)
|
||||
ENGINE = ReplacingMergeTree(_version)
|
||||
ORDER BY id
|
||||
SETTINGS index_granularity = 8198;
|
Loading…
Reference in New Issue
Block a user