在工业自动化领域,OPC DA(OLE for Process Control Data Access)是连接上位机与PLC、DCS等设备的重要桥梁。然而,许多C#开发者在使用OPC DA时都会遇到同样的痛点:原生SDK代码冗长、异步操作复杂、错误处理繁琐。
想象一下,每次需要读写OPC数据时,都要写几十行重复代码?每个项目都要重新实现连接、订阅、异步回调逻辑?这不仅效率低下,还容易出错。
本文将通过实战代码,教你封装一个通用的OPC DA异步操作类,让复杂的工业通信变得像调用普通方法一样简单!
我记得最早用OPCDA 还是OPCDAAuto.dll ,这个只支持x86,不支持x64,这个OpcClientSdk472.dll优势还是有些,支持x64,不过有些地方不如OPCDAAuto。
原生OPC DA SDK需要大量样板代码:
OPC DA的异步读写涉及:
异常可能出现在:
基于上述痛点分析,我设计了一个通用的OpcDaManager类,具备以下特性:
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using OpcClientSdk;
using OpcClientSdk.Da;
namespace AppOpcDa472
{
/// <summary>
/// OPC DA通用管理器
/// </summary>
public class OpcDaManager : IDisposable
{
private TsCDaServer _opcServer;
private TsCDaSubscription _subscription;
private Dictionary<string, TsCDaItem> _itemDict;
private bool _isConnected;
#region 事件定义
/// <summary>
/// 数据变化事件
/// </summary>
public event EventHandler<OpcDataChangedEventArgs> DataChanged;
/// <summary>
/// 异步读取完成事件
/// </summary>
public event EventHandler<OpcReadCompletedEventArgs> ReadCompleted;
/// <summary>
/// 异步写入完成事件
/// </summary>
public event EventHandler<OpcWriteCompletedEventArgs> WriteCompleted;
/// <summary>
/// 错误发生事件
/// </summary>
public event EventHandler<OpcErrorEventArgs> ErrorOccurred;
#endregion
#region 构造函数
public OpcDaManager()
{
_opcServer = new TsCDaServer();
_itemDict = new Dictionary<string, TsCDaItem>();
}
#endregion
#region 公共方法
/// <summary>
/// 获取可用的OPC服务器列表
/// </summary>
/// <returns>服务器名称列表</returns>
public List<string> GetAvailableServers()
{
try
{
var servers = OpcDiscovery.GetServers(OpcSpecification.OPC_DA_20);
var serverNames = new List<string>();
if (servers != null)
{
foreach (var server in servers)
{
serverNames.Add(server.ServerName);
}
}
return serverNames;
}
catch (Exception ex)
{
OnErrorOccurred($"获取OPC服务器列表失败: {ex.Message}");
return new List<string>();
}
}
/// <summary>
/// 连接到指定的OPC服务器
/// </summary>
/// <param name="serverName">服务器名称</param>
/// <param name="updateRate">数据更新频率(毫秒)</param>
/// <returns>连接是否成功</returns>
public bool Connect(string serverName, int updateRate = 1000)
{
try
{
if (_isConnected)
{
OnErrorOccurred("OPC服务器已连接,请先断开连接");
return false;
}
// 创建连接URL
var opcUrl = new OpcUrl(OpcSpecification.OPC_DA_20, OpcUrlScheme.DA, serverName);
// 连接服务器
_opcServer.Connect(opcUrl, null);
// 创建订阅组
CreateSubscription(updateRate);
_isConnected = true;
return true;
}
catch (Exception ex)
{
OnErrorOccurred($"连接OPC服务器失败: {ex.Message}");
return false;
}
}
/// <summary>
/// 添加数据项到监控列表
/// </summary>
/// <param name="itemName">数据项名称</param>
/// <param name="clientHandle">客户端句柄(可选)</param>
/// <returns>添加是否成功</returns>
public bool AddItem(string itemName, int clientHandle = 0)
{
try
{
if (!_isConnected || _subscription == null)
{
OnErrorOccurred("OPC服务器未连接或订阅组未创建");
return false;
}
if (_itemDict.ContainsKey(itemName))
{
OnErrorOccurred($"数据项 {itemName} 已存在");
return false;
}
// 创建数据项
var items = new TsCDaItem[1];
items[0] = new TsCDaItem
{
ItemName = itemName,
ClientHandle = clientHandle == 0 ? itemName.GetHashCode() : clientHandle
};
// 添加到订阅组
var results = _subscription.AddItems(items);
if (results[0].Result.IsSuccess())
{
_itemDict[itemName] = results[0];
return true;
}
else
{
OnErrorOccurred($"添加数据项失败: {results[0].Result.Description()}");
return false;
}
}
catch (Exception ex)
{
OnErrorOccurred($"添加数据项异常: {ex.Message}");
return false;
}
}
/// <summary>
/// 异步读取指定数据项
/// </summary>
/// <param name="itemName">数据项名称</param>
/// <param name="requestId">请求ID(可选)</param>
/// <returns>请求是否发起成功</returns>
public bool ReadItemAsync(string itemName, int requestId = 0)
{
try
{
if (!_itemDict.ContainsKey(itemName))
{
OnErrorOccurred($"数据项 {itemName} 不存在,请先添加");
return false;
}
var items = new TsCDaItem[] { _itemDict[itemName] };
IOpcRequest request;
var results = _subscription.Read(items, requestId, new TsCDaReadCompleteEventHandler(OnReadComplete), out request);
if (results[0].Result.IsError())
{
OnErrorOccurred($"发起读取请求失败: {results[0].Result.Description()}");
return false;
}
return true;
}
catch (Exception ex)
{
OnErrorOccurred($"异步读取异常: {ex.Message}");
return false;
}
}
/// <summary>
/// 异步写入数据到指定项
/// </summary>
/// <param name="itemName">数据项名称</param>
/// <param name="value">要写入的值</param>
/// <param name="requestId">请求ID(可选)</param>
/// <returns>请求是否发起成功</returns>
public bool WriteItemAsync(string itemName, object value, int requestId = 0)
{
try
{
if (!_itemDict.ContainsKey(itemName))
{
OnErrorOccurred($"数据项 {itemName} 不存在,请先添加");
return false;
}
var writeValues = new TsCDaItemValue[1];
writeValues[0] = new TsCDaItemValue
{
ServerHandle = _itemDict[itemName].ServerHandle,
Value = value
};
IOpcRequest request;
var results = _subscription.Write(writeValues, requestId, new TsCDaWriteCompleteEventHandler(OnWriteComplete), out request);
if (results[0].Result.IsError())
{
OnErrorOccurred($"发起写入请求失败: {results[0].Result.Description()}");
return false;
}
return true;
}
catch (Exception ex)
{
OnErrorOccurred($"异步写入异常: {ex.Message}");
return false;
}
}
/// <summary>
/// 启用/禁用数据变化通知
/// </summary>
/// <param name="enabled">是否启用</param>
public void SetNotificationEnabled(bool enabled)
{
try
{
_subscription?.SetEnabled(enabled);
}
catch (Exception ex)
{
OnErrorOccurred($"设置通知状态失败: {ex.Message}");
}
}
/// <summary>
/// 断开OPC连接
/// </summary>
public void Disconnect()
{
try
{
if (_subscription != null)
{
_subscription.DataChangedEvent -= OnDataChanged;
_subscription.Dispose();
_subscription = null;
}
if (_opcServer != null && _isConnected)
{
_opcServer.Disconnect();
}
_itemDict.Clear();
_isConnected = false;
}
catch (Exception ex)
{
OnErrorOccurred($"断开连接异常: {ex.Message}");
}
}
#endregion
#region 私有方法
/// <summary>
/// 创建订阅组
/// </summary>
private void CreateSubscription(int updateRate)
{
var groupState = new TsCDaSubscriptionState
{
Name = $"OpcGroup_{DateTime.Now.Ticks}",
ClientHandle = "OpcDaManager",
Deadband = 0,
UpdateRate = updateRate,
KeepAlive = updateRate * 10
};
_subscription = (TsCDaSubscription)_opcServer.CreateSubscription(groupState);
_subscription.DataChangedEvent += OnDataChanged;
}
/// <summary>
/// 数据变化回调
/// </summary>
private void OnDataChanged(object subscriptionHandle, object requestHandle, TsCDaItemValueResult[] values)
{
try
{
foreach (var value in values)
{
if (value.Result.IsSuccess())
{
var args = new OpcDataChangedEventArgs
{
ItemName = GetItemNameByHandle(value.ClientHandle),
Value = value.Value,
Quality = value.Quality,
Timestamp = value.Timestamp
};
// 线程安全地触发事件
InvokeEvent(() => DataChanged?.Invoke(this, args));
}
}
}
catch (Exception ex)
{
OnErrorOccurred($"数据变化处理异常: {ex.Message}");
}
}
/// <summary>
/// 读取完成回调
/// </summary>
private void OnReadComplete(object requestHandle, TsCDaItemValueResult[] values)
{
try
{
var args = new OpcReadCompletedEventArgs
{
RequestId = requestHandle,
Values = values
};
InvokeEvent(() => ReadCompleted?.Invoke(this, args));
}
catch (Exception ex)
{
OnErrorOccurred($"读取完成处理异常: {ex.Message}");
}
}
/// <summary>
/// 写入完成回调
/// </summary>
private void OnWriteComplete(object requestHandle, OpcItemResult[] results)
{
try
{
var args = new OpcWriteCompletedEventArgs
{
RequestId = requestHandle,
Results = results
};
InvokeEvent(() => WriteCompleted?.Invoke(this, args));
}
catch (Exception ex)
{
OnErrorOccurred($"写入完成处理异常: {ex.Message}");
}
}
/// <summary>
/// 根据客户端句柄获取数据项名称
/// </summary>
private string GetItemNameByHandle(object clientHandle)
{
foreach (var kvp in _itemDict)
{
if (kvp.Value.ClientHandle.Equals(clientHandle))
return kvp.Key;
}
return "Unknown";
}
/// <summary>
/// 线程安全地调用事件
/// </summary>
private void InvokeEvent(Action action)
{
action();
}
/// <summary>
/// 触发错误事件
/// </summary>
private void OnErrorOccurred(string message)
{
InvokeEvent(() => ErrorOccurred?.Invoke(this, new OpcErrorEventArgs { Message = message }));
}
#endregion
#region IDisposable 实现
public void Dispose()
{
Disconnect();
_opcServer?.Dispose();
}
#endregion
}
#region 事件参数类定义
/// <summary>
/// 数据变化事件参数
/// </summary>
public class OpcDataChangedEventArgs : EventArgs
{
public string ItemName { get; set; }
public object Value { get; set; }
public TsCDaQuality Quality { get; set; }
public DateTime Timestamp { get; set; }
}
/// <summary>
/// 读取完成事件参数
/// </summary>
public class OpcReadCompletedEventArgs : EventArgs
{
public object RequestId { get; set; }
public TsCDaItemValueResult[] Values { get; set; }
}
/// <summary>
/// 写入完成事件参数
/// </summary>
public class OpcWriteCompletedEventArgs : EventArgs
{
public object RequestId { get; set; }
public OpcItemResult[] Results { get; set; }
}
/// <summary>
/// 错误事件参数
/// </summary>
public class OpcErrorEventArgs : EventArgs
{
public string Message { get; set; }
}
#endregion
}
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Spectre.Console;
namespace AppOpcDa472
{
internal class Program
{
private static Table _dataTable;
private static Dictionary<string, object> _latestValues = new Dictionary<string, object>();
static void Main(string[] args)
{
Console.OutputEncoding = Encoding.UTF8;
// 显示欢迎横幅
ShowWelcomeBanner();
// 创建OPC DA管理器实例
using (var opcManager = new OpcDaManager())
{
// 订阅事件
opcManager.DataChanged += OnDataChanged;
opcManager.ReadCompleted += OnReadCompleted;
opcManager.WriteCompleted += OnWriteCompleted;
opcManager.ErrorOccurred += OnErrorOccurred;
try
{
// 连接到Kepware OPC服务器
string serverName = "Kepware.KEPServerEX.V6";
AnsiConsole.Status()
.Start($"正在连接到OPC服务器: [yellow]{serverName}[/]", ctx =>
{
ctx.Spinner(Spinner.Known.Star);
ctx.SpinnerStyle(Style.Parse("green"));
System.Threading.Thread.Sleep(1000); // 模拟连接时间
});
bool connected = opcManager.Connect(serverName, 500); // 500ms更新频率
if (connected)
{
AnsiConsole.MarkupLine("[green]✓ OPC服务器连接成功![/]");
// 添加要监控的数据项
string[] itemNames = {
"LMES.W1.Progress",
"LMES.W1.Progress_0021",
"LMES.W1.Progress_0026",
"LMES.W1.Status",
"LMES.W1.Test2"
};
ShowItemsAddition(opcManager, itemNames);
// 启用数据变化通知
opcManager.SetNotificationEnabled(true);
AnsiConsole.MarkupLine("[cyan]📡 数据变化通知已启用[/]");
// 演示异步读取
ShowAsyncOperations(opcManager, itemNames);
// 创建数据监控表格
CreateDataTable();
AnsiConsole.MarkupLine("\n[bold blue]监控数据变化中...[/] [dim]按任意键退出[/]");
AnsiConsole.Write(new Rule("[yellow]实时数据监控[/]").RuleStyle("grey"));
// 等待用户输入退出
Console.ReadKey();
}
else
{
ShowConnectionError();
}
}
catch (Exception ex)
{
AnsiConsole.WriteException(ex);
}
finally
{
AnsiConsole.Status()
.Start("正在断开连接...", ctx =>
{
ctx.Spinner(Spinner.Known.Dots);
ctx.SpinnerStyle(Style.Parse("red"));
opcManager.Disconnect();
System.Threading.Thread.Sleep(500);
});
AnsiConsole.MarkupLine("[red]程序结束[/]");
}
}
AnsiConsole.MarkupLine("\n[dim]按任意键关闭...[/]");
Console.ReadKey();
}
private static void ShowWelcomeBanner()
{
var figlet = new FigletText("OPC DA Manager")
.Centered()
.Color(Color.Blue);
AnsiConsole.Write(figlet);
AnsiConsole.Write(new Rule("[yellow]OPC DA 通信测试程序[/]").RuleStyle("grey"));
AnsiConsole.WriteLine();
}
private static void ShowItemsAddition(OpcDaManager opcManager, string[] itemNames)
{
var table = new Table();
table.AddColumn("[bold]数据项[/]");
table.AddColumn("[bold]状态[/]");
foreach (string itemName in itemNames)
{
if (opcManager.AddItem(itemName))
{
table.AddRow(itemName, "[green]✓ 添加成功[/]");
_latestValues[itemName] = "等待数据...";
}
else
{
table.AddRow(itemName, "[red]✗ 添加失败[/]");
}
}
AnsiConsole.Write(table);
}
private static void ShowAsyncOperations(OpcDaManager opcManager, string[] itemNames)
{
AnsiConsole.MarkupLine("\n[bold cyan]开始异步操作测试:[/]");
var panel = new Panel("正在执行异步读取操作...")
.Header("[bold]异步读取[/]")
.BorderStyle(Style.Parse("cyan"));
AnsiConsole.Write(panel);
foreach (string itemName in itemNames)
{
opcManager.ReadItemAsync(itemName);
}
// 演示异步写入
var result = opcManager.WriteItemAsync(itemNames[2], 120);
if (result)
{
AnsiConsole.MarkupLine("[green]✓ 异步写入请求已发送[/]");
}
}
private static void CreateDataTable()
{
_dataTable = new Table();
_dataTable.AddColumn("[bold blue]数据项[/]");
_dataTable.AddColumn("[bold green]当前值[/]");
_dataTable.AddColumn("[bold yellow]质量[/]");
_dataTable.AddColumn("[bold cyan]时间戳[/]");
_dataTable.Border(TableBorder.Rounded);
_dataTable.Title("[bold]实时数据监控[/]");
}
private static void ShowConnectionError()
{
var panel = new Panel(
"[red]连接OPC服务器失败![/]\n\n" +
"[yellow]请确保:[/]\n" +
"• Kepware服务器正在运行\n" +
"• 服务器名称正确\n" +
"• OPC客户端权限配置正确")
.Header("[red]连接错误[/]")
.BorderStyle(Style.Parse("red"));
AnsiConsole.Write(panel);
}
private static void UpdateDataDisplay(string itemName, object value, string quality, DateTime timestamp)
{
_latestValues[itemName] = value;
// 清除并重新绘制表格
Console.Clear();
ShowWelcomeBanner();
var liveTable = new Table();
liveTable.AddColumn("[bold blue]数据项[/]");
liveTable.AddColumn("[bold green]当前值[/]");
liveTable.AddColumn("[bold yellow]质量[/]");
liveTable.AddColumn("[bold cyan]时间戳[/]");
liveTable.Border(TableBorder.Rounded);
liveTable.Title("[bold]实时数据监控[/]");
foreach (var kvp in _latestValues)
{
var valueColor = kvp.Key == itemName ? "green" : "white";
liveTable.AddRow(
kvp.Key,
$"[{valueColor}]{kvp.Value}[/]",
quality,
timestamp.ToString("HH:mm:ss.fff")
);
}
AnsiConsole.Write(liveTable);
AnsiConsole.MarkupLine("\n[dim]按任意键退出[/]");
}
private static void OpcManager_WriteCompleted(object sender, OpcWriteCompletedEventArgs e)
{
AnsiConsole.MarkupLine("[green]✓ 写入数据完成![/]");
}
// 数据变化事件处理
private static void OnDataChanged(object sender, OpcDataChangedEventArgs e)
{
UpdateDataDisplay(e.ItemName, e.Value, e.Quality.ToString(), e.Timestamp);
// 同时显示变化通知
var markup = $"[bold green]📊 数据变化[/] [cyan]{e.ItemName}[/]: [yellow]{e.Value}[/] [dim]({e.Timestamp:HH:mm:ss.fff})[/]";
AnsiConsole.MarkupLine(markup);
}
// 异步读取完成事件处理
private static void OnReadCompleted(object sender, OpcReadCompletedEventArgs e)
{
var panel = new Panel($"请求ID: [cyan]{e.RequestId}[/]")
.Header("[bold green]📖 读取完成[/]")
.BorderStyle(Style.Parse("green"));
foreach (var value in e.Values)
{
if (value.Result.IsSuccess())
{
AnsiConsole.MarkupLine($"[green]✓ 读取成功:[/] [yellow]{value.Value}[/] [dim](质量: {value.Quality})[/]");
}
else
{
AnsiConsole.MarkupLine($"[red]✗ 读取失败:[/] {value.Result.Description()}");
}
}
}
// 异步写入完成事件处理
private static void OnWriteCompleted(object sender, OpcWriteCompletedEventArgs e)
{
var panel = new Panel($"请求ID: [cyan]{e.RequestId}[/]")
.Header("[bold blue]📝 写入完成[/]")
.BorderStyle(Style.Parse("blue"));
AnsiConsole.Write(panel);
foreach (var result in e.Results)
{
if (result.Result.IsSuccess())
{
AnsiConsole.MarkupLine("[green]✓ 写入成功[/]");
}
else
{
AnsiConsole.MarkupLine($"[red]✗ 写入失败:[/] {result.Result.Description()}");
}
}
}
// 错误事件处理
private static void OnErrorOccurred(object sender, OpcErrorEventArgs e)
{
var panel = new Panel($"[red]{e.Message}[/]")
.Header("[bold red]❌ 错误[/]")
.BorderStyle(Style.Parse("red"));
AnsiConsole.Write(panel);
}
}
}

通过封装这个通用OPC DA管理器,我们成功解决了三大核心问题:
这个封装类不仅适用于当前项目,更可以作为企业级工业通信基础组件,在多个项目中复用。
🤔 互动讨论:
你在OPC开发中还遇到过哪些棘手问题?欢迎在评论区分享你的经验和困惑!
👍 如果这篇文章对你有帮助,请转发给更多需要的同行!让我们一起推动C#工业通信技术的普及和发展!
#C#开发 #OPC通信 #工业自动化 #编程技巧
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!