2026-05-18
Python
0

目录

🏭 那些年,我们被日志坑过的经历
🔍 工业日志的特殊性:哪里和普通日志不一样?
🏗️ 系统架构设计:分层才是王道
🚀 核心实现:从基础到进阶
第一步:构建线程安全的日志基础设施
第二步:自定义Formatter,注入工业上下文
第三步:设备日志门面——业务代码的唯一入口
第四步:实战组装——模拟PLC通信场景
⚠️ 踩坑预警:这几个地方最容易出问题

在真实的工厂车间里,一台设备的异常报警可能意味着数十万的损失。而那条关键的错误日志,往往就是唯一的"案发现场还原"线索。


🏭 那些年,我们被日志坑过的经历

说真的,工控软件的日志问题,是我职业生涯里踩得最深的坑之一。

刚入行那会儿,我负责维护一套PLC通信程序。某天凌晨两点,产线突然停了。翻遍整个系统,日志文件里只有寥寥几行print("error")——连时间戳都没有。那一夜,我和同事对着设备发呆了三个小时,愣是没定位到根因。

这种痛,相信很多做工控、MES、SCADA系统的朋友都懂。

工业场景的日志需求,和普通Web应用完全不是一个量级:多线程并发写入、跨设备数据聚合、毫秒级时序追踪、海量数据的长期归档……随便拎出一条,都够折腾一阵子。

本文就从实战出发,带你搭一套真正能在工业环境里"扛造"的Python日志系统。代码全部可运行,架构可直接迁移到生产项目。


🔍 工业日志的特殊性:哪里和普通日志不一样?

先把问题说清楚,再谈方案。

普通应用的日志,核心诉求是记录和排错。但工业系统的日志,承担的职责要复杂得多——

时序精度要求极高。 一条焊接指令和一条质检结果,如果时间戳偏差超过50ms,数据关联就会失效。这不是"差不多"能过去的事。

多源并发是常态。 同一时刻,温控模块、运动控制模块、视觉检测模块可能同时在写日志。锁竞争、写入顺序错乱,是家常便饭。

日志本身就是业务数据。 工厂的质量追溯、工艺优化,全靠历史日志。这意味着日志不能丢、不能乱、还得方便查。

存储压力大。 一条产线一天可能产生几个GB的原始日志。怎么压缩、怎么分片、怎么归档,都得提前设计好。

带着这四个问题,咱们开始搭架子。


🏗️ 系统架构设计:分层才是王道

太多项目,把所有日志逻辑塞进一个utils.py,然后在几十个模块里importimport去。这玩意儿,短期看没问题,长期必然是一团乱麻。

工业日志系统,我建议采用三层架构

image.png

业务层不关心日志怎么存、存哪里。门面层负责统一收口、注入设备ID/工单号等上下文。处理器层各司其职,互不干扰。


🚀 核心实现:从基础到进阶

第一步:构建线程安全的日志基础设施

Python标准库的logging模块本身是线程安全的——但很多人不知道,FileHandler在Windows下的多进程写入是有问题的。工控机上跑多进程采集的场景,必须用RotatingFileHandler配合文件锁,或者直接上队列方案。

python
import logging import logging.handlers import threading import queue from datetime import datetime from pathlib import Path class IndustrialLoggerFactory: """ 工业日志工厂类 核心设计:单例 + 异步队列写入,避免IO阻塞业务线程 """ _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self.log_dir = Path("logs") self.log_dir.mkdir(exist_ok=True) # 异步队列:业务线程只管往队列扔,IO线程负责实际写入 self._log_queue = queue.Queue(maxsize=10000) self._setup_handlers() self._start_async_worker() self._initialized = True def _setup_handlers(self): """配置多目标Handler""" self.logger = logging.getLogger("industrial_system") self.logger.setLevel(logging.DEBUG) # 按日期滚动的文件Handler file_handler = logging.handlers.TimedRotatingFileHandler( filename=self.log_dir / "system.log", when="midnight", # 每天零点切割 interval=1, backupCount=90, # 保留90天 encoding="utf-8" ) # 关键告警单独存一份,方便快速检索 alarm_handler = logging.handlers.RotatingFileHandler( filename=self.log_dir / "alarm.log", maxBytes=50 * 1024 * 1024, # 50MB切割 backupCount=20, encoding="utf-8" ) alarm_handler.setLevel(logging.WARNING) formatter = IndustrialFormatter() file_handler.setFormatter(formatter) alarm_handler.setFormatter(formatter) self.logger.addHandler(file_handler) self.logger.addHandler(alarm_handler) def _start_async_worker(self): """启动后台IO线程,消费日志队列""" worker = threading.Thread( target=self._async_write_worker, daemon=True, # 随主进程退出,不阻塞关闭 name="LogIOWorker" ) worker.start() def _async_write_worker(self): while True: try: record = self._log_queue.get(timeout=1) if record is None: # 优雅停止信号 break self.logger.handle(record) except queue.Empty: continue

