2026-05-13
Python
0

目录

🏭 那些年,我们被Excel坑过的岁月
🔍 先把问题摸透,再动手写代码
🛠️ 环境准备,先把工具备齐
📦 第一步:构建通用的Excel读取器
🗄️ 第二步:数据库层抽象——SQLite和MySQL统一接口
🚀 第三步:迁移主流程——把一切串起来
▶️ 实际运行示例
⚡ 性能这块,有几个坑必须提前说
💬 聊几句

🏭 那些年,我们被Excel坑过的岁月

说真的,我在工厂车间里做数据系统的时候,见过太多这种场景了——

一台老旧的工控机,桌面上摆着十几个Excel文件,文件名叫"设备数据_最终版_v3_真的最终版.xlsx"。每次要查历史数据,就得打开七八个表格,手动复制粘贴,搞个把小时才能出一份报表。更要命的是,有时候数据还对不上,因为两个班次的操作员各自维护了一份,格式还不一样。

这不是个例。制造业里,Excel作为"数据库"使用的现象极其普遍。它轻便、直观,入门门槛低,但一旦数据量上去了、多人协作了、需要实时查询了——它的局限性就暴露得一干二净。

今天咱们就聊一件很多工程师都绕不开的事:怎么用Python把Excel里的历史数据,优雅地迁移到SQLite或MySQL里,同时还得保证数据不丢、格式不乱、迁移过程可追溯。


🔍 先把问题摸透,再动手写代码

在我接触过的工业数据迁移项目里,有三类问题反复出现,踩坑率极高。

第一类:数据格式的混乱程度超出想象。 同一列"温度"字段,有的行写的是85.3,有的写85.3℃,有的写约85度,甚至还有--表示传感器离线。这种"人工智能"录入方式,直接导致数值列无法直接入库。

第二类:时间戳格式五花八门。 2023/8/52023-08-058月5日 14:30……同一个Excel文件里可能混用三种格式,pandas读进来直接变成object类型,后续时序查询全部废掉。

第三类:多Sheet、多文件的数据孤岛。 按月份拆分的Excel,每个文件有12个Sheet,字段名还不完全一致(有的叫"压力值",有的叫"压力",有的叫"P_value")。合并之前必须做字段映射,否则入库之后数据根本没法用。

搞清楚这三类问题,咱们的迁移方案就有了清晰的骨架。


🛠️ 环境准备,先把工具备齐

bash
pip install pandas openpyxl sqlalchemy pymysql tqdm

这几个包各有分工:pandas 负责读取和清洗Excel,openpyxl 是pandas读取.xlsx的底层引擎,sqlalchemy 提供统一的数据库抽象层(SQLite和MySQL都能用),pymysql 是MySQL的Python驱动,tqdm 用来显示迁移进度条——数据量大的时候,没有进度条真的会让人抓狂。


📦 第一步:构建通用的Excel读取器

先写一个能处理"脏数据"的读取模块,这是整个迁移流程的地基。

