Added lag metrics in Grafana dashboard.

This commit is contained in:
Kanthi Subramanian 2022-10-23 13:41:21 -04:00
parent eca7b93bb1
commit 15d9e9ba56
14 changed files with 226 additions and 51 deletions

View File

@ -1,5 +1,7 @@
DATABASE=airportdb
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
docker exec -it mysql-master mysql -uroot -proot -e ""
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

View File

@ -1,6 +1,6 @@
DATABASE=employees
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
mkdir test_db
cd test_db
@ -27,3 +27,6 @@ docker cp load_salaries3.dump mysql-master:/
docker cp load_titles.dump mysql-master:/
docker exec -it mysql-master mysql -uroot -proot -e "source /employees.sql"
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

View File

@ -16,4 +16,5 @@ docker exec -it mysql-master mysqlimport -uroot -proot --local menagerie pet.txt
docker exec -it mysql-master mysqlimport -uroot -proot --local menagerie event.txt
rm -fr menagerie-db
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

View File

@ -10,4 +10,5 @@ docker exec -it mysql-master mysql -uroot -proot -e "source /tmp/sakila-schema.s
rm -f sakila-db.zip
rm -fr sakila-db/
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

View File

@ -1,8 +1,8 @@
DATABASE=sbtest
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

View File

@ -1,6 +1,6 @@
DATABASE=world
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
wget https://downloads.mysql.com/docs/world-db.zip
unzip -a world-db.zip
@ -8,3 +8,6 @@ docker cp world-db/world.sql mysql-master:/
docker exec -it mysql-master mysql -uroot -proot -e "source /world.sql"
rm -fr world-db.zip
rm -fr world-db/
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

View File

@ -1,6 +1,6 @@
DATABASE=world_x
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE && ./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE
./debezium-delete.sh && ./debezium-connector-setup-database.sh $DATABASE
docker exec -it clickhouse clickhouse-client -uroot --password root -mn --query "drop database if exists $DATABASE;create database $DATABASE;"
wget https://downloads.mysql.com/docs/world_x-db.zip
unzip -a world_x-db.zip
@ -8,3 +8,6 @@ docker cp world_x-db/world_x.sql mysql-master:/
docker exec -it mysql-master mysql -uroot -proot -e "source /world_x.sql"
rm -fr world_x-db.zip
rm -fr world_x-db/
sleep 5
./sink-delete.sh && ./sink-connector-setup-database.sh $DATABASE

Binary file not shown.

Before

Width:  |  Height:  |  Size: 117 KiB

After

Width:  |  Height:  |  Size: 424 KiB

View File

@ -1,10 +0,0 @@
datasources:
- access: 'proxy' # make grafana perform the requests
editable: true # whether it should be editable
is_default: true # whether this should be the default DS
name: 'Prometheus' # name of the datasource
org_id: 1 # id of the organization to tie this datasource to
type: 'prometheus' # type of the data source
url: 'http://prometheus:9090' # url of the prom instance
database: 'prometheus'
version: 1 # well, versioning

View File

