当前位置: 首页 > news >正文

板凳-------Mysql cookbook学习 (十二--------5)

11.4 导入和导出NULL值 404

-- 创建测试数据库
CREATE DATABASE IF NOT EXISTS test_null_db;
USE test_null_db;-- 创建测试表
CREATE TABLE test_data (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50),description VARCHAR(100),is_active TINYINT(1),created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);-- 插入测试数据(包含NULL和空字符串)
INSERT INTO test_data (name, description, is_active) VALUES
('Alice', 'Regular user', 1),
('Bob', NULL, 0),  -- 显式NULL
('Charlie', '', 1),  -- 空字符串
('David', 'VIP member', NULL);  -- NULL值使用标准 mysqldump 
bash
复制
下载
# 标准 mysqldump 命令(不指定 NULL 表示方式)
mysqldump -u root -p test_null_db test_data > test_data_dump.sqlmysql> SHOW VARIABLES LIKE 'secure_file_priv';
+------------------+---------------------------------+
| Variable_name    | Value                           |
+------------------+---------------------------------+
| secure_file_priv | D:\software\MySql\Data\Uploads\ |
+------------------+---------------------------------+
1 row in set, 1 warning (0.00 sec)mysql> SELECT * FROM test_data-> INTO OUTFILE 'D:/software/MySql/Data/Uploads/test_data_export.csv'-> FIELDS TERMINATED BY ','-> ENCLOSED BY '"'-> LINES TERMINATED BY '\r\n';
Query OK, 4 rows affected (0.01 sec)处理 NULL 值的改进版本
sql
SELECT id,name,IFNULL(description, '\\N') AS description,IFNULL(is_active, '\\N') AS is_active,created_at
FROM test_data
INTO OUTFILE 'D:/software/MySql/Data/Uploads/test_data_export.csv'
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n';先运行以下命令检查:sql
SHOW VARIABLES LIKE 'secure_file_priv';
最佳实践建议
对于 Windows 系统,建议:sql
LOAD DATA LOCAL INFILE 'C:\\Users\\lenovo\\test_data_import.csv'
INTO TABLE test_data_import
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\r\n'
(id, name, @description, @is_active, created_at)
SETdescription = NULLIF(@description, ''),is_active = NULLIF(@is_active, '');
对于 NULL 和空字符串的区别处理:sql
SETdescription = CASE WHEN @description = '\\N' THEN NULL ELSE @description END,is_active = CASE WHEN @is_active = '\\N' THEN NULL ELSE @is_active END;
请根据您实际的 MySQL 版本和需求选择合适的解决方案

11.5 编写数据导出程序 406

mysql> use cookbook;
Database changed
mysql> SELECT * FROM mail  INTO OUTFILE 'D:/software/MySql/Data/Uploads/mail.csv' ;
Query OK, 16 rows affected (0.02 sec)LOAD DATA INFILE 'D:/software/MySql/Data/Uploads/test_data_import.csv'
INTO TABLE test_data_import
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
ESCAPED BY '"'
LINES TERMINATED BY '\r\n'
-- IGNORE 1 LINES  -- 如果CSV有表头行,请取消注释此行
(id, name, description, is_active, create_time)  -- 替换为你的实际列名,顺序必须与CSV一致
SETdescription = NULLIF(description, '\\N'),
is_active = NULLIF(is_active, '\\N');

11.6 数据文件格式的转换 411

