mirror of
https://github.com/valitydev/clickhouse-sink-connector.git
synced 2024-11-06 10:35:21 +00:00
Fix the conflicts
This commit is contained in:
commit
bf86412141
@ -20,7 +20,6 @@ import datetime
|
||||
import zoneinfo
|
||||
from db_load.mysql_parser.mysql_parser import convert_to_clickhouse_table_antlr
|
||||
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
sys.path.append(os.path.dirname(SCRIPT_DIR))
|
||||
|
||||
@ -71,6 +70,7 @@ def clickhouse_connection(host, database='default', user='default', port=9000, p
|
||||
port=port,
|
||||
database=database,
|
||||
connect_timeout=20,
|
||||
secure=False
|
||||
)
|
||||
return conn
|
||||
|
||||
@ -154,7 +154,13 @@ def find_partitioning_options(source):
|
||||
return partitioning_options
|
||||
|
||||
|
||||
def convert_to_clickhouse_table_regexp(user_name, table_name, source):
|
||||
def convert_to_clickhouse_table(user_name, table_name, source, rmt_delete_support):
|
||||
|
||||
# do we have a table in the source
|
||||
|
||||
if not find_create_table(source) :
|
||||
return ('', [])
|
||||
|
||||
primary_key = find_primary_key(source)
|
||||
if primary_key is None:
|
||||
logging.warning("No PK found for "+table_name +
|
||||
@ -177,9 +183,9 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source):
|
||||
src = re.sub(r'\sjson\s', ' String ', src)
|
||||
# Date32 may be a better alternative as Date range are close to MySQL
|
||||
src = re.sub(r'\sdate\s', ' Date32 ', src)
|
||||
src = re.sub(r'\sdatetime\s', ' DateTime64(0) ', src)
|
||||
src = re.sub(r'\sdatetime\s', ' DateTime64(3) ', src)
|
||||
src = re.sub(r'\sdatetime(.*?)\s', ' DateTime64\\1 ', src)
|
||||
src = re.sub(r'\stimestamp\s', ' DateTime64(0) ', src)
|
||||
src = re.sub(r'\stimestamp\s', ' DateTime64(3) ', src)
|
||||
src = re.sub(r'\stimestamp(.*?)\s', ' DateTime64\\1 ', src)
|
||||
src = re.sub(r'\spoint\s', ' Point ', src)
|
||||
#src = re.sub(r'\sdouble\s', ' Decimal(38,10) ', src)
|
||||
@ -189,8 +195,8 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source):
|
||||
src = re.sub(r'\sCOLLATE\s(.*?)([\s,])', ' \\2', src, )
|
||||
src = re.sub(r'\sCHARACTER\sSET\s(.*?)([\s,])', ' \\2', src)
|
||||
# it is a challenge to convert MySQL expression in generated columns
|
||||
src = re.sub(r'GENERATED ALWAYS AS \(.*\)\s', ' ', src)
|
||||
src = re.sub(r'\bVIRTUAL\b', ' ', src)
|
||||
src = re.sub(r'.*GENERATED ALWAYS AS.*',' ',src)
|
||||
src = re.sub(r'\bVIRTUAL\b',' ', src)
|
||||
# ClickHouse does not support constraints, indices, primary and unique keys
|
||||
src = re.sub(r'.*\bCONSTRAINT\b.*', '', src)
|
||||
src = re.sub(r'.*\bPRIMARY KEY\b.*\(.*', '', src)
|
||||
@ -201,9 +207,18 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source):
|
||||
src = re.sub(r'.*\bforeign\b.*', '', src)
|
||||
src = re.sub(r'', '', src)
|
||||
# adding virtual columns ver and sign
|
||||
virtual_columns = "`_sign` Int8 DEFAULT 1,\n `_version` UInt64 DEFAULT 0\n"
|
||||
if rmt_delete_support:
|
||||
virtual_columns = "`is_deleted` UInt8 DEFAULT 0,\n `_version` UInt64 DEFAULT 0\n"
|
||||
|
||||
src = re.sub(r'\) ENGINE',
|
||||
' `_sign` Int8 DEFAULT 1,\n `_version` UInt64 DEFAULT 0\n) ENGINE', src)
|
||||
src = re.sub(r'ENGINE=InnoDB[^;]*', 'ENGINE = ReplacingMergeTree(_version) ' +
|
||||
' '+virtual_columns+') ENGINE', src)
|
||||
|
||||
rmt_engine = "ENGINE = ReplacingMergeTree(_version) "
|
||||
if rmt_delete_support:
|
||||
rmt_engine = "ENGINE = ReplacingMergeTree(_version, is_deleted) "
|
||||
|
||||
src = re.sub(r'ENGINE=InnoDB[^;]*', rmt_engine +
|
||||
partitioning_options + ' ORDER BY ('+primary_key+') SETTINGS '+settings, src)
|
||||
|
||||
lines = src.splitlines()
|
||||
@ -222,11 +237,12 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source):
|
||||
match = re.match(columns_pattern, altered_line)
|
||||
if match:
|
||||
column_name = match.group(1)
|
||||
datatype = match.group(2)
|
||||
datatype = match.group(2)
|
||||
nullable = False if "NOT NULL" in line else True
|
||||
logging.info(f"{column_name} {datatype}")
|
||||
columns.append({'column_name': column_name, 'datatype': datatype})
|
||||
|
||||
# tables with no PK miss commas
|
||||
columns.append({'column_name':column_name,'datatype':datatype,'nullable':nullable})
|
||||
|
||||
# tables with no PK miss commas
|
||||
if altered_line.strip() != "" and not altered_line.endswith(',') and not altered_line.endswith(';'):
|
||||
altered_line += ","
|
||||
|
||||
@ -318,8 +334,7 @@ def load_schema(args, dry_run=False):
|
||||
with gzip.open(file, "r") as schema_file:
|
||||
source = schema_file.read().decode('UTF-8')
|
||||
logging.info(source)
|
||||
(table_source, columns) = convert_to_clickhouse_table(
|
||||
db, table, source)
|
||||
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support)
|
||||
logging.info(table_source)
|
||||
timezone = find_dump_timezone(source)
|
||||
logging.info(f"Timezone {timezone}")
|
||||
@ -365,8 +380,7 @@ def load_schema_mysqlshell(args, dry_run=False):
|
||||
with open(file, "r") as schema_file:
|
||||
source = schema_file.read()
|
||||
logging.info(source)
|
||||
(table_source, columns) = convert_to_clickhouse_table(
|
||||
db, table, source)
|
||||
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support)
|
||||
logging.info(table_source)
|
||||
# timezone = find_dump_timezone(source)
|
||||
logging.info(f"Timezone {timezone}")
|
||||
@ -415,7 +429,7 @@ def load_data(args, timezone, schema_map, dry_run=False):
|
||||
|
||||
clickhouse_host = args.clickhouse_host
|
||||
ch_schema = args.clickhouse_database
|
||||
|
||||
password= args.clickhouse_password
|
||||
schema_file = args.dump_dir + '/*-schema.sql.gz'
|
||||
for files in glob.glob(schema_file):
|
||||
(schema, table_name) = parse_schema_path(files)
|
||||
@ -428,9 +442,8 @@ def load_data(args, timezone, schema_map, dry_run=False):
|
||||
schema_map, schema, table_name, args.virtual_columns, transform=True)
|
||||
for data_file in data_files:
|
||||
# double quote escape logic https://github.com/ClickHouse/ClickHouse/issues/10624
|
||||
structure = columns.replace(
|
||||
",", " Nullable(String),")+" Nullable(String)"
|
||||
cmd = f"""export TZ={timezone}; gunzip --stdout {data_file} | sed -e 's/\\\\"/""/g' | sed -e "s/\\\\\\'/'/g" | clickhouse-client --use_client_time_zone 1 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT CSV" -u{args.clickhouse_user} --password {args.clickhouse_password} -mn """
|
||||
structure = columns.replace(","," Nullable(String),")+" Nullable(String)"
|
||||
cmd = f"""export TZ={timezone}; gunzip --stdout {data_file} | sed -e 's/\\\\"/""/g' | sed -e "s/\\\\\\'/'/g" | clickhouse-client --use_client_time_zone 1 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT CSV" -u{args.clickhouse_user} --password '{password}' -mn """
|
||||
logging.info(cmd)
|
||||
(rc, result) = run_quick_command(cmd)
|
||||
logging.debug(result)
|
||||
@ -448,7 +461,7 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
|
||||
ch_schema = args.clickhouse_database
|
||||
|
||||
schema_files = args.dump_dir + f"/{args.mysql_source_database}@*.sql"
|
||||
|
||||
password = args.clickhouse_password
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
|
||||
futures = []
|
||||
for file in glob.glob(schema_files):
|
||||
@ -466,9 +479,22 @@ def load_data_mysqlshell(args, timezone, schema_map, dry_run=False):
|
||||
schema_map, schema, table_name, args.virtual_columns, transform=True, mysqlshell=args.mysqlshell)
|
||||
for data_file in data_files:
|
||||
# double quote escape logic https://github.com/ClickHouse/ClickHouse/issues/10624
|
||||
structure = columns.replace(
|
||||
",", " Nullable(String),")+" Nullable(String)"
|
||||
cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password {args.clickhouse_password} -mn """
|
||||
column_metadata_list = schema_map[schema+"."+table_name]
|
||||
structure = ""
|
||||
for column in column_metadata_list:
|
||||
logging.info(str(column))
|
||||
if column['column_name'] in args.virtual_columns:
|
||||
continue
|
||||
column_name = column['column_name'].replace('`','')
|
||||
if structure != "":
|
||||
structure += ", "
|
||||
structure +=" "+column_name + " "
|
||||
if column['nullable'] == True:
|
||||
structure +=" Nullable(String)"
|
||||
else:
|
||||
structure +=" String"
|
||||
|
||||
cmd = f"""export TZ={timezone}; zstd -d --stdout {data_file} | clickhouse-client --use_client_time_zone 1 --throw_if_no_data_to_insert=0 -h {clickhouse_host} --query="INSERT INTO {ch_schema}.{table_name}({columns}) SELECT {transformed_columns} FROM input('{structure}') FORMAT TSV" -u{args.clickhouse_user} --password '{password}' -mn """
|
||||
futures.append(executor.submit(execute_load, cmd))
|
||||
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
@ -520,11 +546,12 @@ def main():
|
||||
action='store_true', default=False)
|
||||
parser.add_argument('--dry_run', dest='dry_run',
|
||||
action='store_true', default=False)
|
||||
parser.add_argument('--virtual_columns', help='virtual_columns',
|
||||
nargs='+', default=['`_sign`', '`_version`'])
|
||||
parser.add_argument('--virtual_columns', help='virtual_columns', nargs='+', default=['`_sign`', '`_version`', '`is_deleted`'])
|
||||
parser.add_argument('--mysqlshell', help='using a util.dumpSchemas', dest='mysqlshell',
|
||||
action='store_true', default=False)
|
||||
|
||||
parser.add_argument('--rmt_delete_support', help='Use RMT deletes', dest='rmt_delete_support',
|
||||
action='store_true', default=False)
|
||||
|
||||
args = parser.parse_args()
|
||||
schema = not args.data_only
|
||||
data = not args.schema_only
|
||||
|
Loading…
Reference in New Issue
Block a user