python
import pandas as pd import re import logging from pathlib import Path from typing import Optional, Dict, List, Union, Tuple from datetime import datetime # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class ExcelDataError(Exception): """自定义异常类,用于Excel数据处理错误""" pass class ExcelReader: """ 工业数据Excel读取器 专门处理格式混乱的历史数据文件,支持多种数据清洗和验证功能 """ # 字段名映射表——统一不同时期的命名习惯 FIELD_MAPPING = { # 压力相关字段 "压力值": "pressure", "压力": "pressure", "P_value": "pressure", "Pressure": "pressure", "press": "pressure", "压力(MPa)": "pressure", "压力/MPa": "pressure", # 温度相关字段 "温度值": "temperature", "温度": "temperature", "T_value": "temperature", "Temperature": "temperature", "temp": "temperature", "温度(℃)": "temperature", "温度/℃": "temperature", # 时间相关字段 "时间": "record_time", "记录时间": "record_time", "timestamp": "record_time", "Timestamp": "record_time", "采集时间": "record_time", "日期": "record_time", "Date": "record_time", "DateTime": "record_time", # 其他可能的工业数据字段 "流量": "flow_rate", "Flow": "flow_rate", "湿度": "humidity", "Humidity": "humidity", "设备状态": "device_status", "Status": "device_status" } # 数据质量阈值配置 QUALITY_THRESHOLDS = { "pressure": {"min": 0, "max": 1000}, # MPa "temperature": {"min": -50, "max": 200}, # ℃ "flow_rate": {"min": 0, "max": 10000}, # 根据实际情况调整 "humidity": {"min": 0, "max": 100} # % } def __init__(self, file_path: Union[str, Path], sheet_name: Union[str, int] = 0, encoding: str = 'utf-8', skip_rows: int = 0): """ 初始化Excel读取器 Args: file_path: Excel文件路径 sheet_name: 工作表名称或索引 encoding: 文件编码 skip_rows: 跳过的行数(用于处理表头前的说明文字) """ self.file_path = Path(file_path) self.sheet_name = sheet_name self.encoding = encoding self.skip_rows = skip_rows self.quality_report = {} # 验证文件存在性 if not self.file_path.exists(): raise FileNotFoundError(f"Excel文件不存在: {self.file_path}") # 验证文件格式 if self.file_path.suffix.lower() not in ['.xlsx', '.xls']: raise ExcelDataError(f"不支持的文件格式: {self.file_path.suffix}") def read(self, validate_data: bool = True, remove_duplicates: bool = True) -> pd.DataFrame: """ 读取并处理Excel数据 Args: validate_data: 是否进行数据质量验证 remove_duplicates: 是否移除重复行 Returns: 清洗后的DataFrame """ try: logger.info(f"开始读取Excel文件: {self.file_path}") # 读取原始数据 df = self._read_raw_data() original_rows = len(df) logger.info(f"原始数据行数: {original_rows}") # 数据处理流程 df = self._normalize_columns(df) df = self._clean_numeric_fields(df) df = self._parse_datetime(df) df = self._drop_invalid_rows(df) if remove_duplicates: df = self._remove_duplicates(df) if validate_data: df = self._validate_data_quality(df) # 按时间排序 if "record_time" in df.columns: df = df.sort_values("record_time").reset_index(drop=True) processed_rows = len(df) logger.info(f"处理完成,有效数据行数: {processed_rows} " f"(数据保留率: {processed_rows / original_rows * 100:.1f}%)") # 生成质量报告 self._generate_quality_report(df, original_rows) return df except Exception as e: logger.error(f"Excel读取失败: {str(e)}") raise ExcelDataError(f"Excel文件处理失败: {str(e)}") def _read_raw_data(self) -> pd.DataFrame: """读取原始Excel数据""" try: # 尝试读取指定工作表 df = pd.read_excel( self.file_path, sheet_name=self.sheet_name, engine="openpyxl", skiprows=self.skip_rows, na_values=['', ' ', 'N/A', 'null', 'NULL', '--', '——', '无数据'] ) # 检查是否为空文件 if df.empty: raise ExcelDataError("Excel文件为空或无有效数据") return df except Exception as e: # 如果指定工作表读取失败,尝试读取第一个工作表 if isinstance(self.sheet_name, str): logger.warning(f"工作表 '{self.sheet_name}' 不存在,尝试读取第一个工作表") return pd.read_excel(self.file_path, sheet_name=0, engine="openpyxl") else: raise ExcelDataError(f"无法读取Excel文件: {str(e)}") def _normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame: """ 统一列名处理 1. 去除前后空格和特殊字符 2. 使用映射表统一字段名 3. 处理重复列名 """ # 清理列名 df.columns = [str(c).strip().replace('\n', '').replace('\r', '') for c in df.columns] # 处理重复列名 cols = pd.Series(df.columns) for dup in cols[cols.duplicated()].unique(): cols[cols[cols == dup].index.values.tolist()] = [ f"{dup}_{i}" if i != 0 else dup for i in range(sum(cols == dup)) ] df.columns = cols # 应用字段映射 df.rename(columns=self.FIELD_MAPPING, inplace=True) logger.info(f"列名标准化完成,字段: {list(df.columns)}") return df def _clean_numeric_fields(self, df: pd.DataFrame) -> pd.DataFrame: """ 清洗数值字段 处理各种异常情况:单位符号、异常标记、科学计数法等 """ # 识别可能的数值字段 numeric_candidates = [col for col in df.columns if col in ['pressure', 'temperature', 'flow_rate', 'humidity']] for col in numeric_candidates: if col not in df.columns: continue logger.info(f"清洗数值字段: {col}") original_valid = df[col].notna().sum() df[col] = df[col].apply(self._extract_number) cleaned_valid = df[col].notna().sum() logger.info(f" {col}: {original_valid}{cleaned_valid} 有效值") return df @staticmethod def _extract_number(val) -> Optional[float]: """ 从混合格式字符串中提取数值 支持格式:'85.3℃', '1.2MPa', '1.23e-4', '--', 'N/A' 等 """ if pd.isna(val): return None s = str(val).strip() # 处理明确的无效值 if s.lower() in ['--', '——', 'n/a', 'null', '', '无数据', '异常']: return None try: # 直接转换数值 return float(s) except ValueError: pass # 使用正则表达式提取数值(支持科学计数法) patterns = [ r'-?\d+\.?\d*[eE][-+]?\d+', # 科学计数法 r'-?\d+\.\d+', # 小数 r'-?\d+' # 整数 ] for pattern in patterns: match = re.search(pattern, s) if match: try: return float(match.group()) except ValueError: continue return None def _parse_datetime(self, df: pd.DataFrame) -> pd.DataFrame: """ 解析时间字段,支持多种时间格式 """ if "record_time" not in df.columns: logger.warning("未找到时间字段") return df logger.info("解析时间字段") original_valid = df["record_time"].notna().sum() try: df["record_time"] = pd.to_datetime( df["record_time"], errors="coerce", format='mixed' ) except TypeError: df["record_time"] = pd.to_datetime( df["record_time"], errors="coerce" ) parsed_valid = df["record_time"].notna().sum() logger.info(f"时间解析完成: {original_valid}{parsed_valid} 有效值") return df def _drop_invalid_rows(self, df: pd.DataFrame) -> pd.DataFrame: """删除无效行""" original_rows = len(df) # 删除全空行 df = df.dropna(how='all') # 删除时间戳无效的行 if "record_time" in df.columns: invalid_time = df["record_time"].isna().sum() if invalid_time > 0: logger.warning(f"删除 {invalid_time} 行无效时间数据") df = df.dropna(subset=["record_time"]) final_rows = len(df) logger.info(f"无效行清理完成: {original_rows}{final_rows}") return df.reset_index(drop=True) def _remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame: """移除重复数据""" original_rows = len(df) # 基于时间戳去重(保留最后一条记录) if "record_time" in df.columns: df = df.drop_duplicates(subset=["record_time"], keep="last") else: df = df.drop_duplicates() final_rows = len(df) if original_rows > final_rows: logger.info(f"移除重复数据: {original_rows - final_rows} 行") return df.reset_index(drop=True) def _validate_data_quality(self, df: pd.DataFrame) -> pd.DataFrame: """数据质量验证和异常值处理""" for col, thresholds in self.QUALITY_THRESHOLDS.items(): if col not in df.columns: continue # 统计异常值 valid_mask = (df[col] >= thresholds["min"]) & (df[col] <= thresholds["max"]) outliers = (~valid_mask & df[col].notna()).sum() if outliers > 0: logger.warning(f"{col} 发现 {outliers} 个异常值 " f"(范围: {thresholds['min']}-{thresholds['max']})") # 可选:将异常值设为NaN而不是删除整行 df.loc[~valid_mask, col] = None return df def _generate_quality_report(self, df: pd.DataFrame, original_rows: int): """生成数据质量报告""" self.quality_report = { "file_info": { "file_path": str(self.file_path), "processed_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }, "data_summary": { "original_rows": original_rows, "final_rows": len(df), "retention_rate": len(df) / original_rows if original_rows > 0 else 0, "columns": list(df.columns) }, "field_statistics": {} } # 各字段统计 for col in df.columns: if df[col].dtype in ['float64', 'int64']: self.quality_report["field_statistics"][col] = { "valid_count": df[col].notna().sum(), "missing_count": df[col].isna().sum(), "min": float(df[col].min()) if df[col].notna().any() else None, "max": float(df[col].max()) if df[col].notna().any() else None, "mean": float(df[col].mean()) if df[col].notna().any() else None } def get_quality_report(self) -> Dict: """获取数据质量报告""" return self.quality_report def print_quality_report(self): """打印数据质量报告""" if not self.quality_report: print("尚未生成质量报告,请先调用 read() 方法") return report = self.quality_report print("\n" + "=" * 50) print("📊 数据质量报告") print("=" * 50) # 文件信息 print(f"文件路径: {report['file_info']['file_path']}") print(f"处理时间: {report['file_info']['processed_time']}") # 数据概览 summary = report['data_summary'] print(f"\n原始行数: {summary['original_rows']}") print(f"有效行数: {summary['final_rows']}") print(f"数据保留率: {summary['retention_rate']:.1%}") print(f"字段列表: {', '.join(summary['columns'])}") # 字段统计 if report['field_statistics']: print("\n字段统计:") for field, stats in report['field_statistics'].items(): print(f" {field}:") print(f" 有效值: {stats['valid_count']} | " f"缺失值: {stats['missing_count']}") if stats['min'] is not None: print(f" 范围: {stats['min']:.3f} ~ {stats['max']:.3f} | " f"均值: {stats['mean']:.3f}") print("=" * 50) def create_sample_data(filename: str = "industrial_data.xlsx"): """ 创建示例数据文件用于测试 """ import numpy as np from datetime import datetime, timedelta # 生成示例数据 np.random.seed(42) n_records = 1000 # 时间序列 start_time = datetime(2024, 1, 1, 0, 0, 0) time_series = [start_time + timedelta(minutes=i * 5) for i in range(n_records)] # 模拟工业数据 data = { '时间': time_series, '压力(MPa)': np.random.normal(5.0, 0.5, n_records), '温度(℃)': np.random.normal(85.0, 10.0, n_records), 'Flow': np.random.normal(100.0, 15.0, n_records), 'Status': ['正常'] * (n_records - 50) + ['异常'] * 50 } # 添加一些异常值和缺失值 data['压力(MPa)'][50:60] = ['--'] * 10 # 缺失值 data['温度(℃)'][100:105] = [300, -100, 500, 'N/A', '异常'] # 异常值 df = pd.DataFrame(data) df.to_excel(filename, index=False) print(f"示例数据已创建: {filename}") return df def main(): """使用示例""" try: # 如果测试文件不存在,创建示例数据 test_file = "industrial_data.xlsx" if not Path(test_file).exists(): print("创建示例数据文件...") create_sample_data(test_file) # 创建读取器实例 reader = ExcelReader( file_path=test_file, sheet_name=0, skip_rows=0 ) # 读取和处理数据 df = reader.read( validate_data=True, remove_duplicates=True ) print(f"\n处理结果:") print(f"数据形状: {df.shape}") print(f"字段类型:\n{df.dtypes}") print(f"\n前5行数据:") print(df.head()) # 打印质量报告 reader.print_quality_report() # 保存清洗后的数据 output_path = "cleaned_industrial_data.xlsx" df.to_excel(output_path, index=False) print(f"\n清洗后数据已保存至: {output_path}") except Exception as e: logger.error(f"处理失败: {str(e)}") if __name__ == "__main__": main()