import csv
from pathlib import Pathclass DataConverter:# (省略类中的方法,与之前相同,确保完整复制)def __init__(self, delimiter=',', quotechar='"'):self.delimiter = delimiterself.quotechar = quotechardef txt_to_csv(self, txt_path, csv_path, has_headers=True):with open(txt_path, 'r', encoding='utf-8') as txtfile, \open(csv_path, 'w', encoding='utf-8', newline='') as csvfile:writer = csv.writer(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)lines = [line.strip() for line in txtfile if line.strip()]if has_headers:headers = lines[0].split(self.delimiter)writer.writerow(headers)data_lines = lines[1:]else:data_lines = linesfor line in data_lines:writer.writerow(line.split(self.delimiter))print(f"已将 {txt_path} 转换为 {csv_path}")def csv_to_pl(self, csv_path, pl_path, include_headers=True):with open(csv_path, 'r', encoding='utf-8') as csvfile, \open(pl_path, 'w', encoding='utf-8') as plfile:reader = csv.reader(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)lines = list(reader)plfile.write("# MySQL exported data file\n")plfile.write("# Format: field1|field2|field3...\n")if not include_headers:lines = lines[1:]for line in lines:plfile.write('|'.join(line) + '\n')print(f"已将 {csv_path} 转换为 {pl_path}")def pl_to_txt(self, pl_path, txt_path, has_headers=True):temp_csv = Path(pl_path).with_suffix('.temp.csv')# 先将PL转CSV(内部逻辑)with open(pl_path, 'r', encoding='utf-8') as plfile, \open(temp_csv, 'w', encoding='utf-8', newline='') as csvfile:writer = csv.writer(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)lines = [line.strip() for line in plfile if line.strip() and not line.startswith('#')]if has_headers:headers = lines[0].split('|')writer.writerow(headers)data_lines = lines[1:]else:data_lines = linesfor line in data_lines:writer.writerow(line.split('|'))# 再将CSV转TXTwith open(temp_csv, 'r', encoding='utf-8') as csvfile, \open(txt_path, 'w', encoding='utf-8') as txtfile:reader = csv.reader(csvfile, delimiter=self.delimiter, quotechar=self.quotechar)lines = list(reader)if not has_headers:lines = lines[1:]for line in lines:txtfile.write(self.delimiter.join(line) + '\n')temp_csv.unlink()print(f"已将 {pl_path} 转换为 {txt_path}")# --------------------------
# 分步执行,确保每个步骤的输入文件存在
# --------------------------# 初始化转换器
converter = DataConverter()# 步骤1:创建一个测试用的data.txt,并转换为CSV
# 先手动创建data.txt,内容如下:
# id,name,age
# 1,张三,30
# 2,李四,25
# 3,王五,35
converter.txt_to_csv(txt_path="data.txt",    # 确保当前目录有这个文件csv_path="data_from_txt.csv",has_headers=True
)# 步骤2:用步骤1生成的CSV文件(data_from_txt.csv)转换为PL
converter.csv_to_pl(csv_path="data_from_txt.csv",  # 使用上一步生成的CSV,确保存在pl_path="data_from_csv.pl",include_headers=True
)# 步骤3:用步骤2生成的PL文件(data_from_csv.pl)转换为TXT
converter.pl_to_txt(pl_path="data_from_csv.pl",  # 使用上一步生成的PL,确保存在txt_path="data_from_pl.txt",has_headers=True
)

已将 data.txt 转换为 data_from_txt.csv
已将 data_from_txt.csv 转换为 data_from_csv.pl
已将 data_from_csv.pl 转换为 data_from_pl.txt

11.7 提取并重新排列数据文件的列 412