@ -3,7 +3,10 @@
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"datasource": {
"type": "datasource",
"uid": "grafana"
},
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
@ -20,17 +23,18 @@
},
"editable": true,
"fiscalYearStartMonth": 0,
"gnetId": null,
"graphTooltip": 0,
"id": 2,
"links": [],
"liveNow": false,
"panels": [
{
"datasource": null,
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"gridPos": {
"h": 5,
"w": 12,
"w": 6,
"x": 0,
"y": 0
},
@ -39,12 +43,21 @@
"content": "<div>\n <img width=\"100\" height=\"100\" src=\"https://logosandtypes.com/wp-content/uploads/2020/12/altinity.svg\"/>\n<p>Altinity Sink Connector for ClickHouse</p>\n<p>For more information, visit <href https://github.com/Altinity/clickhouse-sink-connector\n</div>",
"mode": "html"
},
"pluginVersion": "8.2.6",
"pluginVersion": "9.0.4",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"refId": "A"
}
],
"title": "Altinity Sink Connector for ClickHouse",
"type": "text"
},
{
"datasource": "prometheus",
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
@ -70,7 +83,7 @@
"gridPos": {
"h": 5,
"w": 5,
"x": 12,
"x": 6,
"y": 0
},
"id": 4,
@ -89,13 +102,18 @@
"text": {},
"textMode": "auto"
},
"pluginVersion": "8.2.6",
"pluginVersion": "9.0.4",
"targets": [
{
"datasource": {
"uid": "prometheus"
},
"editorMode": "code",
"exemplar": true,
"expr": "clickhouse_sink_binlog_pos{}",
"interval": "",
"legendFormat": "",
"range": true,
"refId": "A"
}
],
@ -103,7 +121,7 @@
"type": "stat"
},
{
"datasource": "prometheus",
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
@ -129,7 +147,7 @@
"gridPos": {
"h": 5,
"w": 5,
"x": 17,
"x": 11,
"y": 0
},
"id": 6,
@ -148,13 +166,18 @@
"text": {},
"textMode": "auto"
},
"pluginVersion": "8.2.6",
"pluginVersion": "9.0.4",
"targets": [
{
"datasource": {
"uid": "prometheus"
},
"editorMode": "code",
"exemplar": true,
"expr": "clickhouse_sink_gtid{}",
"interval": "",
"legendFormat": "",
"range": true,
"refId": "A"
}
],
@ -162,7 +185,7 @@
"type": "stat"
},
{
"datasource": "prometheus",
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
@ -171,7 +194,8 @@
"custom": {
"align": "auto",
"displayMode": "auto",
"filterable": false
"filterable": false,
"inspect": false
},
"mappings": [],
"thresholds": {
@ -229,19 +253,30 @@
},
"gridPos": {
"h": 8,
"w": 10,
"w": 12,
"x": 0,
"y": 5
},
"id": 8,
"options": {
"footer": {
"fields": "",
"reducer": [
"sum"
],
"show": false
},
"frameIndex": 0,
"showHeader": true,
"sortBy": []
},
"pluginVersion": "8.2.6",
"pluginVersion": "9.0.4",
"targets": [
{
"datasource": {
"uid": "prometheus"
},
"editorMode": "code",
"exemplar": true,
"expr": "clickhouse_topics_num_records_total{}",
"format": "table",
@ -255,7 +290,7 @@
"type": "table"
},
{
"datasource": "prometheus",
"datasource": {},
"description": "",
"fieldConfig": {
"defaults": {
@ -310,7 +345,7 @@
"gridPos": {
"h": 8,
"w": 12,
"x": 10,
"x": 12,
"y": 5
},
"id": 10,
@ -318,18 +353,25 @@
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single"
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"uid": "prometheus"
},
"editorMode": "code",
"exemplar": true,
"expr": "clickhouse_topics_num_records_total{}",
"interval": "",
"legendFormat": "",
"range": true,
"refId": "A"
}
],
@ -337,7 +379,7 @@
"type": "timeseries"
},
{
"datasource": "prometheus",
"datasource": {},
"fieldConfig": {
"defaults": {
"color": {
@ -399,27 +441,122 @@
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single"
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"uid": "prometheus"
},
"editorMode": "code",
"exemplar": true,
"expr": "clickhouse_sink_partition_offset{}",
"interval": "",
"legendFormat": "",
"range": true,
"refId": "A"
}
],
"title": "Kafka Offset",
"type": "timeseries"
},
{
"datasource": {},
"description": "Lag calculated from time records are inserted to CH and the timestamp recorded by debezium connector",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 13
},
"id": 14,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "PBFA97CFB590B2093"
},
"editorMode": "code",
"expr": "clickhouse_sink_lag",
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Sink Connector Lag",
"type": "timeseries"
}
],
"refresh": false,
"schemaVersion": 32,
"schemaVersion": 36,
"style": "dark",
"tags": [],
"templating": {
@ -431,7 +568,8 @@
},
"timepicker": {},
"timezone": "",
"title": "altinity_sink_connector_clickhouse",
"title": "Altinity Sink Connector for ClickHouse",
"uid": "c4cKtgk4k",
"version": 1
"version": 1,
"weekStart": ""
}

