Added partition to offset metrics (Prometheus)

Kanthi Subramanian 2022-07-27 11:01:52 -04:00
parent 7572f29f9e
commit 9c6b376667
7 changed files with 231 additions and 11 deletions

View File

@@ -181,6 +181,25 @@ services:
# KAFKA_BROKERS: kafka:19092
# restart: unless-stopped
console:
image: docker.redpanda.com/vectorized/console:master-217260f
restart: on-failure
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["kafka:9092"]
connect:
enabled: true
clusters:
- name: datagen
url: http://debezium:8083
ports:
- "8089:8080"
depends_on:
- kafka
sink:

View File

@@ -4,7 +4,7 @@
rm -fr MySQL.tsv
rm -fr CH.tsv
docker exec -it clickhouse clickhouse-client -uroot --password root --query "select id ,k ,c , pad from test.sbtest1 where sign !=-1 order by id format TSV" | grep -v "<jemalloc>" >CH.tsv
docker exec -it clickhouse clickhouse-client -uroot --password root --query "select id ,k from test.sbtest1 where sign !=-1 order by id format TSV" | grep -v "<jemalloc>" >CH.tsv
docker exec -it mysql-master mysql -uroot -proot -B -N -e "select * from sbtest.sbtest1 order by id" | grep -v "Using a password on the command line interface" >MySQL.tsv
diff --strip-trailing-cr MySQL.tsv CH.tsv

View File

@@ -50,4 +50,10 @@ group by database, table, event_type, partition_id
order by c desc`
Target:
5 threads, 600k/second
## Binary logs
`show binary logs;`
`show binlog events in 'mysql-bin.000003';`
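
The same inspection can also be done programmatically. Below is a minimal sketch using plain JDBC; the host, port, database, and root/root credentials are assumptions based on the docker-compose setup in this repo, and a MySQL JDBC driver is assumed to be on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowBinaryLogs {
    public static void main(String[] args) throws Exception {
        // Connection details are illustrative (mysql-master exposed locally, root/root).
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/sbtest", "root", "root");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW BINARY LOGS")) {
            while (rs.next()) {
                // SHOW BINARY LOGS returns Log_name and File_size (plus Encrypted on MySQL 8.0+).
                System.out.println(rs.getString("Log_name") + " " + rs.getLong("File_size"));
            }
        }
    }
}
```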

View File

@@ -4,5 +4,5 @@ FROM grafana/grafana:${GRAFANA_VERSION}
COPY ./config/dashboard.yml /etc/grafana/provisioning/dashboards
COPY ./config/datasource.yml /etc/grafana/provisioning/datasources
COPY ./config/debezium-dashboard.json /var/lib/grafana/dashboards/debezium-dashboard.json
COPY ./config/altinity_sink_connector.json /var/lib/grafana/dashboards/altinity_sink_connector.json
COPY ./config/debezium-mysql-connector.json /var/lib/grafana/dashboards/debezium-mysql-connector.json
COPY ./config/altinity_sink_connector.json /etc/grafana/provisioning/dashboards
COPY ./config/debezium-mysql-connector.json /etc/grafana/provisioning/dashboards

View File

@@ -228,8 +228,8 @@
]
},
"gridPos": {
"h": 5,
"w": 22,
"h": 8,
"w": 10,
"x": 0,
"y": 5
},
@@ -253,6 +253,169 @@
],
"title": "Records (Topic)",
"type": "table"
},
{
"datasource": "prometheus",
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "stepAfter",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 10,
"y": 5
},
"id": 10,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "clickhouse_topics_num_records_total{}",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"title": "Total Records",
"type": "timeseries"
},
{
"datasource": "prometheus",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 13
},
"id": 12,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "clickhouse_sink_partition_offset{}",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"title": "Kafka Offset",
"type": "timeseries"
}
],
"refresh": false,

View File

@@ -52,6 +52,8 @@ public class Metrics {
private static Gauge maxBinLogPositionCounter;
private static Gauge partitionOffsetCounter;
private static Gauge gtidCounter;
private static HttpServer server;
@@ -116,6 +118,10 @@ public class Metrics {
maxBinLogPositionCounter = Gauge.build().name("clickhouse_sink_binlog_pos").help("Bin Log Position").register(collectorRegistry);
gtidCounter = Gauge.build().name("clickhouse_sink_gtid").help("GTID Transaction Id").register(collectorRegistry);
partitionOffsetCounter = Gauge.build().
labelNames("Topic", "Partition").
name("clickhouse_sink_partition_offset").help("Kafka partition Offset").register(collectorRegistry);
topicsNumRecordsCounter = Counter.builder("clickhouse.topics.num.records");
}
@@ -158,6 +164,17 @@ public class Metrics {
gtidCounter.set(bmd.getTransactionId());
//tag("partition", Integer.toString(bmd.getPartition())).
//register(Metrics.meterRegistry()).increment(bmd.getTransactionId());
HashMap<String, MutablePair<Integer, Long>> partitionToOffsetMap = bmd.getPartitionToOffsetMap();
if(!partitionToOffsetMap.isEmpty()) {
for(Map.Entry<String, MutablePair<Integer, Long>> entry : partitionToOffsetMap.entrySet()) {
MutablePair<Integer, Long> mp = entry.getValue();
partitionOffsetCounter.labels(entry.getKey(), Integer.toString(mp.left))
.set(mp.right);
}
}
}
public static void updateSinkRecordsCounter(String blockUUid, Long taskId, String topicName, String tableName,

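As a standalone illustration of the labeled-gauge pattern used above, here is a minimal sketch with the Prometheus Java simpleclient. The metric name, help text, and label names are taken from the code; the topic, partition, and offset values are made up:

```java
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;

public class PartitionOffsetGaugeSketch {
    public static void main(String[] args) {
        CollectorRegistry registry = new CollectorRegistry();

        // Same metric/label names as in Metrics above.
        Gauge partitionOffset = Gauge.build()
                .name("clickhouse_sink_partition_offset")
                .help("Kafka partition Offset")
                .labelNames("Topic", "Partition")
                .register(registry);

        // Each distinct (Topic, Partition) label pair becomes its own child time series.
        partitionOffset.labels("SERVER5432.test.sbtest1", "0").set(123456L);
        partitionOffset.labels("SERVER5432.test.sbtest1", "1").set(98765L);
    }
}
```

Because every (Topic, Partition) pair is a separate time series, the Grafana panel above can plot per-partition offsets from `clickhouse_sink_partition_offset` directly.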
View File

@@ -16,7 +16,7 @@ public class BlockMetaData {
// Map of partitions to offsets.
@Getter
@Setter
HashMap<Integer, MutablePair<Long, Long>> partitionToOffsetMap;
HashMap<String, MutablePair<Integer, Long>> partitionToOffsetMap = new HashMap<>();
@Getter
@Setter
@@ -72,19 +72,34 @@ public class BlockMetaData {
public void update(ClickHouseStruct record) {
int gtId = record.getGtid();
if(gtId != -1) {
if(gtId > this.transactionId) {
if (gtId != -1) {
if (gtId > this.transactionId) {
this.transactionId = gtId;
}
}
if(record.getPos() > binLogPosition) {
if (record.getPos() > binLogPosition) {
this.binLogPosition = record.getPos();
}
this.partition = record.getKafkaPartition();
long offset = record.getKafkaOffset();
this.topicName = record.getTopic();
if (partitionToOffsetMap.containsKey(this.topicName)) {
MutablePair<Integer, Long> mp = partitionToOffsetMap.get(this.topicName);
if (offset >= mp.right) {
// Update map.
mp.right = offset;
mp.left = partition;
partitionToOffsetMap.put(topicName, mp);
}
} else {
MutablePair<Integer, Long> mp = new MutablePair<>();
mp.right = offset;
mp.left = partition;
partitionToOffsetMap.put(topicName, mp);
}
}
}
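
To make the bookkeeping in `update()` easier to follow, here is a self-contained sketch of the same per-topic map: each topic keeps the latest partition and the highest offset seen so far. The class and method names are invented for illustration, and commons-lang3 is assumed to be on the classpath:

```java
import java.util.HashMap;
import org.apache.commons.lang3.tuple.MutablePair;

public class OffsetMapSketch {
    // Topic name -> (partition, highest offset seen), mirroring BlockMetaData.
    static final HashMap<String, MutablePair<Integer, Long>> partitionToOffsetMap = new HashMap<>();

    // Record an offset, keeping only the largest offset observed per topic.
    static void record(String topic, int partition, long offset) {
        MutablePair<Integer, Long> mp = partitionToOffsetMap.get(topic);
        if (mp == null) {
            partitionToOffsetMap.put(topic, new MutablePair<>(partition, offset));
        } else if (offset >= mp.right) {
            mp.left = partition;
            mp.right = offset;
        }
    }

    public static void main(String[] args) {
        record("SERVER5432.test.sbtest1", 0, 100L);
        record("SERVER5432.test.sbtest1", 0, 250L);
        record("SERVER5432.test.sbtest1", 0, 200L); // ignored: lower than 250
        System.out.println(partitionToOffsetMap);   // {SERVER5432.test.sbtest1=(0,250)}
    }
}
```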