Changes to support new REST API features. Added Debezium callback to check if event loop is running. Pretty print for response in client. Added source replication options.

This commit is contained in:
Kanthi Subramanian 2023-07-12 09:57:39 -04:00
parent eaf8b6b6c5
commit 2e3c65d968
7 changed files with 128 additions and 19 deletions

View File

@ -5,6 +5,7 @@ go 1.18
require (
github.com/levigross/grequests v0.0.0-20221222020224-9eee758d18d5
github.com/stretchr/testify v1.8.2
github.com/tidwall/pretty v1.2.1
github.com/urfave/cli v1.22.13
)

View File

@ -19,6 +19,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/urfave/cli v1.22.13 h1:wsLILXG8qCJNse/qAgLNf23737Cx05GflHg/PJGe1Ok=
github.com/urfave/cli v1.22.13/go.mod h1:VufqObjsMTF2BBwKawpx9R8eAneNEWhoO0yx8Vd+FkE=
golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1 h1:Y/KGZSOdz/2r0WJ9Mkmz6NJBusp0kiNx1Cn82lzJQ6w=

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"github.com/levigross/grequests"
"github.com/tidwall/pretty"
cli "github.com/urfave/cli"
"log"
"os"
@ -26,7 +27,7 @@ const (
START_REPLICATION_COMMAND = "start_replica"
STOP_REPLICATION_COMAND = "stop_replica"
STATUS_COMMAND = "show_replica_status"
UPDATE_BINLOG_COMMAND = "update_binlog"
UPDATE_BINLOG_COMMAND = "change_replication_source"
UPDATE_LSN_COMMAND = "lsn"
)
const (
@ -124,7 +125,9 @@ func main() {
Action: func(c *cli.Context) error {
var serverUrl = getServerUrl(STATUS, c)
resp := getHTTPCall(serverUrl)
log.Println(resp.String())
//var j, _ = json.MarshalIndent(resp, "", " ")
fmt.Println(string(pretty.Pretty([]byte(resp.String()))))
return nil
},
},
@ -135,20 +138,45 @@ func main() {
cli.StringFlag{
Name: "binlog_file",
Usage: "Set binlog file",
Required: true,
Required: false,
},
cli.StringFlag{
Name: "binlog_position",
Usage: "Set binlog position",
Required: true,
Required: false,
},
cli.StringFlag{
Name: "gtid",
Usage: "Set GTID",
Required: true,
Required: false,
},
cli.StringFlag{
Name: "source_host",
Usage: "Source Hostname",
Required: false,
},
cli.StringFlag{
Name: "source_port",
Usage: "Source Port",
Required: false,
},
cli.StringFlag{
Name: "source_username",
Usage: "Source Username",
Required: false,
},
cli.StringFlag{
Name: "source_password",
Usage: "Source Password",
Required: false,
},
},
Action: func(c *cli.Context) error {
handleUpdateBinLogAction(c)
return nil
},
@ -225,6 +253,14 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
var binlogPos = c.String("binlog_position")
var gtid = c.String("gtid")
if gtid == "" {
// If gtid is empty, then a valid binlog file and position
// needs to be passed.
if binlogPos == "" || binlogFile == "" {
log.Println(" ****** A Valid binlog position/file or GTID set is required")
return false
}
}
log.Println("***** binlog file: ", binlogFile+" *****")
log.Println("***** binlog position:", binlogPos+" *****")
log.Println("***** GTID:", gtid+" *****")
@ -238,11 +274,11 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
log.Println("Continuing...")
}
// Step1: Stop replication
log.Println("Stopping replication...")
var stopUrl = getServerUrl(STOP_REPLICATION, c)
resp := getHTTPCall(stopUrl)
time.Sleep(5 * time.Second)
//// Step1: Stop replication
//log.Println("Stopping replication...")
//var stopUrl = getServerUrl(STOP_REPLICATION, c)
//resp := getHTTPCall(stopUrl)
//time.Sleep(5 * time.Second)
// Step2: Update binlog position
log.Println("Updating binlog file/position and gtids...")
@ -253,16 +289,23 @@ func handleUpdateBinLogAction(c *cli.Context) bool {
requestOptions_copy.JSON = string(postBody)
var serverUrl = getServerUrl(UPDATE_BINLOG, c)
resp, err := grequests.Post(serverUrl, requestOptions_copy)
if resp.StatusCode == 400 {
log.Println("***** Error: Replication is running, please stop it first ******")
log.Println("***** Use stop_replica command to stop replication ******")
log.Println("***** After change_replication_source is successful, use start_replica to start replication *******")
return false
}
log.Println(resp.String())
if err != nil {
log.Println(err)
log.Println("Create request failed for Github API")
log.Println("Create request failed")
}
// Step3: Start replication
time.Sleep(10 * time.Second)
var startUrl = getServerUrl(START_REPLICATION, c)
resp1 := getHTTPCall(startUrl)
log.Println(resp1.String())
//
//// Step3: Start replication
//time.Sleep(10 * time.Second)
//var startUrl = getServerUrl(START_REPLICATION, c)
//resp1 := getHTTPCall(startUrl)
//log.Println(resp1.String())
return true
}

