Merge pull request #28 from Altinity/json_datatype_support

Support for JSON data type.
This commit is contained in:
Kanthi 2022-08-02 12:33:46 -04:00 committed by GitHub
commit 5a7b659af2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 49 additions and 22 deletions

View File

@ -108,7 +108,7 @@ services:
sink:
container_name: sink
image: altinity/clickhouse-sink-connector:latest
image: altinity/clickhouse-sink-connector:${SINK_VERSION}
restart: "no"
ports:
- "18083:8083"
@ -167,12 +167,12 @@ services:
volumes:
- ../config/jmx-config.yml:/opt/jmx_exporter/config.yml
blackbox_exporter:
container_name: blackbox_exporter
image: prom/blackbox-exporter:master
restart: "no"
ports:
- "9115:9115"
# blackbox_exporter:
# container_name: blackbox_exporter
# image: prom/blackbox-exporter:master
# restart: "no"
# ports:
# - "9115:9115"
prometheus:
container_name: prometheus

View File

@ -16,15 +16,19 @@ CLICKHOUSE_DATABASE="test"
BUFFER_COUNT=10000
#SERVER5432.transaction
TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.customers"
TOPICS_TABLE_MAP="SERVER5432.test.employees_predated:employees, SERVER5432.test.products:products"
if [[ $1 == "postgres" ]]; then
TOPICS="SERVER5432.public.Employee"
else
TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.customers"
TOPICS_TABLE_MAP="SERVER5432.test.employees_predated:employees, SERVER5432.test.products:products"
fi
#TOPICS="SERVER5432"
#"topics.regex": "SERVER5432.sbtest.(.*), SERVER5432.test.(.*)",
#"topics": "${TOPICS}",
if [[ $1 == "apicurio" ]]; then
if [[ $2 == "apicurio" ]]; then
echo "APICURIO SCHEMA REGISTRY"
cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'Content-Type: application/json' --data @-
{
@ -32,6 +36,7 @@ if [[ $1 == "apicurio" ]]; then
"config": {
"connector.class": "com.altinity.clickhouse.sink.connector.ClickHouseSinkConnector",
"tasks.max": "10",
"topics": "${TOPICS}",
"clickhouse.topic2table.map": "${TOPICS_TABLE_MAP}",
"clickhouse.server.url": "${CLICKHOUSE_HOST}",
@ -66,7 +71,7 @@ if [[ $1 == "apicurio" ]]; then
"replacingmergetree.delete.column": "sign",
"auto.create.tables": true,
"auto.create.tables": false,
"schema.evolution": false,
"deduplication.policy": "off"
@ -110,7 +115,7 @@ else
"replacingmergetree.delete.column": "sign",
"auto.create.tables": true,
"auto.create.tables": false,
"schema.evolution": false,
"deduplication.policy": "off"

View File

@ -107,6 +107,8 @@ PRIMARY KEY intcol1;
--ENGINE = AggregatingMergeTree
--ORDER BY (_topic, _partition);
SET allow_experimental_object_type = 1;
-- Postgres tables --
CREATE TABLE Employee(
`EmployeeId` UInt64,
@ -123,6 +125,7 @@ CREATE TABLE Employee(
`PostalCode` String,
`Phone` String,
`Fax` String,
`Email` String)
`Email` String,
`json_data` String)
ENGINE = MergeTree
PRIMARY KEY EmployeeId;

View File

@ -64,6 +64,7 @@ CREATE TABLE "Employee"
"Email" VARCHAR(60),
CONSTRAINT "PK_Employee" PRIMARY KEY ("EmployeeId")
);
ALTER TABLE public."Employee" add column "json_data" JSON;
CREATE TABLE "Genre"
(
@ -4365,6 +4366,7 @@ INSERT INTO "Employee" ("EmployeeId", "LastName", "FirstName", "Title", "Reports
INSERT INTO "Employee" ("EmployeeId", "LastName", "FirstName", "Title", "ReportsTo", "BirthDate", "HireDate", "Address", "City", "State", "Country", "PostalCode", "Phone", "Fax", "Email") VALUES (6, N'Mitchell', N'Michael', N'IT Manager', 1, '1973/7/1', '2003/10/17', N'5827 Bowness Road NW', N'Calgary', N'AB', N'Canada', N'T3B 0C5', N'+1 (403) 246-9887', N'+1 (403) 246-9899', N'michael@chinookcorp.com');
INSERT INTO "Employee" ("EmployeeId", "LastName", "FirstName", "Title", "ReportsTo", "BirthDate", "HireDate", "Address", "City", "State", "Country", "PostalCode", "Phone", "Fax", "Email") VALUES (7, N'King', N'Robert', N'IT Staff', 6, '1970/5/29', '2004/1/2', N'590 Columbia Boulevard West', N'Lethbridge', N'AB', N'Canada', N'T1K 5N8', N'+1 (403) 456-9986', N'+1 (403) 456-8485', N'robert@chinookcorp.com');
INSERT INTO "Employee" ("EmployeeId", "LastName", "FirstName", "Title", "ReportsTo", "BirthDate", "HireDate", "Address", "City", "State", "Country", "PostalCode", "Phone", "Fax", "Email") VALUES (8, N'Callahan', N'Laura', N'IT Staff', 6, '1968/1/9', '2004/3/4', N'923 7 ST NW', N'Lethbridge', N'AB', N'Canada', N'T1H 1Y8', N'+1 (403) 467-3351', N'+1 (403) 467-8772', N'laura@chinookcorp.com');
INSERT INTO "Employee" ("EmployeeId", "LastName", "FirstName", "Title", "ReportsTo", "BirthDate", "HireDate", "Address", "City", "State", "Country", "PostalCode", "Phone", "Fax", "Email", "json_data") VALUES (9, N'Callahan', N'Laura', N'IT Staff', 6, '1968/1/9', '2004/3/4', N'923 7 ST NW', N'Lethbridge', N'AB', N'Canada', N'T1H 1Y8', N'+1 (403) 467-3351', N'+1 (403) 467-8772', N'laura@chinookcorp.com', '{"key":"value"}');
INSERT INTO "Customer" ("CustomerId", "FirstName", "LastName", "Company", "Address", "City", "State", "Country", "PostalCode", "Phone", "Fax", "Email", "SupportRepId") VALUES (1, N'Luís', N'Gonçalves', N'Embraer - Empresa Brasileira de Aeronáutica S.A.', N'Av. Brigadeiro Faria Lima, 2170', N'São José dos Campos', N'SP', N'Brazil', N'12227-000', N'+55 (12) 3923-5555', N'+55 (12) 3923-5566', N'luisg@embraer.com.br', 3);
INSERT INTO "Customer" ("CustomerId", "FirstName", "LastName", "Address", "City", "Country", "PostalCode", "Phone", "Email", "SupportRepId") VALUES (2, N'Leonie', N'Köhler', N'Theodor-Heuss-Straße 34', N'Stuttgart', N'Germany', N'70174', N'+49 0711 2842222', N'leonekohler@surfeu.de', 5);

View File

@ -423,7 +423,7 @@
<groupId>com.clickhouse</groupId>
<!-- or clickhouse-grpc-client if you prefer gRPC -->
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch8</version>
<version>0.3.2-patch11</version>
<!-- below is only needed when all you want is a shaded jar -->
<classifier>http</classifier>
<exclusions>

View File

@ -2,6 +2,7 @@ package com.altinity.clickhouse.sink.connector.converters;
import com.clickhouse.client.ClickHouseDataType;
import io.debezium.data.Enum;
import io.debezium.data.Json;
import io.debezium.time.*;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.connect.data.Decimal;
@ -61,6 +62,8 @@ public class ClickHouseDataTypeMapper {
dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, ZonedTimestamp.SCHEMA_NAME), ClickHouseDataType.String);
dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, Enum.LOGICAL_NAME), ClickHouseDataType.String);
dataTypesMap.put(new MutablePair<>(Schema.Type.STRING, Json.LOGICAL_NAME), ClickHouseDataType.JSON);
}
public static ClickHouseDataType getClickHouseDataType(Schema.Type kafkaConnectType, String schemaName) {

View File

@ -15,6 +15,7 @@ import com.altinity.clickhouse.sink.connector.model.KafkaMetaData;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
import com.google.common.io.BaseEncoding;
import io.debezium.data.Json;
import io.debezium.time.Date;
import io.debezium.time.*;
import org.apache.commons.lang3.tuple.MutablePair;
@ -381,7 +382,11 @@ public class DbWriter extends BaseDbWriter {
List<ClickHouseStruct> recordsList = entry.getValue();
for (ClickHouseStruct record : recordsList) {
bmd.update(record);
try {
bmd.update(record);
} catch(Exception e) {
log.error("**** ERROR: updating Prometheus", e);
}
//List<Field> fields = record.getStruct().schema().fields();
//ToDO:
@ -569,7 +574,10 @@ public class DbWriter extends BaseDbWriter {
// MySQL(Timestamp) -> String, name(ZonedTimestamp) -> Clickhouse(DateTime)
ps.setString(index, DebeziumConverter.ZonedTimestampConverter.convert(value));
} else {
} else if(schemaName != null && schemaName.equalsIgnoreCase(Json.LOGICAL_NAME)) {
// Values with the Debezium JSON logical type are bound with setObject; every other string value falls through to setString below.
ps.setObject(index, value);
}else {
ps.setString(index, (String) value);
}
} else if (isFieldTypeInt) {
@ -617,7 +625,8 @@ public class DbWriter extends BaseDbWriter {
ps.setString(index, BaseEncoding.base16().lowerCase().encode(((ByteBuffer) value).array()));
}
} else {
}
else {
log.error("Data Type not supported: {}", colName);
}

View File

@ -77,14 +77,19 @@ public class BlockMetaData {
this.transactionId = gtId;
}
}
if (record.getPos() > binLogPosition) {
if (record.getPos() != null && record.getPos() > binLogPosition) {
this.binLogPosition = record.getPos();
}
this.partition = record.getKafkaPartition();
if(record.getKafkaPartition() != null) {
this.partition = record.getKafkaPartition();
}
long offset = record.getKafkaOffset();
this.topicName = record.getTopic();
if(record.getTopic() != null) {
this.topicName = record.getTopic();
}
if (partitionToOffsetMap.containsKey(this.topicName)) {
MutablePair<Integer, Long> mp = partitionToOffsetMap.get(this.topicName);

View File

@ -57,11 +57,11 @@ public class ClickHouseStruct {
@Getter
@Setter
private String file;
private String file = "";
@Getter
@Setter
private Long pos;
private Long pos = 0L;
@Getter
@Setter