主程序:** main_converter.py **
import re
import json
import argparsedef load_config(config_path: str) -> dict:'''配置管理模块:加载JSON格式配置文件'''with open(config_path, mode="r", encoding="utf-8") as f: return json.load(f)class BaseConverter:'''转换规则接口及实现'''def apply(self, field: str) -> str: raise NotlmplementedError("子类必须实现apply方法")class TypeMappingConverter(BaseConverter):'''数据类型转换规则'''def __init__(self, mapping: dict): self.mapping = mappingdef apply(self, field: str) -> str:for mysql_type, ch_type in self.mapping.items(): if mysql_type.upper() == "DECIMAL":field = re.sub(r'\bDECIMAL\((\d+\s*,\s*\d+)\)\b',lambda m: f"{ch_type}({m.group(1)})", field,flags=re.IGNORECASE)else:pattern = r'\b' + mysql_type + r'(\(\d+\))?\b'field = re.sub(pattern, ch_type, field, flags=re.IGNORECASE) return field
class DefaultValueConverter(BaseConverter):def __init__(self, replacements: dict):self.replacements = replacementsdef apply(self, field: str) -> str:for key, val in self.replacements.items():field = re.sub(r'\bDEFAULT\s+' + key + r'\b','DEFAULT' + val, field,flags=re.IGNORECASE)return field
class IgnoreKeywordConverter(BaseConverter):def __init__(self, keywords: list): self.keywords = keywordsdef apply(self, field: str) -> str:for keyword in self.keywords:field = re.sub(keyword, ", field, flags=re.IGNORECASE) return field
class CommentConverter(BaseConverter):def __init__(self, keep: bool =True):self.keep = keepdef apply(self, field: str) -> str:if self.keep: return field else:field = re.sub(r'\bCOMMENT\s+\'[^\']*\", ", field, flags=re.IGNORECASE)return field
class RuleRegistry: def __init__(self): self.rules = []def register(self, rule: BaseConverter):self.rules.append(rule)def process(self,field: str) -> str:for rule in self.rules:field = rule.apply(field)return re.sub(r'\s+','', field).strip()
class MySQLSyntaxCleaner(BaseConverter):def apply(self, field: str) -> str:field = re.sub(r'CHARACTER SET\s+\w+', '', field, flags=re.IGNORECASE)
field = re.sub(r'COLLATE\s+\w+', ", field, flags=re.IGNORECASE)field = re.sub(r'(String|VARCHAR|CHAR)\s*\(\d+\)', 'String', field, flags=re.IGNORECASE)field = re.sub(r'\btext\b', 'String', field, flags=re.IGNORECASE)field = re.sub(r'\bNOT NULL\b', '', field, flags=re.IGNORECASE)field = re.sub(r'\bNULL\b', '', field, flags=re.IGNORECASE)field = re.sub(r'\bDEFAULT\s+[^,]+', '', field, flags=re.IGNORECASE)field = re.sub(r'\bAUTO_INCREMENT\b', '', field, flags=re.IGNORECASE)field = re.sub(r'\s+', '', field).strip()return fielddef split_fields(fields_part: str) -> list: fields = [] buf = ''depth = 0for c in fields_part:if c == '(': depth += 1 elif c == ')': depth -= 1if c == ',' and depth ==0:if buf.strip():fields.append(buf.strip()) buf = ''else:buf += c if buf.strip():fields.append(buf.strip()) return fieldsdef parse_mysql_create_table(sql: str) -> (str, list, str): sql = re.sub(r'\s+', '', sql.strip()) table_match = re.search(r'CREATE\s+TABLE\s+`?(\w+)`?\s*\(', sql, re.IGNORECASE)if not table_match:raise ValueError("无法解析表名")table_name = table_match.group(1)fields_part_match = re.search(r'\((.*)\)\s*ENGINE=', sql, re.IGNORECASE)if not fields_part_match:raise ValueError("无法解析字段部分")fields_part = fields_part_match.group(1)fields = split_fields(fields_part)engine_match = re.search(r'ENGINE\s*=\s*(\w+)', sql, re.IGNORECASE) engine = engine_match.group(1) if engine_match else ""return table_name, fields, enginedef generate_clickhouse_create_table(mysql_sql: str, config: dict) -> str:table_name, fields, _ = parse_mysql_create_table(mysql_sql) registry = RuleRegistry()registry.register(TypeMappingConverter(config.get("type_mapping", {})))registry.register(DefaultValueConverter(config.get("default_value_replacements", {})))registry.register(IgnoreKeywordConverter(config.get("ignore_keywords", [])))registry.register(CommentConverter(keep=False))registry.register(MySQLSyntaxCleaner()) new_fields = []partition_field = Nonefor field in fields:if re.match(r'^(KEY|CONSTRAINT|FOREIGN\s+KEY|PRIMARY\s+KEY|FULLTE XT)', field, re.IGNORECASE):continueconv_field = registry.process(field)if conv_field == "" or conv_field == ",": continueconv_field = conv_field.rstrip(',')new_fields.append(conv_field)if (not partition_field) and re.search(config.get("partition_rule",
{}).get("match", "DateTime|date|create"), conv_field, re.IGNORECASE):fld_match = re.match(r'` ?(\w+)`?\s+', conv_field) if fld_match:partition_field = fld_match.group(1)if not partition_field:partition_field = config.get("order_rule", {}).get("default_field", new_fields[0].split()[0])ch_table_name = table_name + "_local"new_fields_str = ",\n ".join(new_fields)partition_expr = config.get("partition_rule", {}).get("expression", "{field}").format(field=partition_field) order_by = partition_fieldch_sql = f"""CREATE TABLE{ch_table_name}({new_fields_str}
) ENGINE = MergeTree()
PARTITION BY {partition_expr}
ORDER BY ({order_by});
"""return ch_sql
def main():parser = argparse.ArgumentParser(description="将MySQL 建表语句转换为 ClickHouse建表语句")parser.add_argument("--config", type=str, default="rules.json", help="转换规则配置文件路径")parser.add_argument("--input", type=str, required=True, help="包含 MySQL 建表语句的 SQL 文件路径")parser.add_argument("--output", type=str, default="clickhouse.sql", help="输出 ClickHouse 建表语句的文件路径")args = parser.parse_args()config = load_config(args.config)with open(args.input, mode="r", encoding="utf-8") as f:mysql_sql = f.read()config = load_config("rules.json")ch_sql = generate_clickhouse_create_table(mysql_sql, config)with open(args.output, mode="w", encoding="utf-8") as f:f.write(ch_sql)print("ClickHouse 建表语句已生成,文件位置为:",args.output)if __name__ == '__main__': main()
测试用例:** test converter.py **
import unittest
from main_converter import generate_clickhouse_create_tableclass TestMySQLToClickHouseConversion(unittest.TestCase):def setUp(self): self.config = {"type_mapping":{ "INT": "UInt32", "BIGINT": "UInt64", "VARCHAR": "String", "CHAR": "String", "DATETIME": "DateTime", "DATE": "Date", "DECIMAL": "Decimal"}, "default_value_replacements":{"CURRENT_TIMESTAMP": "now()"}, "ignore_keywords":
["AUTO_INCREMENT", "PRIMARY KEY", "KEY", "CONSTRAINT", "FOREIGN KEY", "CHECK"], "partition_rule": {"match": "DateTime", "expression": "toYYYYMM({field})"}, "order_rule": { "default_field": "order_date" }}self.mysql_sql = """CREATE TABLE orders ( order_id INT
AUTO_INCREMENT PRIMARY KEY, user_id INT NOT NULL, region VARCHAR(50) COMMENT '区域', order_date DATETIME DEFAULT
CURRENT_TIMESTAMP, amount DECIMAL(10,2), KEY idx_user (user_id), CONSTRAINT fk_user FOREIGN KEY (user_id) REFERENCES users(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;"""def test_conversion(self):ch_sql = generate_clickhouse_create_table(self.mysql_sql, self.config)self.assertIn("ENGINE = MergeTree()", ch_sql)self.assertIn("PARTITION BY toYYYYMM(order_date)", ch_sql) self.assertIn("ORDER BY (order_date)", ch_sql)self.assertIn("order_id UInt32", ch_sql)self.assertIn("DEFAULT now()", ch_sql)self.assertNotIn("AUTO_INCREMENT", ch_sql)self.assertNotIn("PRIMARY KEY", ch_sql)unittest.main()
配置文件:** rules.json **
{"type_mapping": {"INT": "UInt32","BIGINT": "UInt64","VARCHAR": "String","CHAR": "String","DATETIME": "DateTime","DATE": "Date","DECIMAL": "Decimal"},"default_value_replacements": {"CURRENT_TIMESTAMP": "now()"},"ignore_keywords": ["AUTO_INCREMENT","PRIMARY KEY","KEY","CONSTRAINT","FOREIGN KEY","CHECK"],"partition_rule":{"match": "DateTime","expression": "toYYYYMM({field})"},"order_rule": {"default_field": "order_date"}
}