```python
import csv
from pathlib import Pathdef rearrange_columns(input_file, output_file, new_order, delimiter=',', has_headers=True
):"""提取并重新排列文本文件中的列参数:input_file: 输入文件路径output_file: 输出文件路径new_order: 新的列顺序列表,例如 ['name', 'id', 'age'] 或 [1, 0, 2]delimiter: 列分隔符,默认是逗号has_headers: 是否包含表头"""with open(input_file, 'r', encoding='utf-8') as infile, \open(output_file, 'w', encoding='utf-8', newline='') as outfile:reader = csv.reader(infile, delimiter=delimiter)writer = csv.writer(outfile, delimiter=delimiter)# 处理表头if has_headers:headers = next(reader)# 确定新列顺序的索引if all(isinstance(x, str) for x in new_order):# 如果new_order是列名,转换为索引try:new_indices = [headers.index(col) for col in new_order]except ValueError as e:raise ValueError(f"列名不存在: {e}")else:# 如果new_order是索引new_indices = new_order# 验证索引有效性for idx in new_indices:if idx < 0 or idx >= len(headers):raise IndexError(f"无效的列索引: {idx}")# 写入新表头new_headers = [headers[i] for i in new_indices]writer.writerow(new_headers)else:# 无表头时,直接使用索引new_indices = new_order# 先读取一行获取列数first_row = next(reader)for idx in new_indices:if idx < 0 or idx >= len(first_row):raise IndexError(f"无效的列索引: {idx}")# 写回第一行writer.writerow([first_row[i] for i in new_indices])# 处理数据行for row in reader:# 跳过空行if not row:continue# 按新顺序写入列new_row = [row[i] for i in new_indices]writer.writerow(new_row)print(f"列重排完成!\n输入文件: {input_file}\n输出文件: {output_file}")print(f"新的列顺序: {new_order}")if __name__ == "__main__":# 示例使用# 1. 准备测试数据(如果不存在则创建)test_data = """id,name,age,city,email
1,张三,30,北京,zhangsan@example.com
2,李四,25,上海,lisi@example.com
3,王五,35,广州,wangwu@example.com
"""with open("data.txt", "w", encoding="utf-8") as f:f.write(test_data)# 2. 列重排示例# 示例1: 使用列名重新排列(保留需要的列)rearrange_columns(input_file="data.txt",output_file="rearranged_by_name.txt",new_order=['name', 'age', 'city', 'id'],  # 新的列顺序(按列名)delimiter=',',has_headers=True)# 示例2: 使用列索引重新排列(索引从0开始)rearrange_columns(input_file="data.txt",output_file="rearranged_by_index.txt",new_order=[1, 3, 0],  # 新的列顺序(按索引,对应name, city, id)delimiter=',',has_headers=True)# 示例3: 处理无表头的文件(假设用Tab分隔)# 先创建一个无表头的Tab分隔文件no_header_data = """1\t张三\t30\t北京
2\t李四\t25\t上海
3\t王五\t35\t广州
"""with open("data_no_header.txt", "w", encoding="utf-8") as f:f.write(no_header_data)# 重排无表头文件的列rearrange_columns(input_file="data_no_header.txt",output_file="rearranged_no_header.txt",new_order=[1, 0, 3],  # 按索引重排(姓名, ID, 城市)delimiter='\t',has_headers=False
)列重排完成!
输入文件: data.txt
输出文件: rearranged_by_name.txt
新的列顺序: ['name', 'age', 'city', 'id']
列重排完成!
输入文件: data.txt
输出文件: rearranged_by_index.txt
新的列顺序: [1, 3, 0]
列重排完成!
输入文件: data_no_header.txt
输出文件: rearranged_no_header.txt
新的列顺序: [1, 0, 3]选择建议
追求极致压缩:优先选 Parquet(适合批量分析)或 Feather(适合快速读写)。
需要兼容文本格式:用 GZIP 压缩 CSV(简单易用,支持大多数工具)。
需要查询功能:用 SQLite(单文件数据库,支持 SQL 查询)。
半结构化数据:用 JSONL(比普通 JSON 紧凑,适合日志类数据)。
以文档中的 employees.txt 为例,转换为 Parquet 后大小通常可减少 70% 以上,且保留完整结构化信息,是最推荐的高效存储方式。
import mysql.connector
from mysql.connector import Error
import csvdef mysql_to_txt(host,database,user,password,table_name="employees",output_file="employees.txt",delimiter="|",include_headers=True,query=None
):"""从MySQL数据库提取表数据并保存为文本文件参数:host: 数据库主机地址database: 数据库名称user: 数据库用户名password: 数据库密码table_name: 要提取数据的表名,默认是employeesoutput_file: 输出的文本文件名,默认是employees.txtdelimiter: 文本文件的列分隔符,默认是|include_headers: 是否包含表头,默认是Truequery: 自定义查询语句,如果提供则忽略table_name"""connection = Nonetry:# 连接数据库connection = mysql.connector.connect(host=host,database=database,user=user,password=password)if connection.is_connected():cursor = connection.cursor()# 准备查询语句if query is None:query = f"SELECT * FROM {table_name}"# 执行查询cursor.execute(query)records = cursor.fetchall()columns = [column[0] for column in cursor.description]  # 获取列名# 写入文本文件with open(output_file, 'w', encoding='utf-8', newline='') as txtfile:writer = csv.writer(txtfile,delimiter=delimiter,quoting=csv.QUOTE_MINIMAL)# 写入表头if include_headers:writer.writerow(columns)# 写入数据行writer.writerows(records)print(f"数据提取成功!")print(f"表名: {table_name}")print(f"提取记录数: {len(records)}")print(f"输出文件: {output_file}")print(f"分隔符: '{delimiter}'")except Error as e:print(f"数据库操作错误: {e}")finally:# 关闭数据库连接if connection is not None and connection.is_connected():cursor.close()connection.close()print("数据库连接已关闭")if __name__ == "__main__":# 配置数据库连接信息(请根据实际情况修改)db_config = {"host": "localhost",        # 数据库主机地址"database": "employees",# 数据库名称"user": "root",    # 数据库用户名"password": "root" # 数据库密码}# 提取employees表数据到employees.txtmysql_to_txt(**db_config,table_name="employees",output_file="employees.txt",delimiter="|",  # 使用|作为分隔符,便于后续导入MySQLinclude_headers=True)# 示例:提取特定列(使用自定义查询)# custom_query = "SELECT emp_no, first_name, last_name, hire_date FROM employees WHERE hire_date > '2000-01-01'"# mysql_to_txt(#     ** db_config,#     query=custom_query,#     output_file="employees_filtered.txt",#     delimiter=","# )
数据提取成功!
表名: employees
提取记录数: 300024
输出文件: employees.txt
分隔符: '|'
数据库连接已关闭import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path  # 新增导入Path类# 读取 TXT 文件(按|分隔)
df = pd.read_csv("employees.txt",sep="|",header=0,  # 第一行为表头names=["emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date"]
)# 转换为 Parquet 并压缩
pq.write_table(pa.Table.from_pandas(df),"employees.parquet",compression="snappy"  # 可选压缩算法:snappy、gzip、lz4等
)# 计算并打印文件大小(修正后)
print(f"原文件大小: {Path('employees.txt').stat().st_size / 1024:.2f} KB")  # 更准确的原文件大小计算方式
print(f"Parquet文件大小: {Path('employees.parquet').stat().st_size / 1024:.2f} KB")原文件大小: 13791.09 KB
Parquet文件大小: 3318.18 KBimport gzip
import csv
from pathlib import Path# 读取 TXT 并写入 GZIP 压缩的 CSV
with open("employees.txt", "r", encoding="utf-8") as f_in, \gzip.open("employees.csv.gz", "wt", encoding="utf-8") as f_out:reader = csv.reader(f_in, delimiter="|")writer = csv.writer(f_out, delimiter=",")  # 转为逗号分隔for row in reader:writer.writerow(row)print(f"GZIP压缩后大小: {Path('employees.csv.gz').stat().st_size / 1024:.2f} KB")GZIP压缩后大小: 4971.34 KBimport sqlite3
import csv
from pathlib import Path# 连接 SQLite 数据库(文件不存在则创建)
conn = sqlite3.connect("employees.db")
cursor = conn.cursor()# 创建表结构
cursor.execute("""CREATE TABLE IF NOT EXISTS employees (emp_no INT,birth_date DATE,first_name TEXT,last_name TEXT,gender TEXT,hire_date DATE)
""")# 读取 TXT 并插入数据
with open("employees.txt", "r", encoding="utf-8") as f:reader = csv.reader(f, delimiter="|")next(reader)  # 跳过表头cursor.executemany("""INSERT INTO employees VALUES (?, ?, ?, ?, ?, ?)""", reader)conn.commit()
conn.close()print(f"SQLite文件大小: {Path('employees.db').stat().st_size / 1024:.2f} KB")SQLite文件大小: 14884.00 KB为什么 Parquet 压缩效率这么高?
列存储特性:Parquet 按列存储数据,对于重复值较多的列(如 gender 只有 M/F 两种值),压缩算法(如 snappy)能高效消除冗余。
类型优化:自动识别数据类型(如 emp_no 为整数、birth_date 为日期),用更紧凑的二进制格式存储,比文本格式更节省空间。
按需读取:后续使用时可以只读取需要的列(如只查 name 和 hire_date),无需加载整个文件,进一步提升效率。
后续操作建议
验证数据完整性:可以用以下代码确认转换后的数据是否完整:
python
运行
import pandas as pd
# 读取Parquet文件
parquet_df = pd.read_parquet("employees.parquet")
# 对比行数是否与原文件一致
txt_df = pd.read_csv("employees.txt", sep="|")
print(f"原文件行数: {len(txt_df)}, Parquet文件行数: {len(parquet_df)}")  # 应相等原文件行数: 300024, Parquet文件行数: 300024当数据量达到 5 亿行以上时,选择存储格式需要重点考虑压缩效率、分布式处理兼容性、读写性能和查询效率(尤其是列级操作和过滤能力)。此时,Parquet 是最优选择,其次是 ORC(针对特定生态),以下是具体分析:
一、5 亿 + 数据量的核心需求
极致压缩率:减少存储成本(5 亿行数据若用 CSV 可能占用数十 TB,压缩后需控制在数 TB 内)。
分布式友好:支持 Spark、Flink、Hadoop 等分布式框架,避免单机处理瓶颈。
列级操作支持:可只读取需要的列(如仅查询hire_date和gender),减少 IO 和计算量。
稳定的读写性能:大规模数据下避免内存溢出,支持批量读写和并行处理。
二、最优选择:Parquet
核心优势:
压缩效率碾压文本格式
5 亿行数据中,大量列(如gender仅 2 个值、birth_date格式固定)存在极高重复度,Parquet 的列存储 + 压缩算法(如 ZSTD、Gzip)可将压缩比做到 10:1 甚至 20:1(远高于 CSV 的 3:1)。例如:5 亿行员工数据用 CSV 可能占 50TB,Parquet 压缩后可降至 3-5TB。
完美适配分布式生态
支持 Spark、Hive、Flink 等分布式计算框架,可直接进行分区存储(如按hire_year分区),实现 “数据分片 + 并行处理”,避免单机加载 5 亿行数据的内存压力。
谓词下推与列剪枝
查询时可通过where条件(如hire_date > '2000-01-01')直接在存储层过滤数据,且只读取需要的列(如仅emp_no和last_name),IO 量减少 80% 以上。
成熟的工业级支持
广泛用于大数据场景(如 Netflix、Uber 的 PB 级数据存储),兼容性强,工具链完善(Python、Java、SQL 均可直接操作)。
import pandas as pd
from sqlalchemy import create_engine
from pathlib import Path# 1. 读取 Parquet 文件
parquet_df = pd.read_parquet("employees.parquet")
print(f"待导入数据量:{len(parquet_df)} 行")# 2. 连接 MySQL 数据库(替换为你的数据库信息)
db_config = {"host": "localhost","user": "你的用户名","password": "你的密码","database": "目标数据库名","port": 3306
}
engine = create_engine(f"mysql+mysqlconnector://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}")# 3. 写入 MySQL(自动创建表,若表已存在可添加 if_exists='append' 追加)
parquet_df.to_sql(name="employees_from_parquet",  # 目标表名con=engine,index=False,  # 不导入 DataFrame 的索引if_exists="replace"  # 若表存在则替换(可选:'fail' 报错 / 'append' 追加)
)print("导入完成!可在 MySQL 中查询表 employees_from_parquet")待导入数据量:300024 行
导入完成!可在 MySQL 中查询表 employees_from_parquet
http://www.lryc.cn/news/596529.html

相关文章:

  • 【RAG优化】PDF复杂表格解析问题分析
  • 阶段1--Linux中的文件服务器(FTP、NAS、SSH)
  • 从差异到协同:OKR 与 KPI 的管理逻辑,Moka 让适配更简单
  • 苹果app应用ipa文件程序开发后如何运行到苹果iOS真机上测试?
  • C# 析构函数
  • 【论文阅读 | TIV 2024 | CDC-YOLOFusion:利用跨尺度动态卷积融合实现可见光-红外目标检测】
  • 2025年07月22日Github流行趋势
  • 坑机介绍学习研究
  • 激活函数Focal Loss 详解​
  • 数组——初识数据结构
  • DMZ网络安全基础知识
  • [3-02-02].第04节:开发应用 - RequestMapping注解的属性2
  • Fluent许可与网络安全策略
  • 【kubernetes】-2 K8S的资源管理
  • Java数据结构——ArrayList
  • 【黑马SpringCloud微服务开发与实战】(五)微服务保护
  • 嵌入式学习-土堆目标检测(3)-day27
  • 【自定义一个简单的CNN模型】——深度学习.卷积神经网络
  • 【Java】SVN 版本控制软件的快速安装(可视化)
  • 洛谷刷题7..22
  • (Arxiv-2025)HiDream-I1:一种高效图像生成基础模型,采用稀疏扩散Transformer
  • CMake实践:CMake3.30版本之前和之后链接boost的方式差异
  • Day20-二叉树基础知识
  • 智能Agent场景实战指南 Day 18:Agent决策树与规划能力
  • Java 动态导出 Word 登记表:多人员、分页、动态表格的最佳实践
  • IntelliJ IDEA (2024.3.1)优雅导入 Maven 项目的两种方式详解
  • 【IDEA】如何在IDEA中通过git创建项目?
  • IDEA-通过IDEA导入第三方的依赖包
  • Spring5的IOC原理
  • Node.js:Web模块、Express框架