image.png

这个读取器干了几件关键的事:字段名归一化、数值清洗(用正则提取数字部分)、时间解析容错处理。实际项目里,你可以根据自己的字段情况扩展FIELD_MAPPING,这个设计可以复用到大多数工业Excel文件上。


🗄️ 第二步:数据库层抽象——SQLite和MySQL统一接口

这里用SQLAlchemy做抽象层,好处是切换数据库只需要改一行连接字符串,迁移逻辑代码完全不用动。

python
from sqlalchemy import create_engine, text from sqlalchemy.engine import Engine class DatabaseManager: """ 支持SQLite和MySQL的数据库管理器 通过连接字符串区分目标数据库类型 """ def __init__(self, db_type: str = "sqlite", **kwargs): self.engine = self._create_engine(db_type, **kwargs) def _create_engine(self, db_type: str, **kwargs) -> Engine: if db_type == "sqlite": db_path = kwargs.get("db_path", "industrial_data.db") url = f"sqlite:///{db_path}" elif db_type == "mysql": host = kwargs.get("host", "localhost") port = kwargs.get("port", 3306) user = kwargs.get("user", "root") password = kwargs.get("password", "") database = kwargs.get("database", "industrial") url = ( f"mysql+pymysql://{user}:{password}" f"@{host}:{port}/{database}" f"?charset=utf8mb4" ) else: raise ValueError(f"不支持的数据库类型: {db_type}") return create_engine(url, echo=False) def init_table(self, table_name: str): """建表——如果已存在则跳过""" ddl = f""" CREATE TABLE IF NOT EXISTS `{table_name}` ( id INTEGER PRIMARY KEY AUTOINCREMENT, record_time DATETIME NOT NULL, pressure FLOAT, temperature FLOAT, source_file VARCHAR(255), created_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """ # MySQL不支持AUTOINCREMENT关键字,需要换成AUTO_INCREMENT if "mysql" in str(self.engine.url): ddl = ddl.replace("AUTOINCREMENT", "AUTO_INCREMENT") with self.engine.connect() as conn: conn.execute(text(ddl)) conn.commit() print(f" ✅ 表 [{table_name}] 初始化完成") def write_dataframe( self, df: pd.DataFrame, table_name: str, chunk_size: int = 500 ) -> int: """ 分批写入DataFrame,返回实际写入行数 chunk_size控制每批次大小,防止单次提交过大导致内存溢出 """ if df.empty: return 0 df.to_sql( name=table_name, con=self.engine, if_exists="append", index=False, chunksize=chunk_size, method="multi" # 批量INSERT,比逐行快5-10倍 ) return len(df)