View File

@ -2,7 +2,7 @@ datasources:
- access: 'proxy' # make grafana perform the requests
editable: true # whether it should be editable
is_default: true # whether this should be the default DS
name: 'Prometheus' # name of the datasource
name: 'prometheus' # name of the datasource
org_id: 1 # id of the organization to tie this datasource to
type: 'prometheus' # type of the data source
url: 'http://prometheus:9090' # url of the prom instance

View File

@ -54,6 +54,8 @@ public class Metrics {
private static Gauge partitionOffsetCounter;
private static Gauge topicLagCounter;
private static Gauge gtidCounter;
private static HttpServer server;
@ -122,6 +124,11 @@ public class Metrics {
labelNames("Topic", "Partition").
name("clickhouse_sink_partition_offset").help("Kafka partition Offset").register(collectorRegistry);
topicLagCounter = Gauge.build().
labelNames("Topic").
name("clickhouse_sink_lag").help("Lag between Debezium processing and Bulk Insert to CH").register(collectorRegistry);
topicsNumRecordsCounter = Counter.builder("clickhouse.topics.num.records");
}
@ -176,7 +183,11 @@ public class Metrics {
partitionOffsetCounter.labels(entry.getKey(), Integer.toString(mp.left))
.set(mp.right);
}
}
if(!bmd.getTopicToBlockTimestamp().isEmpty()) {
for(Map.Entry<String, Long> entry: bmd.getTopicToBlockTimestamp().entrySet()) {
topicLagCounter.labels(entry.getKey()).set(entry.getValue());
}
}
}

View File

@ -5,6 +5,7 @@ import lombok.Setter;
import org.apache.commons.lang3.tuple.MutablePair;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
@ -69,6 +70,16 @@ public class BlockMetaData {
@Setter
String topicName = null;
// The time when the block is persisted to clickhouse
// Useful to calculate lag between source DB(binlog)
// debezium connector timestamp vs sink connector timestamp.
@Getter
@Setter
Map<String, Long> topicToBlockTimestamp = new HashMap();
// Timestamp recorded when the block was written;
long blockInsertionTimestamp = System.currentTimeMillis();
public void update(ClickHouseStruct record) {
int gtId = record.getGtid();
@ -85,12 +96,22 @@ public class BlockMetaData {
this.partition = record.getKafkaPartition();
}
long offset = record.getKafkaOffset();
if(record.getTopic() != null) {
this.topicName = record.getTopic();
}
long lag = blockInsertionTimestamp - record.getTs_ms();
if(topicToBlockTimestamp.containsKey(this.topicName)) {
long storedLag = topicToBlockTimestamp.get(this.topicName);
if(lag > storedLag) {
topicToBlockTimestamp.put(this.topicName, lag);
}
} else {
topicToBlockTimestamp.put(this.topicName, lag);
}
long offset = record.getKafkaOffset();
if (partitionToOffsetMap.containsKey(this.topicName)) {
MutablePair<Integer, Long> mp = partitionToOffsetMap.get(this.topicName);
if (offset >= mp.right) {

View File

@ -41,6 +41,7 @@ public class ClickHouseStruct {
@Setter
private ArrayList<String> primaryKey;
// Database processed timestamp
@Getter
@Setter
private long ts_ms;
@ -181,6 +182,7 @@ public class ClickHouseStruct {
}
try {
if (fieldNames.contains(TS_MS) && source.get(TS_MS) != null && source.get(TS_MS) instanceof Long) {
// indicates the time that the change was made in the database.
this.setTs_ms((Long) source.get(TS_MS));
}
if (fieldNames.contains(SNAPSHOT) && source.get(SNAPSHOT) != null && source.get(SNAPSHOT) instanceof String) {