编辑
2025-11-29
C#
00

目录

💡 问题分析:OPC DA开发的三大痛点
🔍 痛点一:代码重复率高
⚡ 痛点二:异步操作复杂
🛡️ 痛点三:错误处理分散
🔥 解决方案:通用OPC DA操作类设计
✨ 核心特性
💻 代码实战:完整的OPC DA管理器
🎯 使用示例:让复杂操作变简单
🎯 总结与展望

在工业自动化领域,OPC DA(OLE for Process Control Data Access)是连接上位机与PLC、DCS等设备的重要桥梁。然而,许多C#开发者在使用OPC DA时都会遇到同样的痛点:原生SDK代码冗长、异步操作复杂、错误处理繁琐。

想象一下,每次需要读写OPC数据时,都要写几十行重复代码?每个项目都要重新实现连接、订阅、异步回调逻辑?这不仅效率低下,还容易出错。

本文将通过实战代码,教你封装一个通用的OPC DA异步操作类,让复杂的工业通信变得像调用普通方法一样简单!

我记得最早用OPCDA 还是OPCDAAuto.dll ,这个只支持x86,不支持x64,这个OpcClientSdk472.dll优势还是有些,支持x64,不过有些地方不如OPCDAAuto。

💡 问题分析:OPC DA开发的三大痛点

🔍 痛点一:代码重复率高

原生OPC DA SDK需要大量样板代码:

  • 服务器发现与连接
  • 订阅组创建与配置
  • 数据项添加与管理
  • 异步回调处理

⚡ 痛点二:异步操作复杂

OPC DA的异步读写涉及:

  • 请求发起与句柄管理
  • 回调方法线程安全
  • UI更新的跨线程调用
  • 错误状态处理

🛡️ 痛点三:错误处理分散

异常可能出现在:

  • 连接建立阶段
  • 数据读写过程
  • 回调执行中
  • 资源释放时

🔥 解决方案:通用OPC DA操作类设计

基于上述痛点分析,我设计了一个通用的OpcDaManager类,具备以下特性:

✨ 核心特性

  • 链式调用:支持流畅的API设计
  • 事件驱动:统一的数据变化通知机制
  • 线程安全:自动处理跨线程UI更新
  • 异常统一:集中的错误处理与日志记录

💻 代码实战:完整的OPC DA管理器