关于SQLite和MySQL的选择,有个实际经验分享一下:数据量在500万行以内、单机查询为主,SQLite完全够用,零配置、零维护,文件直接拷走就能用;超过这个量级、需要多人并发读写或者对接其他系统,就上MySQL。别一上来就觉得SQLite"不够专业"——很多工控场景里,SQLite的读写性能其实相当惊人。


🚀 第三步:迁移主流程——把一切串起来

python
from tqdm import tqdm import glob import os class MigrationPipeline: """ Excel → 数据库的完整迁移流水线 支持单文件、多文件、多Sheet批量处理 """ def __init__(self, db_manager: DatabaseManager, table_name: str): self.db = db_manager self.table = table_name self.stats = {"total": 0, "success": 0, "failed": 0} def migrate_file(self, file_path: str): """迁移单个Excel文件(处理所有Sheet)""" path = Path(file_path) print(f"\n📂 处理文件: {path.name}") # 获取所有Sheet名称 xl = pd.ExcelFile(file_path, engine="openpyxl") sheets = xl.sheet_names for sheet in tqdm(sheets, desc=f" Sheet进度", leave=False): try: reader = ExcelReader(file_path, sheet_name=sheet) df = reader.read() if df.empty: continue # 记录数据来源,方便后续追溯 df["source_file"] = f"{path.name}::{sheet}" written = self.db.write_dataframe(df, self.table) self.stats["success"] += written self.stats["total"] += written except Exception as e: print(f" ❌ Sheet [{sheet}] 处理失败: {e}") self.stats["failed"] += 1 def migrate_directory(self, dir_path: str, pattern: str = "*.xlsx"): """批量迁移目录下所有匹配的Excel文件""" files = glob.glob(os.path.join(dir_path, pattern)) if not files: print(f"⚠️ 目录 [{dir_path}] 下没有找到匹配文件") return print(f"🔍 共发现 {len(files)} 个文件,开始迁移...\n") for f in tqdm(files, desc="总体进度"): self.migrate_file(f) self._print_summary() def _print_summary(self): print("\n" + "="*45) print("📊 迁移完成报告") print(f" 成功写入: {self.stats['success']:,} 行") print(f" 失败批次: {self.stats['failed']} 个") print("="*45)

