Added test case to test bulk insert and rollback.

This commit is contained in:
Kanthi Subramanian 2022-04-07 13:07:34 -04:00
parent 740071a842
commit ac5e468927
4 changed files with 112 additions and 3 deletions

View File

@ -2,7 +2,7 @@ use test;
CREATE TABLE employees
(
`emp_no` Int8,
`emp_no` Int32,
`birth_date` Date32,
`first_name` String,
`last_name` String,

View File

@ -115,8 +115,8 @@ public class ClickHouseSinkTask extends SinkTask {
ClickHouseConverter converter = new ClickHouseConverter();
for (SinkRecord record : records) {
//if (this.deduplicator.isNew(record))
if(true)
if (this.deduplicator.isNew(record))
//if(true)
{
Struct c = converter.convert(record);
if (c != null) {

View File

@ -119,6 +119,9 @@ public class DbWriter {
boolean isFieldTypeFloat = (type == Schema.FLOAT32_SCHEMA.type()) ||
(type == Schema.FLOAT64_SCHEMA.type());
// MySQL BigInt -> INT64
boolean isFieldTypeBigInt = (type == Schema.INT64_SCHEMA.type());
// Text columns
if (type == Schema.Type.STRING) {
ps.setString(index, (String) value);
@ -136,6 +139,8 @@ public class DbWriter {
ps.setFloat(index, (Float) value);
} else if (type == Schema.BOOLEAN_SCHEMA.type()) {
ps.setBoolean(index, (Boolean) value);
} else if(isFieldTypeBigInt) {
//ps.setIn
}
index++;
}
@ -149,6 +154,10 @@ public class DbWriter {
// but if any of the records were not processed successfully
// How to we rollback or what action needs to be taken.
int[] result = ps.executeBatch();
// ToDo: Clear is not an atomic operation.
// It might delete the records that are inserted by the ingestion process.
records.clear();
} catch (Exception e) {
log.warn("insert Batch exception" + e);
}

View File

@ -1,9 +1,23 @@
package com.altinity.clickhouse.sink.connector.db;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseDataSource;
import io.debezium.time.Date;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.sql.PreparedStatement;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class DbWriterTest {
DbWriter writer;
@ -40,4 +54,90 @@ public class DbWriterTest {
Assert.assertEquals(connectionUrl, "jdbc:clickhouse://remoteClickHouse:8123/employees");
}
//@Test
public void testBatchInsert() {
String hostName = "localhost";
Integer port = 8123;
String database = "test";
String userName = "root";
String password = "root";
Properties properties = new Properties();
properties.setProperty("client_name", "Test_1");
DbWriter dbWriter = new DbWriter(hostName, port, database, userName, password);
String url = dbWriter.getConnectionString(hostName, port, database);
String insertQueryTemplate = "insert into employees values(?,?,?,?,?,?)";
try {
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
ClickHouseConnection conn = dataSource.getConnection(userName, password);
PreparedStatement ps = conn.prepareStatement(insertQueryTemplate);
ps.setInt(1, 1);
ps.setDate(2, new java.sql.Date(10000));
ps.setString(3, "John1");
ps.setString(4, "Doe1");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
ps.setInt(1, 2);
ps.setDate(2, new java.sql.Date(10000));
ps.setString(3, "John2");
ps.setString(4, "Doe2");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
ps.setInt(1, 3);
ps.setDate(2, new java.sql.Date(10000));
ps.setInt(3, 22);
ps.setString(4, "012-03-01 08:32:53,431 WARN [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] SQL Error: 0, SQLState: 22001\n" +
"2012-03-01 08:32:53,431 ERROR [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"2012-03-01 08:32:53,556 WARN [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] SQL Error: 0, SQLState: 22001\n" +
"2012-03-01 08:32:53,556 ERROR [2-BAM::Default Agent::Agent:pool-8-thread-1] [JDBCExceptionReporter] ERROR: value too long for type character varying(4000)\n" +
"2012-03-01 08:32:53,556 ERROR [2-BAM::Default Agent::Agent:pool-8-thread-1] [SessionImpl] Could not synchronize database state with session\n" +
"2012-03-01 08:32:53,556 INFO [2-BAM::Default Agent::Agent:pool-8-thread-1] [DefaultErrorHandler] Recording an error: Could not save the build results. Data could be in an inconsistent state. : TWTHREE-MAIN-JOB1 : Hibernate operation: Could not execute JDBC batch update; SQL []; Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.; nested exception is java.sql.BatchUpdateException: Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"2012-03-01 08:32:53,666 FATAL [2-BAM::Default Agent::Agent:pool-8-thread-1] [PlanStatePersisterImpl] Could not save the build results BuildResults: TWTHREE-MAIN-JOB1-659. Data could be in an inconsistent state.\n" +
"org.springframework.dao.DataIntegrityViolationException: Hibernate operation: Could not execute JDBC batch update; SQL []; Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"Caused by: java.sql.BatchUpdateException: Batch entry 0 insert into TEST_CASE (TEST_CLASS_ID, TEST_CASE_NAME, SUCCESSFUL_RUNS, FAILED_RUNS, AVG_DURATION, FIRST_BUILD_NUM, LAST_BUILD_NUM, TEST_CASE_ID) values ('11993104', A_lot_of_data_goes_here_as_TEST_CASE_NAME, '0', '0', '0', '-1', '-1', '11960535') was aborted. Call getNextException to see the cause.\n" +
"\tat org.postgresql.jdbc2.AbstractJdbc2Statement$BatchResultHandler.handleError(AbstractJdbc2Statement.java:2569)\n" +
"\tat org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1796)\n" +
"\tat org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:407)\n" +
"\tat org.postgresql.jdbc2.AbstractJdbc2Statement.executeBatch(AbstractJdbc2Statement.java:2708)\n" +
"\tat com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:1723)\n" +
"\tat net.sf.hibernate.impl.BatchingBatcher.doExecuteBatch(BatchingBatcher.java:54)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.executeBatch(BatcherImpl.java:128)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.prepareStatement(BatcherImpl.java:61)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.prepareStatement(BatcherImpl.java:58)\n" +
"\tat net.sf.hibernate.impl.BatcherImpl.prepareBatchStatement(BatcherImpl.java:111)\n" +
"\tat net.sf.hibernate.persister.EntityPersister.insert(EntityPersister.java:454)\n" +
"\tat net.sf.hibernate.persister.EntityPersister.insert(EntityPersister.java:436)\n" +
"\tat net.sf.hibernate.impl.ScheduledInsertion.execute(ScheduledInsertion.java:37)\n" +
"\tat net.sf.hibernate.impl.SessionImpl.execute(SessionImpl.java:2447)\n" +
"...");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
ps.setInt(1, 2);
ps.setDate(2, new java.sql.Date(10000));
ps.setString(3, "John4");
ps.setString(4, "Doe4");
ps.setString(5, "M");
ps.setDate(6, new java.sql.Date(10000));
ps.addBatch();
int[] result = ps.executeBatch();
for(int i = 0; i < result.length; i++) {
System.out.println("Index:" + i + " Result:" + result[i]);
}
} catch (Exception e) {
System.out.println("Exception" + e);
}
}
}