这里有个细节值得注意:daemon=True让IO线程随主进程退出,但这意味着程序崩溃时队列里可能还有未写入的日志。生产环境里,建议在主程序的finally块里发送停止信号,等队列清空再退出。


第二步:自定义Formatter,注入工业上下文

标准的日志格式对工业系统来说信息量严重不足。我们需要在每条日志里自动带上设备编号、工单号、操作员ID这些关键字段。

python
import json import traceback class IndustrialFormatter(logging.Formatter): """ 结构化日志格式器 输出JSON格式,方便后续用ELK或自研平台解析 """ # 线程本地存储:每个线程独立维护自己的上下文 _context = threading.local() @classmethod def set_context(cls, **kwargs): """在业务代码入口处设置上下文,后续日志自动携带""" for key, value in kwargs.items(): setattr(cls._context, key, value) @classmethod def clear_context(cls): cls._context.__dict__.clear() def format(self, record: logging.LogRecord) -> str: # 基础字段 log_entry = { "timestamp": datetime.fromtimestamp(record.created).strftime( "%Y-%m-%d %H:%M:%S.%f" )[:-3], # 精确到毫秒 "level": record.levelname, "module": record.module, "func": record.funcName, "line": record.lineno, "message": record.getMessage(), # 从线程本地存储中读取业务上下文 "device_id": getattr(self._context, "device_id", "UNKNOWN"), "work_order": getattr(self._context, "work_order", None), "operator": getattr(self._context, "operator", None), } # 异常信息单独格式化,保留完整堆栈 if record.exc_info: log_entry["exception"] = { "type": record.exc_info[0].__name__, "message": str(record.exc_info[1]), "traceback": traceback.format_exception(*record.exc_info) } # 过滤None值,减少存储冗余 log_entry = {k: v for k, v in log_entry.items() if v is not None} return json.dumps(log_entry, ensure_ascii=False)

线程本地存储(threading.local())是这里的关键。 每个采集线程处理不同设备,上下文互不污染。在线程入口处调一次set_context(device_id="PLC_001"),后续这个线程产生的所有日志都会自动带上设备ID。


第三步:设备日志门面——业务代码的唯一入口

有了底层基础设施,再封装一个面向业务的门面类。让业务代码写日志时,不需要关心任何底层细节。

python
from contextlib import contextmanager from functools import wraps import time class DeviceLogger: """ 设备日志门面 业务模块统一通过这个类记录日志 """ def __init__(self, device_id: str, device_type: str = "GENERIC"): self.device_id = device_id self.device_type = device_type self._factory = IndustrialLoggerFactory() # 初始化时就把设备信息注入上下文 IndustrialFormatter.set_context( device_id=device_id, device_type=device_type ) def info(self, message: str, **extra): self._log(logging.INFO, message, **extra) def warning(self, message: str, **extra): self._log(logging.WARNING, message, **extra) def error(self, message: str, exc_info=False, **extra): self._log(logging.ERROR, message, exc_info=exc_info, **extra) def critical(self, message: str, **extra): self._log(logging.CRITICAL, message, **extra) def _log(self, level: int, message: str, exc_info=False, **extra): record = logging.LogRecord( name="industrial_system", level=level, pathname="", lineno=0, msg=message, args=(), exc_info=logging.sys.exc_info() if exc_info else None ) # 把extra字段挂到record上,Formatter可以读取 for key, value in extra.items(): setattr(record, key, value) # 非阻塞放入队列 try: self._factory._log_queue.put_nowait(record) except queue.Full: # 队列满了,这条日志只能丢弃——但这本身也是个告警信号 print(f"[WARN] Log queue full, dropping: {message}") @contextmanager def operation_trace(self, operation_name: str): """ 上下文管理器:自动记录操作耗时 用法:with logger.operation_trace("焊接动作"): """ start = time.perf_counter() self.info(f"[START] {operation_name}") try: yield elapsed = (time.perf_counter() - start) * 1000 self.info(f"[END] {operation_name} | 耗时: {elapsed:.2f}ms") except Exception as e: elapsed = (time.perf_counter() - start) * 1000 self.error( f"[FAILED] {operation_name} | 耗时: {elapsed:.2f}ms | 原因: {e}", exc_info=True ) raise # 异常继续向上传播,不在日志层吞掉 def log_device_action(operation: str): """ 装饰器版本:给函数自动加上日志追踪 适合那些每次调用都需要记录的设备操作函数 """ def decorator(func): @wraps(func) def wrapper(self, *args, **kwargs): # 假设self上有logger属性 logger = getattr(self, 'logger', None) if logger and hasattr(logger, 'operation_trace'): with logger.operation_trace(operation): return func(self, *args, **kwargs) return func(self, *args, **kwargs) return wrapper return decorator

