article.txt

最近终于完成了对9.2G大数据的导入工作,由于需要把SQL SERVER的导出文档导入MYSQL数据库,还涉及到对大文件的编码转换,用到了下面几个脚本。

这个脚本能检测文件的编码,但大文件慎用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import chardet


def detect_encoding(file_path):
with open(file_path, 'rb') as file:
rawdata = file.read()
result = chardet.detect(rawdata)
return result['encoding']


if __name__ == "__main__":
file_path = 'codeerror.sql'
encoding = detect_encoding(file_path)
print(f"检测到文件 {file_path} 的编码为: {encoding}")

这个脚本能将大文件分批次转换编码再写入新文件,避免内存的过大消耗

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import os
import shutil


def check_disk_space(file_path, required_space_mb=100):
"""
检查磁盘空间是否充足,这里简单估算所需空间为源文件大小的1.5倍(可根据实际调整)
:param file_path: 源文件路径
:param required_space_mb: 预估需要的额外磁盘空间(MB),默认100MB
:return: True如果磁盘空间充足,False反之
"""
file_size = os.path.getsize(file_path)
# 预估转换后文件大小,这里简单按1.5倍估算(可根据实际情况优化)
estimated_size = file_size * 1.5
dir_path = os.path.dirname(file_path)
_, _, free_space = shutil.disk_usage(dir_path)
return free_space >= estimated_size + required_space_mb * 1024 * 1024


def convert_encoding(file_path):
"""
将指定的utf16编码的文件转换为utf8编码,按1000行读取处理
:param file_path: 源文件路径
"""
if not check_disk_space(file_path):
print("磁盘空间不足,无法进行文件编码转换,请清理磁盘空间后再试。")
return

base_name = os.path.basename(file_path)
file_name_without_ext, ext = os.path.splitext(base_name)
new_file_path = os.path.join(os.path.dirname(file_path), file_name_without_ext + "_converted" + ext)
with open(file_path, 'r', encoding='utf16') as source_file, open(new_file_path, 'w', encoding='utf8') as target_file:
lines_buffer = []
line_count = 0
for line in source_file:
lines_buffer.append(line)
line_count += 1
if line_count == 1000:
target_file.writelines(lines_buffer)
lines_buffer = []
line_count = 0
# 处理剩余不足1000行的部分
if lines_buffer:
target_file.writelines(lines_buffer)

print(f"文件编码转换成功,新文件路径为: {new_file_path}")


if __name__ == "__main__":
source_file_path = input("请输入要转换编码的源文件的路径: ")
convert_encoding(source_file_path)

接着用这个脚本把除了insert以外的语句删除,改变sql语句语法用于导入mysql数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import re
import os

# 定义一个函数来转换 SQL Server 语法为 MySQL 兼容语法
def convert_sql(sql_server_command):
# 替换 INSERT -> INSERT INTO
sql_server_command = re.sub(r'\bINSERT\b', 'INSERT INTO', sql_server_command)

# 替换 [dbo].[Big_1_shunfeng] -> Big_1_shunfeng
sql_server_command = re.sub(r'\[dbo\]\.\[([^\]]+)\]', r'\1', sql_server_command)

# 替换字段名:例如:[name] -> name
sql_server_command = re.sub(r'\[([^\]]+)\]', r'\1', sql_server_command)

# 替换 N'' -> ''(去除 N 字符)
sql_server_command = re.sub(r"N''", "''", sql_server_command)

# 替换 N'xxx' -> 'xxx'(去除 N 前缀,适应 MySQL)
sql_server_command = re.sub(r"N'([^']+)'", r"'\1'", sql_server_command)

# 为每行末尾加上分号(排除已经带有分号的行)
sql_server_command = re.sub(r'(?<!;)\s*$', ';', sql_server_command, flags=re.MULTILINE)

return sql_server_command

# 定义一个函数来处理整个 SQL 文件
def process_sql_file(input_file_path):
# 获取源文件名(不包含扩展名)
file_name, file_extension = os.path.splitext(os.path.basename(input_file_path))

# 新文件名
output_file_path = f"{file_name}_mysql{file_extension}"

with open(input_file_path, 'r', encoding='utf-8') as input_file, \
open(output_file_path, 'w', encoding='utf-8') as output_file:

# 逐行读取文件
for line in input_file:
# 去掉行两端的空白字符
line = line.strip()

# 仅处理以 INSERT 开头的行
if line.startswith('INSERT'):
# 转换 SQL 语法
converted_line = convert_sql(line)
# 写入新文件并加上换行符
output_file.write(converted_line + '\n')

print(f"转换完成,新文件已保存为: {output_file_path}")

# 调用函数,处理文件
input_file_path = 'shunfeng_converted.sql' # 替换为你的源文件路径
process_sql_file(input_file_path)

最后用这个脚本逐行读取,逐行写入,并把写入不成功的数据条目进行备份,实测效果挺好,效率也不错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import mysql.connector


def main():
# 建立数据库连接
cnx = mysql.connector.connect(
host="127.0.0.1",
port=" ",
user=" ",
password=" ",
database=" "
)
cursor = cnx.cursor()

# 逐行读取 SQL 文件并执行
with open('mysql.sql', 'r', encoding='utf-8') as file, open('error_lines.sql', 'w', encoding='utf-8') as error_file:
lines = []
line_count = 0
for line in file:
# 检查行是否以 INSERT 开头
if line.strip().upper().startswith("INSERT"):
lines.append(line)
if len(lines) >= 1000:
execute_sql_chunk(cursor, lines, error_file)
lines = []
line_count += 1000
print(f"已写入 {line_count} 行")
else:
error_file.write(line)
if lines:
execute_sql_chunk(cursor, lines, error_file)
print(f"已写入 {line_count + len(lines)} 行")

cnx.commit()
cursor.close()
cnx.close()


def execute_sql_chunk(cursor, lines, error_file):
error_lines = []
for line in lines:
try:
cursor.execute(line)
except mysql.connector.Error:
error_lines.append(line)
if error_lines:
error_file.writelines(error_lines)


if __name__ == "__main__":
main()

    Comments

    2025-01-17

    回到顶部