说真的,我在工厂车间里做数据系统的时候,见过太多这种场景了——
一台老旧的工控机,桌面上摆着十几个Excel文件,文件名叫"设备数据_最终版_v3_真的最终版.xlsx"。每次要查历史数据,就得打开七八个表格,手动复制粘贴,搞个把小时才能出一份报表。更要命的是,有时候数据还对不上,因为两个班次的操作员各自维护了一份,格式还不一样。
这不是个例。制造业里,Excel作为"数据库"使用的现象极其普遍。它轻便、直观,入门门槛低,但一旦数据量上去了、多人协作了、需要实时查询了——它的局限性就暴露得一干二净。
今天咱们就聊一件很多工程师都绕不开的事:怎么用Python把Excel里的历史数据,优雅地迁移到SQLite或MySQL里,同时还得保证数据不丢、格式不乱、迁移过程可追溯。
在我接触过的工业数据迁移项目里,有三类问题反复出现,踩坑率极高。
第一类:数据格式的混乱程度超出想象。 同一列"温度"字段,有的行写的是85.3,有的写85.3℃,有的写约85度,甚至还有--表示传感器离线。这种"人工智能"录入方式,直接导致数值列无法直接入库。
第二类:时间戳格式五花八门。 2023/8/5、2023-08-05、8月5日 14:30……同一个Excel文件里可能混用三种格式,pandas读进来直接变成object类型,后续时序查询全部废掉。
第三类:多Sheet、多文件的数据孤岛。 按月份拆分的Excel,每个文件有12个Sheet,字段名还不完全一致(有的叫"压力值",有的叫"压力",有的叫"P_value")。合并之前必须做字段映射,否则入库之后数据根本没法用。
搞清楚这三类问题,咱们的迁移方案就有了清晰的骨架。
bashpip install pandas openpyxl sqlalchemy pymysql tqdm
这几个包各有分工:pandas 负责读取和清洗Excel,openpyxl 是pandas读取.xlsx的底层引擎,sqlalchemy 提供统一的数据库抽象层(SQLite和MySQL都能用),pymysql 是MySQL的Python驱动,tqdm 用来显示迁移进度条——数据量大的时候,没有进度条真的会让人抓狂。
先写一个能处理"脏数据"的读取模块,这是整个迁移流程的地基。
pythonimport pandas as pd
import re
import logging
from pathlib import Path
from typing import Optional, Dict, List, Union, Tuple
from datetime import datetime
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class ExcelDataError(Exception):
"""自定义异常类,用于Excel数据处理错误"""
pass
class ExcelReader:
"""
工业数据Excel读取器
专门处理格式混乱的历史数据文件,支持多种数据清洗和验证功能
""" # 字段名映射表——统一不同时期的命名习惯
FIELD_MAPPING = {
# 压力相关字段
"压力值": "pressure",
"压力": "pressure",
"P_value": "pressure",
"Pressure": "pressure",
"press": "pressure",
"压力(MPa)": "pressure",
"压力/MPa": "pressure",
# 温度相关字段
"温度值": "temperature",
"温度": "temperature",
"T_value": "temperature",
"Temperature": "temperature",
"temp": "temperature",
"温度(℃)": "temperature",
"温度/℃": "temperature",
# 时间相关字段
"时间": "record_time",
"记录时间": "record_time",
"timestamp": "record_time",
"Timestamp": "record_time",
"采集时间": "record_time",
"日期": "record_time",
"Date": "record_time",
"DateTime": "record_time",
# 其他可能的工业数据字段
"流量": "flow_rate",
"Flow": "flow_rate",
"湿度": "humidity",
"Humidity": "humidity",
"设备状态": "device_status",
"Status": "device_status"
}
# 数据质量阈值配置
QUALITY_THRESHOLDS = {
"pressure": {"min": 0, "max": 1000}, # MPa
"temperature": {"min": -50, "max": 200}, # ℃
"flow_rate": {"min": 0, "max": 10000}, # 根据实际情况调整
"humidity": {"min": 0, "max": 100} # %
}
def __init__(self, file_path: Union[str, Path], sheet_name: Union[str, int] = 0,
encoding: str = 'utf-8', skip_rows: int = 0):
"""
初始化Excel读取器
Args: file_path: Excel文件路径
sheet_name: 工作表名称或索引
encoding: 文件编码
skip_rows: 跳过的行数(用于处理表头前的说明文字)
""" self.file_path = Path(file_path)
self.sheet_name = sheet_name
self.encoding = encoding
self.skip_rows = skip_rows
self.quality_report = {}
# 验证文件存在性
if not self.file_path.exists():
raise FileNotFoundError(f"Excel文件不存在: {self.file_path}")
# 验证文件格式
if self.file_path.suffix.lower() not in ['.xlsx', '.xls']:
raise ExcelDataError(f"不支持的文件格式: {self.file_path.suffix}")
def read(self, validate_data: bool = True,
remove_duplicates: bool = True) -> pd.DataFrame:
"""
读取并处理Excel数据
Args: validate_data: 是否进行数据质量验证
remove_duplicates: 是否移除重复行
Returns: 清洗后的DataFrame
""" try:
logger.info(f"开始读取Excel文件: {self.file_path}")
# 读取原始数据
df = self._read_raw_data()
original_rows = len(df)
logger.info(f"原始数据行数: {original_rows}")
# 数据处理流程
df = self._normalize_columns(df)
df = self._clean_numeric_fields(df)
df = self._parse_datetime(df)
df = self._drop_invalid_rows(df)
if remove_duplicates:
df = self._remove_duplicates(df)
if validate_data:
df = self._validate_data_quality(df)
# 按时间排序
if "record_time" in df.columns:
df = df.sort_values("record_time").reset_index(drop=True)
processed_rows = len(df)
logger.info(f"处理完成,有效数据行数: {processed_rows} "
f"(数据保留率: {processed_rows / original_rows * 100:.1f}%)")
# 生成质量报告
self._generate_quality_report(df, original_rows)
return df
except Exception as e:
logger.error(f"Excel读取失败: {str(e)}")
raise ExcelDataError(f"Excel文件处理失败: {str(e)}")
def _read_raw_data(self) -> pd.DataFrame:
"""读取原始Excel数据"""
try:
# 尝试读取指定工作表
df = pd.read_excel(
self.file_path,
sheet_name=self.sheet_name,
engine="openpyxl",
skiprows=self.skip_rows,
na_values=['', ' ', 'N/A', 'null', 'NULL', '--', '——', '无数据']
) # 检查是否为空文件
if df.empty:
raise ExcelDataError("Excel文件为空或无有效数据")
return df
except Exception as e:
# 如果指定工作表读取失败,尝试读取第一个工作表
if isinstance(self.sheet_name, str):
logger.warning(f"工作表 '{self.sheet_name}' 不存在,尝试读取第一个工作表")
return pd.read_excel(self.file_path, sheet_name=0, engine="openpyxl")
else:
raise ExcelDataError(f"无法读取Excel文件: {str(e)}")
def _normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""
统一列名处理
1. 去除前后空格和特殊字符
2. 使用映射表统一字段名
3. 处理重复列名
""" # 清理列名
df.columns = [str(c).strip().replace('\n', '').replace('\r', '')
for c in df.columns]
# 处理重复列名
cols = pd.Series(df.columns)
for dup in cols[cols.duplicated()].unique():
cols[cols[cols == dup].index.values.tolist()] = [
f"{dup}_{i}" if i != 0 else dup
for i in range(sum(cols == dup))
] df.columns = cols
# 应用字段映射
df.rename(columns=self.FIELD_MAPPING, inplace=True)
logger.info(f"列名标准化完成,字段: {list(df.columns)}")
return df
def _clean_numeric_fields(self, df: pd.DataFrame) -> pd.DataFrame:
"""
清洗数值字段
处理各种异常情况:单位符号、异常标记、科学计数法等
""" # 识别可能的数值字段
numeric_candidates = [col for col in df.columns
if col in ['pressure', 'temperature', 'flow_rate', 'humidity']]
for col in numeric_candidates:
if col not in df.columns:
continue
logger.info(f"清洗数值字段: {col}")
original_valid = df[col].notna().sum()
df[col] = df[col].apply(self._extract_number)
cleaned_valid = df[col].notna().sum()
logger.info(f" {col}: {original_valid} → {cleaned_valid} 有效值")
return df
@staticmethod
def _extract_number(val) -> Optional[float]:
"""
从混合格式字符串中提取数值
支持格式:'85.3℃', '1.2MPa', '1.23e-4', '--', 'N/A' 等
""" if pd.isna(val):
return None
s = str(val).strip()
# 处理明确的无效值
if s.lower() in ['--', '——', 'n/a', 'null', '', '无数据', '异常']:
return None
try:
# 直接转换数值
return float(s)
except ValueError:
pass
# 使用正则表达式提取数值(支持科学计数法)
patterns = [
r'-?\d+\.?\d*[eE][-+]?\d+', # 科学计数法
r'-?\d+\.\d+', # 小数
r'-?\d+' # 整数
]
for pattern in patterns:
match = re.search(pattern, s)
if match:
try:
return float(match.group())
except ValueError:
continue
return None
def _parse_datetime(self, df: pd.DataFrame) -> pd.DataFrame:
"""
解析时间字段,支持多种时间格式
""" if "record_time" not in df.columns:
logger.warning("未找到时间字段")
return df
logger.info("解析时间字段")
original_valid = df["record_time"].notna().sum()
try:
df["record_time"] = pd.to_datetime(
df["record_time"],
errors="coerce",
format='mixed'
)
except TypeError:
df["record_time"] = pd.to_datetime(
df["record_time"],
errors="coerce"
)
parsed_valid = df["record_time"].notna().sum()
logger.info(f"时间解析完成: {original_valid} → {parsed_valid} 有效值")
return df
def _drop_invalid_rows(self, df: pd.DataFrame) -> pd.DataFrame:
"""删除无效行"""
original_rows = len(df)
# 删除全空行
df = df.dropna(how='all')
# 删除时间戳无效的行
if "record_time" in df.columns:
invalid_time = df["record_time"].isna().sum()
if invalid_time > 0:
logger.warning(f"删除 {invalid_time} 行无效时间数据")
df = df.dropna(subset=["record_time"])
final_rows = len(df)
logger.info(f"无效行清理完成: {original_rows} → {final_rows}")
return df.reset_index(drop=True)
def _remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
"""移除重复数据"""
original_rows = len(df)
# 基于时间戳去重(保留最后一条记录)
if "record_time" in df.columns:
df = df.drop_duplicates(subset=["record_time"], keep="last")
else:
df = df.drop_duplicates()
final_rows = len(df)
if original_rows > final_rows:
logger.info(f"移除重复数据: {original_rows - final_rows} 行")
return df.reset_index(drop=True)
def _validate_data_quality(self, df: pd.DataFrame) -> pd.DataFrame:
"""数据质量验证和异常值处理"""
for col, thresholds in self.QUALITY_THRESHOLDS.items():
if col not in df.columns:
continue
# 统计异常值
valid_mask = (df[col] >= thresholds["min"]) & (df[col] <= thresholds["max"])
outliers = (~valid_mask & df[col].notna()).sum()
if outliers > 0:
logger.warning(f"{col} 发现 {outliers} 个异常值 "
f"(范围: {thresholds['min']}-{thresholds['max']})")
# 可选:将异常值设为NaN而不是删除整行
df.loc[~valid_mask, col] = None
return df
def _generate_quality_report(self, df: pd.DataFrame, original_rows: int):
"""生成数据质量报告"""
self.quality_report = {
"file_info": {
"file_path": str(self.file_path),
"processed_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
},
"data_summary": {
"original_rows": original_rows,
"final_rows": len(df),
"retention_rate": len(df) / original_rows if original_rows > 0 else 0,
"columns": list(df.columns)
},
"field_statistics": {}
}
# 各字段统计
for col in df.columns:
if df[col].dtype in ['float64', 'int64']:
self.quality_report["field_statistics"][col] = {
"valid_count": df[col].notna().sum(),
"missing_count": df[col].isna().sum(),
"min": float(df[col].min()) if df[col].notna().any() else None,
"max": float(df[col].max()) if df[col].notna().any() else None,
"mean": float(df[col].mean()) if df[col].notna().any() else None
}
def get_quality_report(self) -> Dict:
"""获取数据质量报告"""
return self.quality_report
def print_quality_report(self):
"""打印数据质量报告"""
if not self.quality_report:
print("尚未生成质量报告,请先调用 read() 方法")
return
report = self.quality_report
print("\n" + "=" * 50)
print("📊 数据质量报告")
print("=" * 50)
# 文件信息
print(f"文件路径: {report['file_info']['file_path']}")
print(f"处理时间: {report['file_info']['processed_time']}")
# 数据概览
summary = report['data_summary']
print(f"\n原始行数: {summary['original_rows']}")
print(f"有效行数: {summary['final_rows']}")
print(f"数据保留率: {summary['retention_rate']:.1%}")
print(f"字段列表: {', '.join(summary['columns'])}")
# 字段统计
if report['field_statistics']:
print("\n字段统计:")
for field, stats in report['field_statistics'].items():
print(f" {field}:")
print(f" 有效值: {stats['valid_count']} | " f"缺失值: {stats['missing_count']}")
if stats['min'] is not None:
print(f" 范围: {stats['min']:.3f} ~ {stats['max']:.3f} | " f"均值: {stats['mean']:.3f}")
print("=" * 50)
def create_sample_data(filename: str = "industrial_data.xlsx"):
"""
创建示例数据文件用于测试
""" import numpy as np
from datetime import datetime, timedelta
# 生成示例数据
np.random.seed(42)
n_records = 1000
# 时间序列
start_time = datetime(2024, 1, 1, 0, 0, 0)
time_series = [start_time + timedelta(minutes=i * 5) for i in range(n_records)]
# 模拟工业数据
data = {
'时间': time_series,
'压力(MPa)': np.random.normal(5.0, 0.5, n_records),
'温度(℃)': np.random.normal(85.0, 10.0, n_records),
'Flow': np.random.normal(100.0, 15.0, n_records),
'Status': ['正常'] * (n_records - 50) + ['异常'] * 50
}
# 添加一些异常值和缺失值
data['压力(MPa)'][50:60] = ['--'] * 10 # 缺失值
data['温度(℃)'][100:105] = [300, -100, 500, 'N/A', '异常'] # 异常值
df = pd.DataFrame(data)
df.to_excel(filename, index=False)
print(f"示例数据已创建: {filename}")
return df
def main():
"""使用示例"""
try:
# 如果测试文件不存在,创建示例数据
test_file = "industrial_data.xlsx"
if not Path(test_file).exists():
print("创建示例数据文件...")
create_sample_data(test_file)
# 创建读取器实例
reader = ExcelReader(
file_path=test_file,
sheet_name=0,
skip_rows=0
)
# 读取和处理数据
df = reader.read(
validate_data=True,
remove_duplicates=True
)
print(f"\n处理结果:")
print(f"数据形状: {df.shape}")
print(f"字段类型:\n{df.dtypes}")
print(f"\n前5行数据:")
print(df.head())
# 打印质量报告
reader.print_quality_report()
# 保存清洗后的数据
output_path = "cleaned_industrial_data.xlsx"
df.to_excel(output_path, index=False)
print(f"\n清洗后数据已保存至: {output_path}")
except Exception as e:
logger.error(f"处理失败: {str(e)}")
if __name__ == "__main__":
main()

这个读取器干了几件关键的事:字段名归一化、数值清洗(用正则提取数字部分)、时间解析容错处理。实际项目里,你可以根据自己的字段情况扩展FIELD_MAPPING,这个设计可以复用到大多数工业Excel文件上。
这里用SQLAlchemy做抽象层,好处是切换数据库只需要改一行连接字符串,迁移逻辑代码完全不用动。
pythonfrom sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
class DatabaseManager:
"""
支持SQLite和MySQL的数据库管理器
通过连接字符串区分目标数据库类型
"""
def __init__(self, db_type: str = "sqlite", **kwargs):
self.engine = self._create_engine(db_type, **kwargs)
def _create_engine(self, db_type: str, **kwargs) -> Engine:
if db_type == "sqlite":
db_path = kwargs.get("db_path", "industrial_data.db")
url = f"sqlite:///{db_path}"
elif db_type == "mysql":
host = kwargs.get("host", "localhost")
port = kwargs.get("port", 3306)
user = kwargs.get("user", "root")
password = kwargs.get("password", "")
database = kwargs.get("database", "industrial")
url = (
f"mysql+pymysql://{user}:{password}"
f"@{host}:{port}/{database}"
f"?charset=utf8mb4"
)
else:
raise ValueError(f"不支持的数据库类型: {db_type}")
return create_engine(url, echo=False)
def init_table(self, table_name: str):
"""建表——如果已存在则跳过"""
ddl = f"""
CREATE TABLE IF NOT EXISTS `{table_name}` (
id INTEGER PRIMARY KEY AUTOINCREMENT,
record_time DATETIME NOT NULL,
pressure FLOAT,
temperature FLOAT,
source_file VARCHAR(255),
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
# MySQL不支持AUTOINCREMENT关键字,需要换成AUTO_INCREMENT
if "mysql" in str(self.engine.url):
ddl = ddl.replace("AUTOINCREMENT", "AUTO_INCREMENT")
with self.engine.connect() as conn:
conn.execute(text(ddl))
conn.commit()
print(f" ✅ 表 [{table_name}] 初始化完成")
def write_dataframe(
self,
df: pd.DataFrame,
table_name: str,
chunk_size: int = 500
) -> int:
"""
分批写入DataFrame,返回实际写入行数
chunk_size控制每批次大小,防止单次提交过大导致内存溢出
"""
if df.empty:
return 0
df.to_sql(
name=table_name,
con=self.engine,
if_exists="append",
index=False,
chunksize=chunk_size,
method="multi" # 批量INSERT,比逐行快5-10倍
)
return len(df)
关于SQLite和MySQL的选择,有个实际经验分享一下:数据量在500万行以内、单机查询为主,SQLite完全够用,零配置、零维护,文件直接拷走就能用;超过这个量级、需要多人并发读写或者对接其他系统,就上MySQL。别一上来就觉得SQLite"不够专业"——很多工控场景里,SQLite的读写性能其实相当惊人。
pythonfrom tqdm import tqdm
import glob
import os
class MigrationPipeline:
"""
Excel → 数据库的完整迁移流水线
支持单文件、多文件、多Sheet批量处理
"""
def __init__(self, db_manager: DatabaseManager, table_name: str):
self.db = db_manager
self.table = table_name
self.stats = {"total": 0, "success": 0, "failed": 0}
def migrate_file(self, file_path: str):
"""迁移单个Excel文件(处理所有Sheet)"""
path = Path(file_path)
print(f"\n📂 处理文件: {path.name}")
# 获取所有Sheet名称
xl = pd.ExcelFile(file_path, engine="openpyxl")
sheets = xl.sheet_names
for sheet in tqdm(sheets, desc=f" Sheet进度", leave=False):
try:
reader = ExcelReader(file_path, sheet_name=sheet)
df = reader.read()
if df.empty:
continue
# 记录数据来源,方便后续追溯
df["source_file"] = f"{path.name}::{sheet}"
written = self.db.write_dataframe(df, self.table)
self.stats["success"] += written
self.stats["total"] += written
except Exception as e:
print(f" ❌ Sheet [{sheet}] 处理失败: {e}")
self.stats["failed"] += 1
def migrate_directory(self, dir_path: str, pattern: str = "*.xlsx"):
"""批量迁移目录下所有匹配的Excel文件"""
files = glob.glob(os.path.join(dir_path, pattern))
if not files:
print(f"⚠️ 目录 [{dir_path}] 下没有找到匹配文件")
return
print(f"🔍 共发现 {len(files)} 个文件,开始迁移...\n")
for f in tqdm(files, desc="总体进度"):
self.migrate_file(f)
self._print_summary()
def _print_summary(self):
print("\n" + "="*45)
print("📊 迁移完成报告")
print(f" 成功写入: {self.stats['success']:,} 行")
print(f" 失败批次: {self.stats['failed']} 个")
print("="*45)
把上面的模块拼起来,跑起来就是这样:
pythonif __name__ == "__main__":
# ---- 目标:SQLite(本地轻量场景)----
sqlite_db = DatabaseManager(
db_type="sqlite",
db_path="./factory_data.db"
)
sqlite_db.init_table("device_records")
pipeline = MigrationPipeline(sqlite_db, "device_records")
pipeline.migrate_directory("./excel_data/", pattern="*.xlsx")
# ---- 目标:MySQL(生产环境场景)----
# mysql_db = DatabaseManager(
# db_type="mysql",
# host="192.168.1.100",
# port=3306,
# user="data_admin",
# password="your_password",
# database="industrial_db"
# )
# mysql_db.init_table("device_records")
# pipeline = MigrationPipeline(mysql_db, "device_records")
# pipeline.migrate_directory("./excel_data/")

坑一:df.to_sql默认逐行插入,慢到怀疑人生。 一定要加method="multi",实测在MySQL上,10万行数据从原来的180秒降到了约22秒——差了将近8倍。
坑二:MySQL连接字符串里别忘了charset=utf8mb4。 工厂数据里经常有中文备注,utf8(三字节版本)遇到某些特殊字符会直接报错,utf8mb4才是正确选择。
坑三:大文件迁移时内存会撑爆。 如果单个Excel文件超过10万行,建议在ExcelReader里加分块读取逻辑(pandas的chunksize参数),每次只处理一批,处理完立刻释放内存,别想着一口气全读进来。
坑四:迁移前务必检查重复数据。 如果同一份数据被迁移了两次,数据库里就会出现重复记录。可以在建表时对(record_time, source_file)加唯一约束,或者在写入前先做去重查询。
数据迁移这件事,表面上看是个体力活,实际上藏着很多细节——数据清洗策略、异常容错机制、写入性能调优,每一块都能单独展开讲很久。
我见过有人写了个"一键迁移脚本",跑完发现数据量对不上,查了两天才发现是时区问题导致时间解析错乱,损失惨重。所以迁移完之后,一定要做数据核验:抽样比对原始Excel和数据库里的记录,验证行数、验证关键字段的统计值(最大值、最小值、均值),这步不能省。
你在项目里做过类似的数据迁移工作吗?遇到过什么奇葩的数据格式问题?欢迎在评论区分享,说不定你的经历能帮到下一个踩坑的人。
#Python #数据迁移 #SQLite #MySQL #工业数据
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!