C#
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using OpcClientSdk; using OpcClientSdk.Da; namespace AppOpcDa472 { /// <summary> /// OPC DA通用管理器 /// </summary> public class OpcDaManager : IDisposable { private TsCDaServer _opcServer; private TsCDaSubscription _subscription; private Dictionary<string, TsCDaItem> _itemDict; private bool _isConnected; #region 事件定义 /// <summary> /// 数据变化事件 /// </summary> public event EventHandler<OpcDataChangedEventArgs> DataChanged; /// <summary> /// 异步读取完成事件 /// </summary> public event EventHandler<OpcReadCompletedEventArgs> ReadCompleted; /// <summary> /// 异步写入完成事件 /// </summary> public event EventHandler<OpcWriteCompletedEventArgs> WriteCompleted; /// <summary> /// 错误发生事件 /// </summary> public event EventHandler<OpcErrorEventArgs> ErrorOccurred; #endregion #region 构造函数 public OpcDaManager() { _opcServer = new TsCDaServer(); _itemDict = new Dictionary<string, TsCDaItem>(); } #endregion #region 公共方法 /// <summary> /// 获取可用的OPC服务器列表 /// </summary> /// <returns>服务器名称列表</returns> public List<string> GetAvailableServers() { try { var servers = OpcDiscovery.GetServers(OpcSpecification.OPC_DA_20); var serverNames = new List<string>(); if (servers != null) { foreach (var server in servers) { serverNames.Add(server.ServerName); } } return serverNames; } catch (Exception ex) { OnErrorOccurred($"获取OPC服务器列表失败: {ex.Message}"); return new List<string>(); } } /// <summary> /// 连接到指定的OPC服务器 /// </summary> /// <param name="serverName">服务器名称</param> /// <param name="updateRate">数据更新频率(毫秒)</param> /// <returns>连接是否成功</returns> public bool Connect(string serverName, int updateRate = 1000) { try { if (_isConnected) { OnErrorOccurred("OPC服务器已连接,请先断开连接"); return false; } // 创建连接URL var opcUrl = new OpcUrl(OpcSpecification.OPC_DA_20, OpcUrlScheme.DA, serverName); // 连接服务器 _opcServer.Connect(opcUrl, null); // 创建订阅组 CreateSubscription(updateRate); _isConnected = true; return true; } catch (Exception ex) { OnErrorOccurred($"连接OPC服务器失败: {ex.Message}"); return false; } } /// <summary> /// 添加数据项到监控列表 /// </summary> /// <param name="itemName">数据项名称</param> /// <param name="clientHandle">客户端句柄(可选)</param> /// <returns>添加是否成功</returns> public bool AddItem(string itemName, int clientHandle = 0) { try { if (!_isConnected || _subscription == null) { OnErrorOccurred("OPC服务器未连接或订阅组未创建"); return false; } if (_itemDict.ContainsKey(itemName)) { OnErrorOccurred($"数据项 {itemName} 已存在"); return false; } // 创建数据项 var items = new TsCDaItem[1]; items[0] = new TsCDaItem { ItemName = itemName, ClientHandle = clientHandle == 0 ? itemName.GetHashCode() : clientHandle }; // 添加到订阅组 var results = _subscription.AddItems(items); if (results[0].Result.IsSuccess()) { _itemDict[itemName] = results[0]; return true; } else { OnErrorOccurred($"添加数据项失败: {results[0].Result.Description()}"); return false; } } catch (Exception ex) { OnErrorOccurred($"添加数据项异常: {ex.Message}"); return false; } } /// <summary> /// 异步读取指定数据项 /// </summary> /// <param name="itemName">数据项名称</param> /// <param name="requestId">请求ID(可选)</param> /// <returns>请求是否发起成功</returns> public bool ReadItemAsync(string itemName, int requestId = 0) { try { if (!_itemDict.ContainsKey(itemName)) { OnErrorOccurred($"数据项 {itemName} 不存在,请先添加"); return false; } var items = new TsCDaItem[] { _itemDict[itemName] }; IOpcRequest request; var results = _subscription.Read(items, requestId, new TsCDaReadCompleteEventHandler(OnReadComplete), out request); if (results[0].Result.IsError()) { OnErrorOccurred($"发起读取请求失败: {results[0].Result.Description()}"); return false; } return true; } catch (Exception ex) { OnErrorOccurred($"异步读取异常: {ex.Message}"); return false; } } /// <summary> /// 异步写入数据到指定项 /// </summary> /// <param name="itemName">数据项名称</param> /// <param name="value">要写入的值</param> /// <param name="requestId">请求ID(可选)</param> /// <returns>请求是否发起成功</returns> public bool WriteItemAsync(string itemName, object value, int requestId = 0) { try { if (!_itemDict.ContainsKey(itemName)) { OnErrorOccurred($"数据项 {itemName} 不存在,请先添加"); return false; } var writeValues = new TsCDaItemValue[1]; writeValues[0] = new TsCDaItemValue { ServerHandle = _itemDict[itemName].ServerHandle, Value = value }; IOpcRequest request; var results = _subscription.Write(writeValues, requestId, new TsCDaWriteCompleteEventHandler(OnWriteComplete), out request); if (results[0].Result.IsError()) { OnErrorOccurred($"发起写入请求失败: {results[0].Result.Description()}"); return false; } return true; } catch (Exception ex) { OnErrorOccurred($"异步写入异常: {ex.Message}"); return false; } } /// <summary> /// 启用/禁用数据变化通知 /// </summary> /// <param name="enabled">是否启用</param> public void SetNotificationEnabled(bool enabled) { try { _subscription?.SetEnabled(enabled); } catch (Exception ex) { OnErrorOccurred($"设置通知状态失败: {ex.Message}"); } } /// <summary> /// 断开OPC连接 /// </summary> public void Disconnect() { try { if (_subscription != null) { _subscription.DataChangedEvent -= OnDataChanged; _subscription.Dispose(); _subscription = null; } if (_opcServer != null && _isConnected) { _opcServer.Disconnect(); } _itemDict.Clear(); _isConnected = false; } catch (Exception ex) { OnErrorOccurred($"断开连接异常: {ex.Message}"); } } #endregion #region 私有方法 /// <summary> /// 创建订阅组 /// </summary> private void CreateSubscription(int updateRate) { var groupState = new TsCDaSubscriptionState { Name = $"OpcGroup_{DateTime.Now.Ticks}", ClientHandle = "OpcDaManager", Deadband = 0, UpdateRate = updateRate, KeepAlive = updateRate * 10 }; _subscription = (TsCDaSubscription)_opcServer.CreateSubscription(groupState); _subscription.DataChangedEvent += OnDataChanged; } /// <summary> /// 数据变化回调 /// </summary> private void OnDataChanged(object subscriptionHandle, object requestHandle, TsCDaItemValueResult[] values) { try { foreach (var value in values) { if (value.Result.IsSuccess()) { var args = new OpcDataChangedEventArgs { ItemName = GetItemNameByHandle(value.ClientHandle), Value = value.Value, Quality = value.Quality, Timestamp = value.Timestamp }; // 线程安全地触发事件 InvokeEvent(() => DataChanged?.Invoke(this, args)); } } } catch (Exception ex) { OnErrorOccurred($"数据变化处理异常: {ex.Message}"); } } /// <summary> /// 读取完成回调 /// </summary> private void OnReadComplete(object requestHandle, TsCDaItemValueResult[] values) { try { var args = new OpcReadCompletedEventArgs { RequestId = requestHandle, Values = values }; InvokeEvent(() => ReadCompleted?.Invoke(this, args)); } catch (Exception ex) { OnErrorOccurred($"读取完成处理异常: {ex.Message}"); } } /// <summary> /// 写入完成回调 /// </summary> private void OnWriteComplete(object requestHandle, OpcItemResult[] results) { try { var args = new OpcWriteCompletedEventArgs { RequestId = requestHandle, Results = results }; InvokeEvent(() => WriteCompleted?.Invoke(this, args)); } catch (Exception ex) { OnErrorOccurred($"写入完成处理异常: {ex.Message}"); } } /// <summary> /// 根据客户端句柄获取数据项名称 /// </summary> private string GetItemNameByHandle(object clientHandle) { foreach (var kvp in _itemDict) { if (kvp.Value.ClientHandle.Equals(clientHandle)) return kvp.Key; } return "Unknown"; } /// <summary> /// 线程安全地调用事件 /// </summary> private void InvokeEvent(Action action) { action(); } /// <summary> /// 触发错误事件 /// </summary> private void OnErrorOccurred(string message) { InvokeEvent(() => ErrorOccurred?.Invoke(this, new OpcErrorEventArgs { Message = message })); } #endregion #region IDisposable 实现 public void Dispose() { Disconnect(); _opcServer?.Dispose(); } #endregion } #region 事件参数类定义 /// <summary> /// 数据变化事件参数 /// </summary> public class OpcDataChangedEventArgs : EventArgs { public string ItemName { get; set; } public object Value { get; set; } public TsCDaQuality Quality { get; set; } public DateTime Timestamp { get; set; } } /// <summary> /// 读取完成事件参数 /// </summary> public class OpcReadCompletedEventArgs : EventArgs { public object RequestId { get; set; } public TsCDaItemValueResult[] Values { get; set; } } /// <summary> /// 写入完成事件参数 /// </summary> public class OpcWriteCompletedEventArgs : EventArgs { public object RequestId { get; set; } public OpcItemResult[] Results { get; set; } } /// <summary> /// 错误事件参数 /// </summary> public class OpcErrorEventArgs : EventArgs { public string Message { get; set; } } #endregion }

🎯 使用示例:让复杂操作变简单

C#
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Spectre.Console; namespace AppOpcDa472 { internal class Program { private static Table _dataTable; private static Dictionary<string, object> _latestValues = new Dictionary<string, object>(); static void Main(string[] args) { Console.OutputEncoding = Encoding.UTF8; // 显示欢迎横幅 ShowWelcomeBanner(); // 创建OPC DA管理器实例 using (var opcManager = new OpcDaManager()) { // 订阅事件 opcManager.DataChanged += OnDataChanged; opcManager.ReadCompleted += OnReadCompleted; opcManager.WriteCompleted += OnWriteCompleted; opcManager.ErrorOccurred += OnErrorOccurred; try { // 连接到Kepware OPC服务器 string serverName = "Kepware.KEPServerEX.V6"; AnsiConsole.Status() .Start($"正在连接到OPC服务器: [yellow]{serverName}[/]", ctx => { ctx.Spinner(Spinner.Known.Star); ctx.SpinnerStyle(Style.Parse("green")); System.Threading.Thread.Sleep(1000); // 模拟连接时间 }); bool connected = opcManager.Connect(serverName, 500); // 500ms更新频率 if (connected) { AnsiConsole.MarkupLine("[green]✓ OPC服务器连接成功![/]"); // 添加要监控的数据项 string[] itemNames = { "LMES.W1.Progress", "LMES.W1.Progress_0021", "LMES.W1.Progress_0026", "LMES.W1.Status", "LMES.W1.Test2" }; ShowItemsAddition(opcManager, itemNames); // 启用数据变化通知 opcManager.SetNotificationEnabled(true); AnsiConsole.MarkupLine("[cyan]📡 数据变化通知已启用[/]"); // 演示异步读取 ShowAsyncOperations(opcManager, itemNames); // 创建数据监控表格 CreateDataTable(); AnsiConsole.MarkupLine("\n[bold blue]监控数据变化中...[/] [dim]按任意键退出[/]"); AnsiConsole.Write(new Rule("[yellow]实时数据监控[/]").RuleStyle("grey")); // 等待用户输入退出 Console.ReadKey(); } else { ShowConnectionError(); } } catch (Exception ex) { AnsiConsole.WriteException(ex); } finally { AnsiConsole.Status() .Start("正在断开连接...", ctx => { ctx.Spinner(Spinner.Known.Dots); ctx.SpinnerStyle(Style.Parse("red")); opcManager.Disconnect(); System.Threading.Thread.Sleep(500); }); AnsiConsole.MarkupLine("[red]程序结束[/]"); } } AnsiConsole.MarkupLine("\n[dim]按任意键关闭...[/]"); Console.ReadKey(); } private static void ShowWelcomeBanner() { var figlet = new FigletText("OPC DA Manager") .Centered() .Color(Color.Blue); AnsiConsole.Write(figlet); AnsiConsole.Write(new Rule("[yellow]OPC DA 通信测试程序[/]").RuleStyle("grey")); AnsiConsole.WriteLine(); } private static void ShowItemsAddition(OpcDaManager opcManager, string[] itemNames) { var table = new Table(); table.AddColumn("[bold]数据项[/]"); table.AddColumn("[bold]状态[/]"); foreach (string itemName in itemNames) { if (opcManager.AddItem(itemName)) { table.AddRow(itemName, "[green]✓ 添加成功[/]"); _latestValues[itemName] = "等待数据..."; } else { table.AddRow(itemName, "[red]✗ 添加失败[/]"); } } AnsiConsole.Write(table); } private static void ShowAsyncOperations(OpcDaManager opcManager, string[] itemNames) { AnsiConsole.MarkupLine("\n[bold cyan]开始异步操作测试:[/]"); var panel = new Panel("正在执行异步读取操作...") .Header("[bold]异步读取[/]") .BorderStyle(Style.Parse("cyan")); AnsiConsole.Write(panel); foreach (string itemName in itemNames) { opcManager.ReadItemAsync(itemName); } // 演示异步写入 var result = opcManager.WriteItemAsync(itemNames[2], 120); if (result) { AnsiConsole.MarkupLine("[green]✓ 异步写入请求已发送[/]"); } } private static void CreateDataTable() { _dataTable = new Table(); _dataTable.AddColumn("[bold blue]数据项[/]"); _dataTable.AddColumn("[bold green]当前值[/]"); _dataTable.AddColumn("[bold yellow]质量[/]"); _dataTable.AddColumn("[bold cyan]时间戳[/]"); _dataTable.Border(TableBorder.Rounded); _dataTable.Title("[bold]实时数据监控[/]"); } private static void ShowConnectionError() { var panel = new Panel( "[red]连接OPC服务器失败![/]\n\n" + "[yellow]请确保:[/]\n" + "• Kepware服务器正在运行\n" + "• 服务器名称正确\n" + "• OPC客户端权限配置正确") .Header("[red]连接错误[/]") .BorderStyle(Style.Parse("red")); AnsiConsole.Write(panel); } private static void UpdateDataDisplay(string itemName, object value, string quality, DateTime timestamp) { _latestValues[itemName] = value; // 清除并重新绘制表格 Console.Clear(); ShowWelcomeBanner(); var liveTable = new Table(); liveTable.AddColumn("[bold blue]数据项[/]"); liveTable.AddColumn("[bold green]当前值[/]"); liveTable.AddColumn("[bold yellow]质量[/]"); liveTable.AddColumn("[bold cyan]时间戳[/]"); liveTable.Border(TableBorder.Rounded); liveTable.Title("[bold]实时数据监控[/]"); foreach (var kvp in _latestValues) { var valueColor = kvp.Key == itemName ? "green" : "white"; liveTable.AddRow( kvp.Key, $"[{valueColor}]{kvp.Value}[/]", quality, timestamp.ToString("HH:mm:ss.fff") ); } AnsiConsole.Write(liveTable); AnsiConsole.MarkupLine("\n[dim]按任意键退出[/]"); } private static void OpcManager_WriteCompleted(object sender, OpcWriteCompletedEventArgs e) { AnsiConsole.MarkupLine("[green]✓ 写入数据完成![/]"); } // 数据变化事件处理 private static void OnDataChanged(object sender, OpcDataChangedEventArgs e) { UpdateDataDisplay(e.ItemName, e.Value, e.Quality.ToString(), e.Timestamp); // 同时显示变化通知 var markup = $"[bold green]📊 数据变化[/] [cyan]{e.ItemName}[/]: [yellow]{e.Value}[/] [dim]({e.Timestamp:HH:mm:ss.fff})[/]"; AnsiConsole.MarkupLine(markup); } // 异步读取完成事件处理 private static void OnReadCompleted(object sender, OpcReadCompletedEventArgs e) { var panel = new Panel($"请求ID: [cyan]{e.RequestId}[/]") .Header("[bold green]📖 读取完成[/]") .BorderStyle(Style.Parse("green")); foreach (var value in e.Values) { if (value.Result.IsSuccess()) { AnsiConsole.MarkupLine($"[green]✓ 读取成功:[/] [yellow]{value.Value}[/] [dim](质量: {value.Quality})[/]"); } else { AnsiConsole.MarkupLine($"[red]✗ 读取失败:[/] {value.Result.Description()}"); } } } // 异步写入完成事件处理 private static void OnWriteCompleted(object sender, OpcWriteCompletedEventArgs e) { var panel = new Panel($"请求ID: [cyan]{e.RequestId}[/]") .Header("[bold blue]📝 写入完成[/]") .BorderStyle(Style.Parse("blue")); AnsiConsole.Write(panel); foreach (var result in e.Results) { if (result.Result.IsSuccess()) { AnsiConsole.MarkupLine("[green]✓ 写入成功[/]"); } else { AnsiConsole.MarkupLine($"[red]✗ 写入失败:[/] {result.Result.Description()}"); } } } // 错误事件处理 private static void OnErrorOccurred(object sender, OpcErrorEventArgs e) { var panel = new Panel($"[red]{e.Message}[/]") .Header("[bold red]❌ 错误[/]") .BorderStyle(Style.Parse("red")); AnsiConsole.Write(panel); } } }

image.png

🎯 总结与展望

通过封装这个通用OPC DA管理器,我们成功解决了三大核心问题:

  1. 📝 代码复用:从每次几十行代码缩减到几行调用
  2. ⚡ 异步简化:统一的事件驱动模型,告别复杂回调
  3. 🛡️ 错误统一:集中的异常处理,提升系统稳定性

这个封装类不仅适用于当前项目,更可以作为企业级工业通信基础组件,在多个项目中复用。


🤔 互动讨论

你在OPC开发中还遇到过哪些棘手问题?欢迎在评论区分享你的经验和困惑!

👍 如果这篇文章对你有帮助,请转发给更多需要的同行!让我们一起推动C#工业通信技术的普及和发展!

#C#开发 #OPC通信 #工业自动化 #编程技巧

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!