Added integration tests to compare row counts of the seed-data and sysbench tables between MySQL and ClickHouse.

Kanthi Subramanian 2022-07-22 19:01:19 -04:00
parent 2839b960c2
commit 17604d1ab6
11 changed files with 113 additions and 36 deletions

View File

@@ -97,9 +97,9 @@ cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'C
 "value.converter.apicurio.registry.auto-register": "true",
 "value.converter.apicurio.registry.find-latest": "true",
-"topic.creation.$alias.partitions": 3,
+"topic.creation.$alias.partitions": 6,
 "topic.creation.default.replication.factor": 1,
-"topic.creation.default.partitions": 3,
+"topic.creation.default.partitions": 6,
 "provide.transaction.metadata": "true"
 }
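
Both connector scripts in this commit raise the topic partition count from 3 to 6 (the same edit appears in the next file). To confirm that newly created topics actually get 6 partitions, a minimal check is sketched below, assuming kafka-python is installed and a broker is reachable on localhost:9092 (both assumptions, not part of this commit):

from kafka import KafkaConsumer  # assumption: kafka-python is available

# Connect without subscribing; partitions_for_topic pulls topic metadata only.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
partitions = consumer.partitions_for_topic("SERVER5432.test.employees_predated")
print(len(partitions) if partitions else "topic not created yet")  # expected: 6
consumer.close()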

View File

@@ -62,7 +62,7 @@ cat <<EOF | curl --request POST --url "${CONNECTORS_MANAGEMENT_URL}" --header 'C
 "value.converter.apicurio.registry.auto-register": "true",
 "value.converter.apicurio.registry.find-latest": "true",
-"topic.creation.$alias.partitions": 3,
+"topic.creation.$alias.partitions": 6,
 "topic.creation.default.replication.factor": 1,
 "topic.creation.default.partitions": 6,

View File

@@ -1,5 +1,6 @@
 #!/bin/bash
+./stop-docker-compose.sh
 # Altinity sink images are tagged daily with this tag yyyy-mm-dd(2022-07-19)
 if [ -z $1 ]
@@ -10,5 +11,4 @@ else
 export SINK_VERSION=$1
 fi
-docker-compose up
+docker-compose up --remove-orphans --force-recreate --renew-anon-volumes

View File

@@ -2,3 +2,4 @@
 docker-compose down --remove-orphans
+docker volume rm $(docker volume ls -q)

View File

@@ -16,7 +16,7 @@ CLICKHOUSE_DATABASE="test"
 BUFFER_COUNT=10000
 #SERVER5432.transaction
-TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.products, , SERVER5432.test.customers, SERVER5432.test.t1, SERVER5432.sbtest.sbtest1, SERVER5432.public.Employee"
+TOPICS="SERVER5432.test.employees_predated, SERVER5432.test.products, , SERVER5432.test.customers, SERVER5432.sbtest.sbtest1, SERVER5432.public.Employee"
 TOPICS_TABLE_MAP="SERVER5432.test.employees_predated:employees, SERVER5432.test.products:products"
 #TOPICS="SERVER5432"
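
The TOPICS edit drops SERVER5432.test.t1 from the subscription list (the stray double comma is present in the committed file, not an extraction artifact). TOPICS_TABLE_MAP routes a topic to a differently named destination table via topic:table pairs; a hypothetical parser for that format is sketched below purely to illustrate the layout (this is not the connector's actual parsing code):

def parse_topic_table_map(raw: str) -> dict:
    # Split "topic:table, topic:table" pairs into a dict, skipping empty entries.
    mapping = {}
    for pair in raw.split(","):
        pair = pair.strip()
        if not pair:
            continue
        topic, table = pair.split(":")
        mapping[topic] = table
    return mapping

print(parse_topic_table_map("SERVER5432.test.employees_predated:employees, SERVER5432.test.products:products"))
# {'SERVER5432.test.employees_predated': 'employees', 'SERVER5432.test.products': 'products'}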

View File

@@ -308019,6 +308019,7 @@ CREATE USER 'sbtest'@'%' IDENTIFIED BY 'passw0rd';
 GRANT ALL PRIVILEGES ON sbtest.* TO 'sbtest'@'%';
 use sbtest;
+/**
 CREATE TABLE `sbtest1` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `k` int(11) NOT NULL DEFAULT '0',
@@ -308030,6 +308031,6 @@ CREATE TABLE `sbtest1` (
 PARTITION p1 VALUES LESS THAN (499999),
 PARTITION p2 VALUES LESS THAN MAXVALUE
 );
-SET global general_log = 1;
-SET global log_output = 'table';
+**/
+SET global general_log = 1;
+SET global log_output = 'table';

View File

@@ -1,5 +1,6 @@
 #!/bin/bash
+set +x
 SHORT=t:,h
 LONG=test-name:,help
@@ -8,8 +9,8 @@ OPTS=$(getopt -a -n run_sysbench_tests --options $SHORT --longoptions $LONG -- "
 eval set -- "$OPTS"
 help() {
-echo "./run_sysbench_tests.sh <test_name>, test_name should be one of the following
-bulk_insert, oltp_insert, oltp_update_index, oltp_update_index, oltp_update_non_index"
+echo "./run_sysbench_tests.sh -t <test_name>, test_name should be one of the following
+bulk_insert, oltp_insert, oltp_delete, oltp_update_index, oltp_update_non_index"
 }
 ### Supported Sysbench tests
@@ -76,6 +77,7 @@ do
 exit 2
 ;;
 --)
+help
 shift;
 break
 ;;
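
The help text now documents the -t flag and replaces a duplicated oltp_update_index entry with oltp_delete, and usage is also printed when getopt reaches the -- terminator. For comparison, a rough Python equivalent of the option handling (hypothetical, not part of the repo):

import argparse

SUPPORTED_TESTS = ["bulk_insert", "oltp_insert", "oltp_delete",
                   "oltp_update_index", "oltp_update_non_index"]

parser = argparse.ArgumentParser(prog="run_sysbench_tests")
parser.add_argument("-t", "--test-name", choices=SUPPORTED_TESTS, required=True,
                    help="sysbench test to run")
args = parser.parse_args()
print(args.test_name)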

View File

@@ -213,30 +213,33 @@ public class ClickHouseBatchRunnable implements Runnable {
         long diffInMs = currentTime - lastFlushTimeInMs;
         long bufferFlushTimeout = this.config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIMEOUT);
-        // Step 2: Check if the buffer can be flushed
-        // One if the max buffer size is reached
-        // or if the Buffer flush timeout is reached.
-        if (diffInMs > bufferFlushTimeout) {
-            // Time to flush.
-            log.info(String.format("*** TIME EXCEEDED %s to FLUSH", bufferFlushTimeout));
-            writer.addToPreparedStatementBatch(queryToRecordsMap);
-            lastFlushTimeInMs = currentTime;
-            result = true;
-        } else {
-            long totalSize = 0;
-            for (Map.Entry<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>
-                    mutablePairListEntry : queryToRecordsMap.entrySet()) {
-                totalSize += mutablePairListEntry.getValue().size();
-            }
-            long minRecordsToFlush = config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_MAX_RECORDS);
-            if (totalSize >= minRecordsToFlush) {
-                log.info("**** MAX RECORDS EXCEEDED to FLUSH:" + "Total Records: " + totalSize);
-                writer.addToPreparedStatementBatch(queryToRecordsMap);
-                lastFlushTimeInMs = currentTime;
-                result = true;
-            }
-        }
+        writer.addToPreparedStatementBatch(queryToRecordsMap);
+        result = true;
+//
+//        // Step 2: Check if the buffer can be flushed
+//        // One if the max buffer size is reached
+//        // or if the Buffer flush timeout is reached.
+//        if (diffInMs > bufferFlushTimeout) {
+//            // Time to flush.
+//            log.info(String.format("*** TIME EXCEEDED %s to FLUSH", bufferFlushTimeout));
+//            writer.addToPreparedStatementBatch(queryToRecordsMap);
+//            lastFlushTimeInMs = currentTime;
+//            result = true;
+//        } else {
+//            long totalSize = 0;
+//            for (Map.Entry<MutablePair<String, Map<String, Integer>>, List<ClickHouseStruct>>
+//                    mutablePairListEntry : queryToRecordsMap.entrySet()) {
+//                totalSize += mutablePairListEntry.getValue().size();
+//            }
+//            long minRecordsToFlush = config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_MAX_RECORDS);
+//
+//            if (totalSize >= minRecordsToFlush) {
+//                log.info("**** MAX RECORDS EXCEEDED to FLUSH:" + "Total Records: " + totalSize);
+//                writer.addToPreparedStatementBatch(queryToRecordsMap);
+//                lastFlushTimeInMs = currentTime;
+//                result = true;
+//            }
+//        }
         return result;
     }
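
This change makes every invocation flush the buffer immediately; the previous time-or-size policy survives only as commented-out code, presumably so the row-count comparisons below see all records promptly. For reference, the disabled policy flushed when either the flush timeout elapsed or the buffered record total reached a threshold; a minimal Python sketch of that policy, with illustrative values standing in for BUFFER_FLUSH_TIMEOUT and BUFFER_MAX_RECORDS:

import time

def should_flush(query_to_records, last_flush_ms, flush_timeout_ms=2000, max_records=100000):
    # The two defaults above are illustrative, not the connector's actual defaults.
    now_ms = int(time.time() * 1000)
    if now_ms - last_flush_ms > flush_timeout_ms:
        return True  # time-based flush
    total = sum(len(records) for records in query_to_records.values())
    return total >= max_records  # size-based flush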

View File

@@ -17,4 +17,7 @@ class ClickHouseConnection:
     def execute_sql(self, query):
         result = self.client.execute(query)
-        return result
+        return result
+
+    def close(self):
+        self.client.disconnect()
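
With close() added, callers must remember to disconnect explicitly, as the new tests do. A possible refinement (a suggestion, not something this commit implements) is a context-manager wrapper so the disconnect can never be skipped:

from tests.clickhouse_connection import ClickHouseConnection

class ManagedClickHouseConnection(ClickHouseConnection):
    # Hypothetical wrapper: __enter__/__exit__ guarantee close() runs.
    def __enter__(self):
        self.create_connection()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # propagate any exception

# Usage:
# with ManagedClickHouseConnection('localhost', 'root', 'root', 'test') as ch:
#     ch.execute_sql("select count(*) from employees")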

View File

@@ -50,6 +50,7 @@ class MySqlConnection:
     def execute_sql(self, sql, data=None):
+        result = None
         if self.conn.is_connected():
             self.cursor = self.conn.cursor()
@@ -68,6 +69,8 @@ class MySqlConnection:
             except Exception as e:
                 print("Error executing SQL", e)
         return result
+    def get_connection(self) -> MySQLConnection:
+        return self.conn

View File

@@ -0,0 +1,64 @@
+import os
+import time
+import unittest
+from datetime import date
+
+from tests.clickhouse_connection import ClickHouseConnection
+from tests.mysql_connection import MySqlConnection
+from fake_data import FakeData
+
+
+class MyTestCase(unittest.TestCase):
+    conn = None
+
+    @classmethod
+    def setUpClass(cls):
+        print("Setup class")
+
+    @classmethod
+    def tearDownClass(cls):
+        print("Teardown class")
+
+    def test_seed_data_count(self):
+        mysql_conn = MySqlConnection()
+        mysql_conn.create_connection()
+
+        clickhouse_conn = ClickHouseConnection('localhost', 'root', 'root', 'test')
+        clickhouse_conn.create_connection()
+
+        ch_employees_count = clickhouse_conn.execute_sql("select count(*) from employees")
+        mysql_employees_count = mysql_conn.execute_sql("select count(*) from employees_predated")
+
+        ch_customers_count = clickhouse_conn.execute_sql("select count(*) from customers")
+        mysql_customers_count = mysql_conn.execute_sql("select count(*) from customers")
+
+        ch_products_count = clickhouse_conn.execute_sql("select count(*) from products")
+        mysql_products_count = mysql_conn.execute_sql("select count(*) from products")
+
+        mysql_conn.close()
+        clickhouse_conn.close()
+
+        self.assertEqual(ch_employees_count[0], mysql_employees_count)
+        self.assertEqual(ch_customers_count[0], mysql_customers_count)
+        self.assertEqual(ch_products_count[0], mysql_products_count)
+
+    def test_sysbench_test_data_count(self):
+        os.environ['DB_NAME'] = 'sbtest'
+        mysql_conn = MySqlConnection()
+        mysql_conn.create_connection()
+
+        clickhouse_conn = ClickHouseConnection('localhost', 'root', 'root', 'test')
+        clickhouse_conn.create_connection()
+
+        clickhouse_conn.execute_sql('optimize table sbtest1 final')
+        ch_count = clickhouse_conn.execute_sql("select count(*) from sbtest1")
+        mysql_count = mysql_conn.execute_sql("select count(*) from sbtest1")
+
+        mysql_conn.close()
+        clickhouse_conn.close()
+
+        self.assertEqual(ch_count[0], mysql_count)
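
One caveat with these assertions: the sink applies changes asynchronously, so a point-in-time count comparison can race the replication pipeline. A more tolerant variant would poll until the counts converge or a deadline passes; a sketch of that approach (a suggestion, not part of this commit):

import time

def wait_for_count_match(get_source_count, get_sink_count, timeout_s=60, interval_s=2):
    # Poll until the sink row count catches up with the source, or time out.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if get_sink_count() == get_source_count():
            return True
        time.sleep(interval_s)
    return False

# Hypothetical use inside a test:
# matched = wait_for_count_match(
#     lambda: mysql_conn.execute_sql("select count(*) from sbtest1")[0][0],
#     lambda: clickhouse_conn.execute_sql("select count(*) from sbtest1")[0][0])
# self.assertTrue(matched)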