▶️ 实际运行示例

把上面的模块拼起来,跑起来就是这样:

python
if __name__ == "__main__": # ---- 目标:SQLite(本地轻量场景)---- sqlite_db = DatabaseManager( db_type="sqlite", db_path="./factory_data.db" ) sqlite_db.init_table("device_records") pipeline = MigrationPipeline(sqlite_db, "device_records") pipeline.migrate_directory("./excel_data/", pattern="*.xlsx") # ---- 目标:MySQL(生产环境场景)---- # mysql_db = DatabaseManager( # db_type="mysql", # host="192.168.1.100", # port=3306, # user="data_admin", # password="your_password", # database="industrial_db" # ) # mysql_db.init_table("device_records") # pipeline = MigrationPipeline(mysql_db, "device_records") # pipeline.migrate_directory("./excel_data/")

image.png


⚡ 性能这块,有几个坑必须提前说

坑一:df.to_sql默认逐行插入,慢到怀疑人生。 一定要加method="multi",实测在MySQL上,10万行数据从原来的180秒降到了约22秒——差了将近8倍。

坑二:MySQL连接字符串里别忘了charset=utf8mb4 工厂数据里经常有中文备注,utf8(三字节版本)遇到某些特殊字符会直接报错,utf8mb4才是正确选择。

坑三:大文件迁移时内存会撑爆。 如果单个Excel文件超过10万行,建议在ExcelReader里加分块读取逻辑(pandas的chunksize参数),每次只处理一批,处理完立刻释放内存,别想着一口气全读进来。

坑四:迁移前务必检查重复数据。 如果同一份数据被迁移了两次,数据库里就会出现重复记录。可以在建表时对(record_time, source_file)加唯一约束,或者在写入前先做去重查询。


💬 聊几句

数据迁移这件事,表面上看是个体力活,实际上藏着很多细节——数据清洗策略、异常容错机制、写入性能调优,每一块都能单独展开讲很久。

我见过有人写了个"一键迁移脚本",跑完发现数据量对不上,查了两天才发现是时区问题导致时间解析错乱,损失惨重。所以迁移完之后,一定要做数据核验:抽样比对原始Excel和数据库里的记录,验证行数、验证关键字段的统计值(最大值、最小值、均值),这步不能省。

你在项目里做过类似的数据迁移工作吗?遇到过什么奇葩的数据格式问题?欢迎在评论区分享,说不定你的经历能帮到下一个踩坑的人。


#Python #数据迁移 #SQLite #MySQL #工业数据

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!