1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
__author__ = 'xiaowu'
import pymysql import argparse import sys
def get_connection(host, port, user, password, db=None): return pymysql.connect( host=host, port=port, user=user, password=password, database=db, charset='utf8mb4', autocommit=True )
def parse_table_mappings(table_args): mappings = [] if isinstance(table_args, str): table_args = table_args.strip().split() for t in table_args: if ':' in t: src, tgt = t.split(':', 1) else: src = tgt = t mappings.append((src, tgt)) return mappings
def copy_table(src_conn, tgt_conn, src_table, tgt_table, src_db, tgt_db): with src_conn.cursor() as src_cur, tgt_conn.cursor() as tgt_cur: src_cur.execute(f"SHOW CREATE TABLE `{src_db}`.`{src_table}`") create_stmt = src_cur.fetchone()[1].replace( f'CREATE TABLE `{src_table}`', f'CREATE TABLE IF NOT EXISTS `{tgt_db}`.`{tgt_table}`' ) tgt_cur.execute(f"DROP TABLE IF EXISTS `{tgt_db}`.`{tgt_table}`") tgt_cur.execute(create_stmt) src_cur.execute(f"SELECT * FROM `{src_db}`.`{src_table}`") rows = src_cur.fetchall() if rows: cols = [desc[0] for desc in src_cur.description] placeholders = ','.join(['%s'] * len(cols)) insert_sql = f"INSERT INTO `{tgt_db}`.`{tgt_table}` ({','.join(cols)}) VALUES ({placeholders})" tgt_cur.executemany(insert_sql, rows) print(f"Table `{src_table}` copied to `{tgt_table}` ({len(rows)} rows).")
def copy_database(src_conn, tgt_conn, src_db, tgt_db): with src_conn.cursor() as src_cur, tgt_conn.cursor() as tgt_cur: tgt_cur.execute(f"CREATE DATABASE IF NOT EXISTS `{tgt_db}`") src_cur.execute(f"SHOW TABLES FROM `{src_db}`") tables = [row[0] for row in src_cur.fetchall()] for table in tables: copy_table(src_conn, tgt_conn, table, table, src_db, tgt_db)
def main(): parser = argparse.ArgumentParser(description="MySQL Data Transfer Tool") parser.add_argument('--src-host', default="127.0.0.1") parser.add_argument('--src-port', type=int, default=3306) parser.add_argument('--src-user', default="root") parser.add_argument('--src-password', default="root") parser.add_argument('--src-db', default="test") parser.add_argument('--tgt-host', default="127.0.0.1") parser.add_argument('--tgt-port', type=int, default=3306) parser.add_argument('--tgt-user', default="root") parser.add_argument('--tgt-password', default="root") parser.add_argument('--tgt-db', default="test") parser.add_argument('--tables', nargs='*', help='Table mappings as source:target (default: all)', default="*") args = parser.parse_args() src_conn = get_connection(args.src_host, args.src_port, args.src_user, args.src_password, args.src_db) tgt_conn = get_connection(args.tgt_host, args.tgt_port, args.tgt_user, args.tgt_password, args.tgt_db) try: if args.tables: if args.tables == '*': copy_database(src_conn, tgt_conn, args.src_db, args.tgt_db) else: with tgt_conn.cursor() as cur: cur.execute(f"CREATE DATABASE IF NOT EXISTS `{args.tgt_db}`") for src_table, tgt_table in parse_table_mappings(args.tables): copy_table(src_conn, tgt_conn, src_table, tgt_table, args.src_db, args.tgt_db) else: copy_database(src_conn, tgt_conn, args.src_db, args.tgt_db) finally: src_conn.close() tgt_conn.close()
if __name__ == '__main__': main()
|