第四步:实战组装——模拟PLC通信场景

把上面这些拼起来,看看实际用起来是什么感觉:

python
class PLCController: """PLC控制器示例:展示日志系统在真实业务中的用法""" def __init__(self, plc_id: str): self.plc_id = plc_id self.logger = DeviceLogger( device_id=plc_id, device_type="SIEMENS_S7" ) self.connected = False @log_device_action("PLC连接建立") def connect(self, ip: str, port: int = 102): """建立PLC连接""" self.logger.info(f"尝试连接 {ip}:{port}") # 模拟连接逻辑... self.connected = True self.logger.info("连接成功", ip=ip, port=port) def read_registers(self, start_addr: int, count: int) -> list: """读取寄存器数据""" if not self.connected: self.logger.error("读取失败:设备未连接") raise ConnectionError("PLC未连接") with self.logger.operation_trace(f"读取寄存器 DB{start_addr}[0..{count}]"): # 实际项目里这里是snap7或pycomm3的调用 data = [0] * count self.logger.info( f"寄存器读取完成", start_addr=start_addr, count=count, sample_value=data[0] if data else None ) return data def write_coil(self, addr: int, value: bool): """写线圈——这类操作必须有完整的操作记录""" IndustrialFormatter.set_context( device_id=self.plc_id, action_type="WRITE", target_addr=addr ) self.logger.warning( f"写入线圈 addr={addr}, value={value}", safety_level="MEDIUM" ) # 使用示例 if __name__ == "__main__": # 设置当前工单上下文 IndustrialFormatter.set_context( work_order="WO-20260318-001", operator="张工" ) plc = PLCController("PLC_LINE_A_01") plc.connect("192.168.1.100") data = plc.read_registers(start_addr=100, count=10) plc.write_coil(addr=200, value=True)

运行后,logs/system.log里的每一行都是结构化JSON,长这样:

json
{ "timestamp": "2026-03-18 07:16:49.698", "level": "INFO", "module": "plc_controller", "func": "connect", "line": 28, "message": "连接成功", "device_id": "PLC_LINE_A_01", "device_type": "SIEMENS_S7", "work_order": "WO-20260318-001", "operator": "张工", "ip": "192.168.1.100", "port": 102 }

image.png


⚠️ 踩坑预警:这几个地方最容易出问题

坑1:日志队列满了怎么办? 上面代码里用的是put_nowait,队列满直接丢弃。这在大多数场景是合理的——日志不能反过来阻塞业务。但如果你的场景要求日志绝对不丢,可以改成put(block=True, timeout=0.1),超时后写到应急文件。

坑2:Windows下的文件时间戳精度问题。 datetime.now()在Windows上的精度只有约15ms,不足以区分高频操作。建议改用time.perf_counter()计算相对时间差,或者引入time.time_ns()

坑3:JSON序列化遇到numpy类型。 工控采集数据里经常有numpy.float32numpy.int64这类类型,直接json.dumps会报错。需要自定义一个JSONEncoder

python
import numpy as np class IndustrialJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): return float(obj) if isinstance(obj, np.ndarray): return obj.tolist() return super().default(obj) # 使用时替换json.dumps的encoder参数 json.dumps(log_entry, cls=IndustrialJSONEncoder, ensure_ascii=False)

坑4:多进程场景下的Handler冲突。 如果你的系统是多进程架构(比如用multiprocessing跑多个设备采集进程),每个进程都有自己的FileHandler,同时写同一个文件会导致内容交错。标准解法是用QueueHandler + 独立的日志服务进程,或者每个进程写独

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!