View File

@ -12,6 +12,7 @@ import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.google.inject.Guice;
import com.google.inject.Injector;
import io.javalin.Javalin;
import io.javalin.http.HttpStatus;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -99,6 +100,10 @@ public class ClickHouseDebeziumEmbeddedApplication {
});
app.post("/binlog", ctx -> {
if(debeziumChangeEventCapture.isReplicationRunning()) {
ctx.status(HttpStatus.BAD_REQUEST);
return;
}
String body = ctx.body();
JSONObject jsonObject = (JSONObject) new JSONParser().parse(body);
String binlogFile = (String) jsonObject.get(BINLOG_FILE);

View File

@ -66,6 +66,16 @@ public class DebeziumChangeEventCapture {
static public boolean isNewReplacingMergeTreeEngine = true;
private long replicationLag = 0;
private boolean isReplicationRunning = false;
private String binLogFile = "";
private String binLogPosition = "";
private String gtid = "";
DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;
private void performDDLOperation(String DDL, Properties props, SourceRecord sr, ClickHouseSinkConnectorConfig config) {
@ -180,7 +190,14 @@ public class DebeziumChangeEventCapture {
} else {
ClickHouseStruct chStruct = debeziumRecordParserService.parse(sr);
try {
this.replicationLag = chStruct.getReplicationLag();
this.binLogFile = chStruct.getFile();
this.binLogPosition = String.valueOf(chStruct.getPos());
this.gtid = String.valueOf(chStruct.getGtid());
} catch(Exception e) {
log.error("Error retrieving status metrics");
}
ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue<ClickHouseStruct>();
if (chStruct != null) {
queue.add(chStruct);
@ -308,6 +325,13 @@ public class DebeziumChangeEventCapture {
.collect(Collectors.toList());
JSONArray result = new JSONArray();
JSONObject replicationLag = new JSONObject();
replicationLag.put("Seconds_Behind_Source", this.replicationLag/1000);
result.add(replicationLag);
JSONObject replicationRunning = new JSONObject();
replicationRunning.put("Replica_Running", this.isReplicationRunning);
result.add(replicationRunning);
// Add Database name and table name.
JSONObject dbName = new JSONObject();
@ -436,7 +460,22 @@ public class DebeziumChangeEventCapture {
}
log.debug("Completion callback");
}
}).build();
}).using(
new DebeziumEngine.ConnectorCallback() {
@Override
public void connectorStarted() {
isReplicationRunning = true;
log.debug("Connector started");
}
@Override
public void connectorStopped() {
isReplicationRunning = false;
log.debug("Connector stopped");
}
}
).build();
engine.run();
} catch (Exception e) {
@ -475,6 +514,18 @@ public class DebeziumChangeEventCapture {
Metrics.stop();
}
public long getReplicationLag() {
return this.replicationLag;
}
public long getReplicationLagInSecs() {
return this.replicationLag / 1000;
}
public boolean isReplicationRunning() {
return this.isReplicationRunning;
}
DBCredentials parseDBConfiguration(ClickHouseSinkConnectorConfig config) {
DBCredentials dbCredentials = new DBCredentials();
@ -521,6 +572,7 @@ public class DebeziumChangeEventCapture {
}
}
// db.items.insert({_id:ObjectId(), uuid:ObjectId(), price:22, name:"New record"});
private void trySomething(Configuration config){
PostgresConnectorConfig postgresConfig = new PostgresConnectorConfig(config);

View File

@ -231,6 +231,12 @@ public class ClickHouseStruct {
}
}
public long getReplicationLag() {
if(this.getTs_ms() > 0) {
return System.currentTimeMillis() - this.getTs_ms();
}
return 0;
}
@Override
public String toString() {
return new StringBuffer()