Committed changes to set source host, portnumber, username, password of mysql host

This commit is contained in:
Kanthi Subramanian 2023-07-18 18:56:55 -04:00
parent 2e3c65d968
commit b39e7d0771
9 changed files with 103 additions and 24 deletions

5
Dockerfile Normal file
View File

@ -0,0 +1,5 @@
FROM openjdk:11
COPY sink-connector-client/sink-connector-client /sink-connector-client
COPY sink-connector-lightweight/target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar
ENV JAVA_OPTS="-Dlog4jDebug=true"
ENTRYPOINT ["java", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005", "-jar","/app.jar", "/config.yml", "com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]

16
build_docker.sh Executable file
View File

@ -0,0 +1,16 @@
#!/bin/bash
cd sink-connector-lightweight
mvn clean install -DskipTests=true
today_date=$(date +%F)
cd ..
cd sink-connector-client
CGO_ENABLED=0 go build
cd ..
docker login registry.gitlab.com
docker build -f sink-connector-lightweight/Dockerfile -t clickhouse_debezium_embedded:${today_date} . --no-cache
docker tag clickhouse_debezium_embedded:${today_date} registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${today_date}
docker push registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${today_date}

View File

@ -14,9 +14,13 @@ import (
var requestOptions = &grequests.RequestOptions{}
type UpdateBinLog struct {
File string `json:"binlog_file"`
Position string `json:"binlog_position"`
Gtid string `json:"gtid"`
File string `json:"binlog_file"`
Position string `json:"binlog_position"`
Gtid string `json:"gtid"`
SourceHost string `json:"source_host"`
SourcePort string `json:"source_port"`
SourceUser string `json:"source_user"`
SourcePassword string `json:"source_password"`
}
type UpdateLsn struct {
@ -252,14 +256,23 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
var binlogFile = c.String("binlog_file")
var binlogPos = c.String("binlog_position")
var gtid = c.String("gtid")
var sourceHost = c.String("source_host")
var sourcePort = c.String("source_port")
var sourceUsername = c.String("source_username")
var sourcePassword = c.String("source_password")
if gtid == "" {
// If gtid is empty, then a valid binlog file and position
// needs to be passed.
if binlogPos == "" || binlogFile == "" {
log.Println(" ****** A Valid binlog position/file or GTID set is required")
return false
}
//if binlogPos == "" || binlogFile == "" {
// log.Println(" ****** A Valid binlog position/file or GTID set is required")
// cli.ShowCommandHelp(c, UPDATE_BINLOG_COMMAND)
// return false
//} else if sourceHost == "" || sourcePort == "" || sourceUsername == "" || sourcePassword == "" {
// log.Println(" ****** A Valid source host/port/username/password is required")
// cli.ShowCommandHelp(c, UPDATE_BINLOG_COMMAND)
// return false
//}
}
log.Println("***** binlog file: ", binlogFile+" *****")
log.Println("***** binlog position:", binlogPos+" *****")
@ -282,7 +295,7 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
// Step2: Update binlog position
log.Println("Updating binlog file/position and gtids...")
var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid}
var updateBinLogBody = UpdateBinLog{File: binlogFile, Position: binlogPos, Gtid: gtid, SourceHost: sourceHost, SourcePort: sourcePort, SourceUser: sourceUsername, SourcePassword: sourcePassword}
var postBody, _ = json.Marshal(updateBinLogBody)
var requestOptions_copy = requestOptions
// Add data to JSON field

View File

@ -1,4 +1,5 @@
FROM openjdk:11
COPY target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar
COPY sink-connector-client/sink-connector-client /sink-connector-client
COPY sink-connector-lightweight/target/clickhouse-debezium-embedded-1.0-SNAPSHOT.jar /app.jar
ENV JAVA_OPTS="-Dlog4jDebug=true"
ENTRYPOINT ["java", "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005", "-jar","/app.jar", "/config.yml", "com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication"]

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="AdditionalModuleElements">
<content url="file://$MODULE_DIR$" dumb="true">
<sourceFolder url="file://$MODULE_DIR$/src/test" isTestSource="true" />
</content>
</component>
</module>

View File

@ -11,6 +11,7 @@ import com.altinity.clickhouse.debezium.embedded.parser.DebeziumRecordParserServ
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.debezium.engine.DebeziumEngine;
import io.javalin.Javalin;
import io.javalin.http.HttpStatus;
import org.apache.log4j.ConsoleAppender;
@ -35,6 +36,7 @@ public class ClickHouseDebeziumEmbeddedApplication {
private static DebeziumChangeEventCapture debeziumChangeEventCapture;
private static Properties userProperties = new Properties();
/**
* Main Entry for the application
@ -109,9 +111,37 @@ public class ClickHouseDebeziumEmbeddedApplication {
String binlogFile = (String) jsonObject.get(BINLOG_FILE);
String binlogPosition = (String) jsonObject.get(BINLOG_POS);
String gtid = (String) jsonObject.get(GTID);
String sourceHost = (String) jsonObject.get(SOURCE_HOST);
String sourcePort = (String) jsonObject.get(SOURCE_PORT);
String sourceUser = (String) jsonObject.get(SOURCE_USER);
String sourcePassword = (String) jsonObject.get(SOURCE_PASSWORD);
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(finalProps1));
debeziumChangeEventCapture.updateDebeziumStorageStatus(config, finalProps1, binlogFile, binlogPosition, gtid);
if(sourceHost != null && !sourceHost.isEmpty()) {
userProperties.setProperty("database.hostname", sourceHost);
}
if(sourcePort != null && !sourcePort.isEmpty()) {
userProperties.setProperty("database.port", sourcePort);
}
if(sourceUser != null && !sourceUser.isEmpty()) {
userProperties.setProperty("database.user", sourceUser);
}
if(sourcePassword != null && !sourcePassword.isEmpty()) {
userProperties.setProperty("database.password", sourcePassword);
}
if(userProperties.size() > 0) {
log.info("User Overridden properties: " + userProperties);
}
debeziumChangeEventCapture.updateDebeziumStorageStatus(config, finalProps1, binlogFile, binlogPosition,
gtid, sourceHost, sourcePort, sourceUser, sourcePassword);
log.info("Received update-binlog request: " + body);
});
@ -128,7 +158,7 @@ public class ClickHouseDebeziumEmbeddedApplication {
Properties finalProps = props;
app.get("/start", ctx -> {
finalProps.putAll(userProperties);
CompletableFuture<String> cf = startDebeziumEventLoop(injector, finalProps);
ctx.result("Started Debezium Event Loop");
});
@ -142,14 +172,7 @@ public class ClickHouseDebeziumEmbeddedApplication {
embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(ConfigurationService.class),
injector.getInstance(DDLParserService.class), props);
}
public static CompletableFuture<String> startDebeziumEventLoop(Injector injector, Properties props) throws InterruptedException {
@ -162,7 +185,6 @@ public class ClickHouseDebeziumEmbeddedApplication {
embeddedApplication = new ClickHouseDebeziumEmbeddedApplication();
embeddedApplication.start(injector.getInstance(DebeziumRecordParserService.class),
injector.getInstance(ConfigurationService.class),
injector.getInstance(DDLParserService.class), props);
return null;
});
@ -172,7 +194,6 @@ public class ClickHouseDebeziumEmbeddedApplication {
public void start(DebeziumRecordParserService recordParserService,
ConfigurationService configurationService,
DDLParserService ddlParserService, Properties props) throws Exception {
// Define the configuration for the Debezium Engine with MySQL connector...
// log.debug("Loading properties");

View File

@ -364,7 +364,9 @@ public class DebeziumChangeEventCapture {
* @param gtid
*/
public void updateDebeziumStorageStatus(ClickHouseSinkConnectorConfig config, Properties props,
String binlogFile, String binLogPosition, String gtid) throws SQLException, ParseException {
String binlogFile, String binLogPosition, String gtid,
String sourceHost, String sourcePort, String sourceUsername,
String sourcePassword) throws SQLException, ParseException {
String tableName = props.getProperty(JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +

View File

@ -3,6 +3,7 @@ package com.altinity.clickhouse.debezium.embedded.cdc;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import org.apache.kafka.common.protocol.types.Field;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
@ -25,6 +26,12 @@ public class DebeziumOffsetStorage {
public static final String LSN_PROCESSED = "lsn_proc";
public static final String LSN = "lsn";
// Source Host
public static final String SOURCE_HOST = "source_host";
public static final String SOURCE_PORT = "source_port";
public static final String SOURCE_USER = "source_user";
public static final String SOURCE_PASSWORD = "source_password";
public String getOffsetKey(Properties props) {
@ -72,9 +79,15 @@ public class DebeziumOffsetStorage {
jsonObject.put("transaction_id", null);
}
jsonObject.put("file", binLogFile);
jsonObject.put("pos", binLogPosition);
if(gtids != null) {
if(binLogFile != null && !binLogFile.isEmpty()) {
jsonObject.put("file", binLogFile);
}
if(binLogPosition != null && !binLogPosition.isEmpty()) {
jsonObject.put("pos", binLogPosition);
}
if(gtids != null && !gtids.isEmpty()) {
jsonObject.put("gtids", gtids);
}