编辑
2025-11-29
C#
00

目录

OPC UA简介
OpcUaService类设计与实现
基础连接管理
数据读取
数据写入
数据变更订阅
断线重连机制
完整代码OpcUaService
测试程序实现
基础连接测试
读取操作测试
写入操作测试
订阅测试
断线重连机制测试
完整测试代码
工业应用场景
结论

工业4.0和物联网时代,OPC UA (OPC Unified Architecture) 已成为工业自动化和设备通信的标准协议。本文将深入探讨如何在.NET环境中实现可靠的OPC UA通信,通过一个功能完备的OPC UA客户端库及其测试程序,展示工业环境下数据采集、监控和控制的实现方法。

OPC UA简介

OPC UA是一种独立于平台的、面向服务的架构,用于工业自动化领域的数据交换。它具有以下特点:

  • 平台无关性:可在各种硬件平台和操作系统上运行
  • 安全性:提供认证、授权和加密机制
  • 统一数据模型:提供完整的信息模型,表示复杂系统和关系
  • 可扩展性:支持从简单设备到复杂系统的多种应用场景

OpcUaService类设计与实现

我们设计的OpcUaService类是对OPC UA客户端操作的封装,提供了以下核心功能:

基础连接管理

C#
// 匿名连接方式 public async Task<bool> ConnectServerAsync(string serverUrl) { try { ServerUrl = serverUrl; _lastUsername = null; _lastPassword = null; _opcUaClient.UserIdentity = new UserIdentity(new AnonymousIdentityToken()); await _opcUaClient.ConnectServer(serverUrl); IsConnected = true; return true; } catch (Exception ex) { IsConnected = false; OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = $"连接失败: {ex.Message}" }); return false; } } // 用户名密码连接方式 public async Task<bool> ConnectServerAsync(string serverUrl, string username, string password) { // 实现代码... }

数据读取

读取操作支持多种灵活的方式:

C#
// 读取单个节点 public DataValue ReadNode(string nodeId) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.ReadNode(new NodeId(nodeId)); } // 泛型读取方法 public T ReadNode<T>(string nodeId) { // 实现代码... } // 异步读取 public async Task<T> ReadNodeAsync<T>(string nodeId) { // 实现代码... } // 批量读取 public List<DataValue> ReadNodes(string[] nodeIds) { // 实现代码... }

数据写入

C#
// 写入单个节点 public bool WriteNode<T>(string nodeId, T value) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.WriteNode(nodeId, value); } // 批量写入 public bool WriteNodes(string[] nodeIds, object[] values) { // 实现代码... }

数据变更订阅

订阅功能是实时监控系统的核心:

C#
// 添加节点订阅 public bool AddSubscription(string key, string nodeId, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback) { return AddSubscription(key, nodeId, callback, 1000, 1000); } // 详细订阅实现 public bool AddSubscription(string key, string nodeId, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback, int publishingInterval = 1000, int samplingInterval = 1000) { // 详细实现代码... // 创建订阅和监控项 Subscription subscription = new Subscription(_opcUaClient.Session.DefaultSubscription); subscription.PublishingEnabled = true; subscription.PublishingInterval = publishingInterval; MonitoredItem monitoredItem = new MonitoredItem { StartNodeId = new NodeId(nodeId), AttributeId = Attributes.Value, DisplayName = key, SamplingInterval = samplingInterval }; // 设置回调 monitoredItem.Notification += (item, args) => callback(key, item, args); // 实现剩余代码... }

断线重连机制

工业环境中网络不稳定是常见问题,自动重连机制至关重要:

C#
// 重连配置类 public class ReconnectionConfig { public bool EnableAutoReconnect { get; set; } = true; public int ReconnectInterval { get; set; } = 5000; public int MaxReconnectAttempts { get; set; } = 0; public bool RestoreSubscriptionsAfterReconnect { get; set; } = true; } // 配置重连参数 public void ConfigureReconnection(ReconnectionConfig config) { _reconnectionConfig = config ?? new ReconnectionConfig(); } // 重连实现 private async Task ReconnectAsync() { // 检查重连条件 if (IsConnected || (_reconnectionConfig.MaxReconnectAttempts > 0 && _reconnectAttempts >= _reconnectionConfig.MaxReconnectAttempts)) { StopReconnection(); return; } _reconnectAttempts++; try { // 尝试使用原有凭据重连 bool success; if (!string.IsNullOrEmpty(_lastUsername)) { success = await ConnectServerAsync(ServerUrl, _lastUsername, _lastPassword); } else { success = await ConnectServerAsync(ServerUrl); } if (success) { // 重连成功后恢复订阅 if (_reconnectionConfig.RestoreSubscriptionsAfterReconnect) { await RestoreSubscriptionsAsync(); } StopReconnection(); } // 其他重连逻辑... } catch (Exception ex) { // 异常处理... } }

完整代码OpcUaService

C#
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using Opc.Ua.Client; using Opc.Ua; using OpcUaHelper; namespace Rick.Core.Services.OpcUa { /// <summary> /// 通用OPC UA操作类,封装OPC UA基本操作 /// </summary> public class OpcUaService: IDisposable { #region 变量与属性 /// <summary> /// OPC UA客户端实例 /// </summary> private OpcUaClient _opcUaClient; /// <summary> /// 订阅字典,用于管理订阅 /// </summary> private Dictionary<string, Subscription> _subscriptions = new Dictionary<string, Subscription>(); /// <summary> /// 连接状态 /// </summary> public bool IsConnected { get; private set; } = false; /// <summary> /// 服务器地址 /// </summary> public string ServerUrl { get; private set; } /// <summary> /// OPC状态变化事件 /// </summary> public event EventHandler<OpcStatusEventArgs> OpcStatusChanged; #endregion #region 构造函数和初始化 /// <summary> /// 构造函数 /// </summary> public OpcUaService() { _opcUaClient = new OpcUaClient(); _opcUaClient.OpcStatusChange += OpcUaClient_OpcStatusChange; } /// <summary> /// OPC UA状态改变事件处理 /// </summary> private void OpcUaClient_OpcStatusChange(object sender, OpcUaStatusEventArgs e) { bool previousState = IsConnected; IsConnected = !e.Error; OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = IsConnected, Message = e.Error ? "连接断开" : "连接成功" }); // 如果连接状态从已连接变为断开,则启动重连 if (previousState && !IsConnected) { StartReconnection(); } } #endregion #region 连接和断开 /// <summary> /// 连接到OPC UA服务器(匿名方式) /// </summary> public async Task<bool> ConnectServerAsync(string serverUrl) { try { ServerUrl = serverUrl; _lastUsername = null; _lastPassword = null; _opcUaClient.UserIdentity = new UserIdentity(new AnonymousIdentityToken()); await _opcUaClient.ConnectServer(serverUrl); IsConnected = true; return true; } catch (Exception ex) { IsConnected = false; OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = $"连接失败: {ex.Message}" }); return false; } } /// <summary> /// 使用用户名密码连接到OPC UA服务器 /// </summary> public async Task<bool> ConnectServerAsync(string serverUrl, string username, string password) { try { ServerUrl = serverUrl; _lastUsername = username; _lastPassword = password; _opcUaClient.UserIdentity = new UserIdentity(username, password); await _opcUaClient.ConnectServer(serverUrl); IsConnected = true; return true; } catch (Exception ex) { IsConnected = false; OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = $"连接失败: {ex.Message}" }); return false; } } /// <summary> /// 断开与服务器的连接 /// </summary> public void Disconnect() { try { StopReconnection(); if (_opcUaClient != null) { _opcUaClient.Disconnect(); } IsConnected = false; } catch (Exception ex) { Console.WriteLine($"断开连接时发生错误: {ex.Message}"); } } #endregion #region 读取操作 /// <summary> /// 读取单个节点的值 /// </summary> /// <param name="nodeId">节点ID,例如:"ns=2;s=Channel1.Device1.Tag1"</param> /// <returns>数据值</returns> public DataValue ReadNode(string nodeId) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.ReadNode(new NodeId(nodeId)); } /// <summary> /// 读取单个节点的值并转换为指定类型 /// </summary> /// <typeparam name="T">值的类型</typeparam> /// <param name="nodeId">节点ID</param> /// <returns>指定类型的值</returns> public T ReadNode<T>(string nodeId) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.ReadNode<T>(nodeId); } /// <summary> /// 异步读取节点值 /// </summary> /// <typeparam name="T">值的类型</typeparam> /// <param name="nodeId">节点ID</param> /// <returns>指定类型的值</returns> public async Task<T> ReadNodeAsync<T>(string nodeId) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return await _opcUaClient.ReadNodeAsync<T>(nodeId); } /// <summary> /// 批量读取多个节点的值 /// </summary> /// <param name="nodeIds">节点ID列表</param> /// <returns>数据值列表</returns> public List<DataValue> ReadNodes(List<NodeId> nodeIds) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.ReadNodes(nodeIds.ToArray()); } /// <summary> /// 批量读取多个字符串节点ID的值 /// </summary> /// <param name="nodeIds">字符串节点ID数组</param> /// <returns>数据值列表</returns> public List<DataValue> ReadNodes(string[] nodeIds) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); List<NodeId> nodes = new List<NodeId>(); foreach (var nodeId in nodeIds) { nodes.Add(new NodeId(nodeId)); } return _opcUaClient.ReadNodes(nodes.ToArray()); } /// <summary> /// 批量读取相同类型的多个节点的值 /// </summary> /// <typeparam name="T">值的类型</typeparam> /// <param name="nodeIds">节点ID数组</param> /// <returns>指定类型的值列表</returns> public List<T> ReadNodes<T>(string[] nodeIds) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.ReadNodes<T>(nodeIds); } #endregion #region 写入操作 /// <summary> /// 写入单个节点的值 /// </summary> /// <typeparam name="T">值的类型</typeparam> /// <param name="nodeId">节点ID</param> /// <param name="value">要写入的值</param> /// <returns>写入是否成功</returns> public bool WriteNode<T>(string nodeId, T value) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); return _opcUaClient.WriteNode(nodeId, value); } /// <summary> /// 批量写入多个节点的值 /// </summary> /// <param name="nodeIds">节点ID数组</param> /// <param name="values">要写入的值数组</param> /// <returns>写入是否成功</returns> public bool WriteNodes(string[] nodeIds, object[] values) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); if (nodeIds.Length != values.Length) throw new ArgumentException("节点ID数组和值数组的长度必须相同"); return _opcUaClient.WriteNodes(nodeIds, values); } #endregion #region 订阅操作 /// <summary> /// 添加节点订阅 /// </summary> /// <param name="key">订阅的唯一标识符</param> /// <param name="nodeId">要订阅的节点ID</param> /// <param name="callback">数据变化时的回调</param> /// <returns>订阅是否成功</returns> public bool AddSubscription(string key, string nodeId, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback) { return AddSubscription(key, nodeId, callback, 1000, 1000); } /// <summary> /// 添加多个节点的订阅 /// </summary> /// <param name="key">订阅的唯一标识符</param> /// <param name="nodeIds">要订阅的节点ID数组</param> /// <param name="callback">数据变化时的回调</param> /// <returns>订阅是否成功</returns> public bool AddSubscription(string key, string[] nodeIds, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback) { return AddSubscription(key, nodeIds, callback, 1000, 1000); } /// <summary> /// 移除订阅 /// </summary> /// <param name="key">订阅的唯一标识符</param> /// <returns>移除是否成功</returns> public bool RemoveSubscription(string key) { if (!IsConnected) return false; try { // 从备份的订阅信息中移除 _subscriptionInfos.Remove(key); if (_subscriptions.TryGetValue(key, out Subscription subscription)) { subscription.Delete(true); _opcUaClient.Session.RemoveSubscription(subscription); _subscriptions.Remove(key); return true; } return false; } catch (Exception ex) { Console.WriteLine($"移除订阅时发生错误: {ex.Message}"); return false; } } /// <summary> /// 获取当前活跃的订阅数量 /// </summary> /// <returns>订阅数量</returns> public int GetActiveSubscriptionCount() { return _subscriptions.Count; } /// <summary> /// 获取已保存的订阅配置数量 /// </summary> /// <returns>订阅配置数量</returns> public int GetSavedSubscriptionCount() { return _subscriptionInfos.Count; } /// <summary> /// 获取所有订阅的Key列表 /// </summary> /// <returns>订阅Key列表</returns> public List<string> GetSubscriptionKeys() { return _subscriptionInfos.Keys.ToList(); } /// <summary> /// 暂停所有订阅(禁用发布但保留配置) /// </summary> public void PauseAllSubscriptions() { if (!IsConnected) return; try { foreach (var subscription in _subscriptions.Values) { subscription.PublishingEnabled = false; subscription.ApplyChanges(); } } catch (Exception ex) { Console.WriteLine($"暂停所有订阅时发生错误: {ex.Message}"); } } /// <summary> /// 恢复所有已暂停的订阅 /// </summary> public void ResumeAllSubscriptions() { if (!IsConnected) return; try { foreach (var subscription in _subscriptions.Values) { subscription.PublishingEnabled = true; subscription.ApplyChanges(); } } catch (Exception ex) { Console.WriteLine($"恢复所有暂停的订阅时发生错误: {ex.Message}"); } } /// <summary> /// 移除所有订阅 /// </summary> public void RemoveAllSubscriptions() { if (!IsConnected) return; try { // 清空备份的订阅信息 _subscriptionInfos.Clear(); foreach (var subscription in _subscriptions.Values) { subscription.Delete(true); _opcUaClient.Session.RemoveSubscription(subscription); } _subscriptions.Clear(); } catch (Exception ex) { Console.WriteLine($"移除所有订阅时发生错误: {ex.Message}"); } } /// <summary> /// 添加节点订阅(带自定义参数) /// </summary> /// <param name="key">订阅的唯一标识符</param> /// <param name="nodeId">要订阅的节点ID</param> /// <param name="callback">数据变化时的回调</param> /// <param name="publishingInterval">发布间隔(毫秒)</param> /// <param name="samplingInterval">采样间隔(毫秒)</param> /// <returns>订阅是否成功</returns> public bool AddSubscription(string key, string nodeId, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback, int publishingInterval = 1000, int samplingInterval = 1000) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); try { // 保存订阅信息,用于断线重连后恢复 var subInfo = new SubscriptionInfo { Key = key, NodeIds = new[] { nodeId }, Callback = callback, PublishingInterval = publishingInterval, SamplingInterval = samplingInterval }; _subscriptionInfos[key] = subInfo; // 创建一个新的订阅 Subscription subscription = new Subscription(_opcUaClient.Session.DefaultSubscription); subscription.PublishingEnabled = true; subscription.PublishingInterval = publishingInterval; subscription.KeepAliveCount = 10; subscription.LifetimeCount = 30; subscription.Priority = 100; // 创建监控项 MonitoredItem monitoredItem = new MonitoredItem { StartNodeId = new NodeId(nodeId), AttributeId = Attributes.Value, DisplayName = key, SamplingInterval = samplingInterval }; // 设置回调 monitoredItem.Notification += (item, args) => callback(key, item, args); // 添加监控项到订阅 subscription.AddItem(monitoredItem); // 在客户端中创建订阅 _opcUaClient.Session.AddSubscription(subscription); subscription.Create(); // 添加到字典 _subscriptions[key] = subscription; return true; } catch (Exception ex) { Console.WriteLine($"添加订阅时发生错误: {ex.Message}"); return false; } } /// <summary> /// 添加多个节点的订阅(带自定义参数) /// </summary> /// <param name="key">订阅的唯一标识符</param> /// <param name="nodeIds">要订阅的节点ID数组</param> /// <param name="callback">数据变化时的回调</param> /// <param name="publishingInterval">发布间隔(毫秒)</param> /// <param name="samplingInterval">采样间隔(毫秒)</param> /// <returns>订阅是否成功</returns> public bool AddSubscription(string key, string[] nodeIds, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback, int publishingInterval = 1000, int samplingInterval = 1000) { if (!IsConnected) throw new InvalidOperationException("未连接到OPC UA服务器"); try { // 保存订阅信息,用于断线重连后恢复 var subInfo = new SubscriptionInfo { Key = key, NodeIds = nodeIds, Callback = callback, PublishingInterval = publishingInterval, SamplingInterval = samplingInterval }; _subscriptionInfos[key] = subInfo; // 创建一个新的订阅 Subscription subscription = new Subscription(_opcUaClient.Session.DefaultSubscription); subscription.PublishingEnabled = true; subscription.PublishingInterval = publishingInterval; subscription.KeepAliveCount = 10; subscription.LifetimeCount = 30; subscription.Priority = 100; // 遍历节点ID,为每个节点创建监控项 for (int i = 0; i < nodeIds.Length; i++) { MonitoredItem monitoredItem = new MonitoredItem { StartNodeId = new NodeId(nodeIds[i]), AttributeId = Attributes.Value, DisplayName = $"{key}_{i}", SamplingInterval = samplingInterval }; // 设置回调 monitoredItem.Notification += (item, args) => callback(key, item, args); // 添加监控项到订阅 subscription.AddItem(monitoredItem); } // 在客户端中创建订阅 _opcUaClient.Session.AddSubscription(subscription); subscription.Create(); // 添加到字典 _subscriptions[key] = subscription; return true; } catch (Exception ex) { Console.WriteLine($"添加多节点订阅时发生错误: {ex.Message}"); return false; } } #endregion #region IDisposable支持 private bool disposedValue = false; // 要检测冗余调用 /// <summary> /// 释放资源 /// </summary> /// <param name="disposing">是否在释放托管资源</param> protected virtual void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { // 释放托管状态(托管对象) StopReconnection(); RemoveAllSubscriptions(); Disconnect(); _opcUaClient = null; } // 释放未托管的资源(未托管的对象)并在以下内容中替代终结器 // 将大型字段设置为 null disposedValue = true; } } /// <summary> /// 实现IDisposable接口的Dispose方法 /// </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } #endregion #region 断线重连 /// <summary> /// 断线重连配置 /// </summary> public class ReconnectionConfig { /// <summary> /// 是否启用自动重连 /// </summary> public bool EnableAutoReconnect { get; set; } = true; /// <summary> /// 重连尝试间隔(毫秒) /// </summary> public int ReconnectInterval { get; set; } = 5000; /// <summary> /// 最大重连尝试次数,设为0表示无限次尝试 /// </summary> public int MaxReconnectAttempts { get; set; } = 0; /// <summary> /// 是否在重连后恢复订阅 /// </summary> public bool RestoreSubscriptionsAfterReconnect { get; set; } = true; } private ReconnectionConfig _reconnectionConfig = new ReconnectionConfig(); private System.Threading.Timer _reconnectionTimer; private int _reconnectAttempts = 0; private string _lastUsername; private string _lastPassword; private bool _isReconnecting = false; // 存储订阅信息,用于断线重连后恢复 private class SubscriptionInfo { public string Key { get; set; } public string[] NodeIds { get; set; } public Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> Callback { get; set; } public int PublishingInterval { get; set; } = 1000; public int SamplingInterval { get; set; } = 1000; } private Dictionary<string, SubscriptionInfo> _subscriptionInfos = new Dictionary<string, SubscriptionInfo>(); /// <summary> /// 配置断线重连参数 /// </summary> /// <param name="config">重连配置</param> public void ConfigureReconnection(ReconnectionConfig config) { _reconnectionConfig = config ?? new ReconnectionConfig(); } /// <summary> /// 启动断线重连 /// </summary> private void StartReconnection() { if (!_reconnectionConfig.EnableAutoReconnect || _isReconnecting) return; _isReconnecting = true; _reconnectAttempts = 0; // 停止已有的重连定时器 _reconnectionTimer?.Dispose(); // 创建新的重连定时器 _reconnectionTimer = new System.Threading.Timer( async (state) => await ReconnectAsync(), null, _reconnectionConfig.ReconnectInterval, _reconnectionConfig.ReconnectInterval); OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = "连接断开,开始尝试重连..." }); } /// <summary> /// 尝试重新连接 /// </summary> private async Task ReconnectAsync() { // 如果已经连接或已达到最大尝试次数,则停止重连 if (IsConnected || (_reconnectionConfig.MaxReconnectAttempts > 0 && _reconnectAttempts >= _reconnectionConfig.MaxReconnectAttempts)) { StopReconnection(); return; } _reconnectAttempts++; try { bool success; // 使用上次连接的凭据进行重连 if (!string.IsNullOrEmpty(_lastUsername)) { success = await ConnectServerAsync(ServerUrl, _lastUsername, _lastPassword); } else { success = await ConnectServerAsync(ServerUrl); } if (success) { OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = true, Message = $"重连成功,共尝试 {_reconnectAttempts} 次" }); // 重连成功后恢复订阅 if (_reconnectionConfig.RestoreSubscriptionsAfterReconnect) { await RestoreSubscriptionsAsync(); } StopReconnection(); } else if (_reconnectionConfig.MaxReconnectAttempts > 0 && _reconnectAttempts >= _reconnectionConfig.MaxReconnectAttempts) { OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = $"重连失败,已达到最大尝试次数: {_reconnectionConfig.MaxReconnectAttempts}" }); StopReconnection(); } else { OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = $"重连尝试 {_reconnectAttempts} 失败,将在 {_reconnectionConfig.ReconnectInterval / 1000} 秒后重试" }); } } catch (Exception ex) { OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = false, Message = $"重连尝试 {_reconnectAttempts} 出错: {ex.Message}" }); } } /// <summary> /// 恢复所有订阅 /// </summary> private async Task RestoreSubscriptionsAsync() { try { // 清空当前的订阅集合,因为断线后原有的订阅对象已经失效 _subscriptions.Clear(); int successCount = 0; int failureCount = 0; // 从备份的订阅信息中恢复所有订阅 foreach (var info in _subscriptionInfos.Values) { try { bool success; if (info.NodeIds.Length == 1) { // 单节点订阅 success = AddSubscription( info.Key, info.NodeIds[0], info.Callback, info.PublishingInterval, info.SamplingInterval); } else { // 多节点订阅 success = AddSubscription( info.Key, info.NodeIds, info.Callback, info.PublishingInterval, info.SamplingInterval); } if (success) successCount++; else failureCount++; } catch (Exception ex) { failureCount++; Console.WriteLine($"恢复订阅 {info.Key} 时出错: {ex.Message}"); } } OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = true, Message = $"已恢复 {successCount} 个订阅,{failureCount} 个恢复失败,共 {_subscriptionInfos.Count} 个" }); } catch (Exception ex) { OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs { IsConnected = true, Message = $"恢复订阅时发生错误: {ex.Message}" }); } } /// <summary> /// 停止重连过程 /// </summary> private void StopReconnection() { _reconnectionTimer?.Dispose(); _reconnectionTimer = null; _isReconnecting = false; } #endregion } /// <summary> /// OPC状态事件参数 /// </summary> public class OpcStatusEventArgs : EventArgs { /// <summary> /// 是否已连接 /// </summary> public bool IsConnected { get; set; } /// <summary> /// 状态消息 /// </summary> public string Message { get; set; } } }

测试程序实现

测试程序OpcUaServiceTest演示了如何使用OpcUaService类进行各种OPC UA操作。

基础连接测试

C#
private static async Task TestConnection(OpcUaService opcService) { Console.WriteLine("\n测试连接到OPC UA服务器..."); try { // 尝试匿名连接 bool connected = await opcService.ConnectServerAsync(ServerUrl); if (connected) { Console.WriteLine($"成功连接到服务器: {ServerUrl}"); } else { // 匿名连接失败,尝试用户名密码连接 Console.WriteLine("连接失败,尝试使用用户名和密码连接..."); // 获取用户输入的凭据 Console.Write("请输入用户名: "); string username = Console.ReadLine(); Console.Write("请输入密码: "); string password = Console.ReadLine(); connected = await opcService.ConnectServerAsync(ServerUrl, username, password); // 输出连接结果 } } catch (Exception ex) { Console.WriteLine($"连接测试异常: {ex.Message}"); } }

读取操作测试

C#
private static async Task TestReadOperations(OpcUaService opcService) { Console.WriteLine("\n测试读取操作..."); try { // 测试单个节点读取 Console.WriteLine($"读取节点: {TestNodeId}"); DataValue value = opcService.ReadNode(TestNodeId); Console.WriteLine($"节点值: {value.Value}, 状态: {value.StatusCode}, 时间戳: {value.SourceTimestamp}"); // 测试泛型读取 Console.WriteLine($"\n泛型读取节点: {TestNodeId}"); var genericValue = opcService.ReadNode<object>(TestNodeId); Console.WriteLine($"泛型读取值: {genericValue}"); // 测试异步读取 Console.WriteLine($"\n异步读取节点: {TestNodeId}"); var asyncValue = await opcService.ReadNodeAsync<object>(TestNodeId); Console.WriteLine($"异步读取值: {asyncValue}"); // 测试批量读取 Console.WriteLine("\n批量读取多个节点"); string[] nodeIds = new string[] { TestNodeId, TestNodeId2 }; List<DataValue> values = opcService.ReadNodes(nodeIds); for (int i = 0; i < nodeIds.Length; i++) { Console.WriteLine($"节点 {nodeIds[i]} 值: {values[i].Value}"); } } catch (Exception ex) { Console.WriteLine($"读取操作测试异常: {ex.Message}"); } }

写入操作测试

C#
private static async Task TestWriteOperations(OpcUaService opcService) { Console.WriteLine("\n测试写入操作..."); try { // 先读取当前值 var currentValue = opcService.ReadNode<object>(TestNodeId); Console.WriteLine($"节点 {TestNodeId} 当前值: {currentValue}"); // 根据数据类型写入不同的值 bool writeSuccess = false; if (currentValue is bool boolValue) { // 布尔值取反 writeSuccess = opcService.WriteNode(TestNodeId, !boolValue); // 输出信息... } else if (currentValue is int intValue) { // 整数值加1 writeSuccess = opcService.WriteNode(TestNodeId, intValue + 1); // 输出信息... } // 其他类型处理... if (writeSuccess) { // 再次读取验证写入 await Task.Delay(500); var newValue = opcService.ReadNode<object>(TestNodeId); Console.WriteLine($"写入后节点 {TestNodeId} 的新值: {newValue}"); } // 测试批量写入 // 实现代码... } catch (Exception ex) { Console.WriteLine($"写入操作测试异常: {ex.Message}"); } }

订阅测试

C#
private static async Task TestSubscription(OpcUaService opcService) { Console.WriteLine("\n测试数据变更订阅..."); try { // 创建事件等待句柄 var dataChangedEvent = new ManualResetEventSlim(false); // 添加订阅 string subscriptionKey = "TestSubscription"; bool subscribed = opcService.AddSubscription(subscriptionKey, TestNodeId, (key, item, args) => { if (args.NotificationValue is MonitoredItemNotification notification) { Console.WriteLine($"数据变更通知 - 键: {key}, 节点: {item.DisplayName}, " + $"值: {notification.Value.Value}, 时间: {notification.Value.SourceTimestamp}"); dataChangedEvent.Set(); } }); if (subscribed) { // 通过修改值触发订阅通知 // 实现代码... // 测试多节点订阅 Console.WriteLine("\n测试多节点订阅..."); string multiSubscriptionKey = "MultiNodeSubscription"; string[] nodeIds = new string[] { TestNodeId, TestNodeId2 }; bool multiSubscribed = opcService.AddSubscription(multiSubscriptionKey, nodeIds, (key, item, args) => { // 回调实现... }); // 等待通知和清理订阅 // 实现代码... } } catch (Exception ex) { Console.WriteLine($"订阅测试异常: {ex.Message}"); } }

断线重连机制测试

测试程序中展示了如何配置断线重连功能:

C#
// 在Main方法中 opcService.ConfigureReconnection(new ReconnectionConfig { EnableAutoReconnect = true, ReconnectInterval = 3000, // 3秒 MaxReconnectAttempts = 0, // 无限尝试 RestoreSubscriptionsAfterReconnect = true // 重连后恢复订阅 }); // 注册状态变化事件 opcService.OpcStatusChanged += (sender, e) => { Console.WriteLine($"OPC UA 状态变化: {e.Message}, 连接状态: {e.IsConnected}"); };

完整测试代码

C#
using System; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; using Opc.Ua; using Opc.Ua.Client; using Rick.Core.Services.OpcUa; using static Rick.Core.Services.OpcUa.OpcUaService; namespace OpcUaServiceTest { class Program { // OPC UA 服务器地址,请根据实际环境修改 private const string ServerUrl = "opc.tcp://127.0.0.1:49320"; // 测试节点ID,请根据您的OPC UA服务器修改 private const string TestNodeId = "ns=2;s=LMES.W1.Test1"; private const string TestNodeId2 = "ns=2;s=LMES.W1.Test2"; static async Task Main(string[] args) { Console.WriteLine("OPC UA 服务测试程序"); Console.WriteLine("==================="); // 创建OPC UA服务实例 using (OpcUaService opcService = new OpcUaService()) { opcService.ConfigureReconnection(new ReconnectionConfig { EnableAutoReconnect = true, ReconnectInterval = 3000, // 3 seconds MaxReconnectAttempts = 0, // Try 10 times then give up RestoreSubscriptionsAfterReconnect = true // 重连后恢复订阅 }); // 注册状态变化事件 opcService.OpcStatusChanged += (sender, e) => { Console.WriteLine($"OPC UA 状态变化: {e.Message}, 连接状态: {e.IsConnected}"); }; // 测试连接 await TestConnection(opcService); if (opcService.IsConnected) { // 测试读取操作 await TestReadOperations(opcService); // 测试写入操作 await TestWriteOperations(opcService); // 测试订阅操作 await TestSubscription(opcService); Console.WriteLine("\n按任意键退出程序..."); Console.ReadKey(); } } } private static async Task TestConnection(OpcUaService opcService) { Console.WriteLine("\n测试连接到OPC UA服务器..."); try { // 尝试匿名连接 bool connected = await opcService.ConnectServerAsync(ServerUrl); if (connected) { Console.WriteLine($"成功连接到服务器: {ServerUrl}"); } else { Console.WriteLine("连接失败,尝试使用用户名和密码连接..."); // 如果匿名连接失败,尝试使用用户名和密码连接 Console.Write("请输入用户名: "); string username = Console.ReadLine(); Console.Write("请输入密码: "); string password = Console.ReadLine(); connected = await opcService.ConnectServerAsync(ServerUrl, username, password); if (connected) { Console.WriteLine($"使用凭据成功连接到服务器: {ServerUrl}"); } else { Console.WriteLine("连接失败,请检查服务器地址和凭据"); } } } catch (Exception ex) { Console.WriteLine($"连接测试异常: {ex.Message}"); } } private static async Task TestReadOperations(OpcUaService opcService) { Console.WriteLine("\n测试读取操作..."); try { // 测试单个节点读取 Console.WriteLine($"读取节点: {TestNodeId}"); DataValue value = opcService.ReadNode(TestNodeId); Console.WriteLine($"节点值: {value.Value}, 状态: {value.StatusCode}, 时间戳: {value.SourceTimestamp}"); // 测试泛型读取 Console.WriteLine($"\n泛型读取节点: {TestNodeId}"); var genericValue = opcService.ReadNode<object>(TestNodeId); Console.WriteLine($"泛型读取值: {genericValue}"); // 测试异步读取 Console.WriteLine($"\n异步读取节点: {TestNodeId}"); var asyncValue = await opcService.ReadNodeAsync<object>(TestNodeId); Console.WriteLine($"异步读取值: {asyncValue}"); // 测试批量读取 Console.WriteLine("\n批量读取多个节点"); string[] nodeIds = new string[] { TestNodeId, TestNodeId2 }; List<DataValue> values = opcService.ReadNodes(nodeIds); for (int i = 0; i < nodeIds.Length; i++) { Console.WriteLine($"节点 {nodeIds[i]} 值: {values[i].Value}"); } } catch (Exception ex) { Console.WriteLine($"读取操作测试异常: {ex.Message}"); } } private static async Task TestWriteOperations(OpcUaService opcService) { Console.WriteLine("\n测试写入操作..."); try { // 先读取当前值 var currentValue = opcService.ReadNode<object>(TestNodeId); Console.WriteLine($"节点 {TestNodeId} 当前值: {currentValue}"); // 根据当前值类型尝试写入新值 bool writeSuccess = false; if (currentValue is bool boolValue) { // 布尔值取反 writeSuccess = opcService.WriteNode(TestNodeId, !boolValue); Console.WriteLine($"写入布尔值 {!boolValue} 到节点 {TestNodeId}"); } else if (currentValue is int intValue) { // 整数值加1 writeSuccess = opcService.WriteNode(TestNodeId, intValue + 1); Console.WriteLine($"写入整数值 {intValue + 1} 到节点 {TestNodeId}"); } else if (currentValue is double doubleValue) { // 浮点值加1.5 writeSuccess = opcService.WriteNode(TestNodeId, doubleValue + 1.5); Console.WriteLine($"写入浮点值 {doubleValue + 1.5} 到节点 {TestNodeId}"); } else if (currentValue is string) { // 字符串值添加时间戳 string newValue = $"Test_{DateTime.Now:yyyyMMdd_HHmmss}"; writeSuccess = opcService.WriteNode(TestNodeId, newValue); Console.WriteLine($"写入字符串值 \"{newValue}\" 到节点 {TestNodeId}"); } else { Console.WriteLine($"不支持的数据类型: {currentValue?.GetType().Name ?? "null"}"); } if (writeSuccess) { Console.WriteLine("写入成功"); // 再次读取以验证写入结果 await Task.Delay(500); // 等待一会儿确保写入完成 var newValue = opcService.ReadNode<object>(TestNodeId); Console.WriteLine($"写入后节点 {TestNodeId} 的新值: {newValue}"); } else { Console.WriteLine("写入失败"); } // 测试批量写入 if (currentValue != null) { Console.WriteLine("\n测试批量写入"); string[] nodeIds = new string[] { TestNodeId, TestNodeId2 }; object[] values = new object[] { currentValue, currentValue }; bool batchWriteSuccess = opcService.WriteNodes(nodeIds, values); Console.WriteLine($"批量写入结果: {(batchWriteSuccess ? "成功" : "失败")}"); } } catch (Exception ex) { Console.WriteLine($"写入操作测试异常: {ex.Message}"); } } private static async Task TestSubscription(OpcUaService opcService) { Console.WriteLine("\n测试数据变更订阅..."); try { // 创建一个事件重置事件,用于等待数据变更 var dataChangedEvent = new ManualResetEventSlim(false); // 添加单个节点订阅 string subscriptionKey = "TestSubscription"; bool subscribed = opcService.AddSubscription(subscriptionKey, TestNodeId, (key, item, args) => { if (args.NotificationValue is MonitoredItemNotification notification) { Console.WriteLine($"数据变更通知 - 键: {key}, 节点: {item.DisplayName}, 值: {notification.Value.Value}, 时间: {notification.Value.SourceTimestamp}"); dataChangedEvent.Set(); } }); if (subscribed) { Console.WriteLine($"成功订阅节点: {TestNodeId}"); Console.WriteLine("\n尝试修改节点值以触发订阅通知..."); // 读取当前值 var currentValue = opcService.ReadNode<object>(TestNodeId); // 尝试写入新值以触发订阅通知 if (currentValue is bool boolValue) { opcService.WriteNode(TestNodeId, !boolValue); } else if (currentValue is int intValue) { opcService.WriteNode(TestNodeId, intValue + 1); } else if (currentValue is double doubleValue) { opcService.WriteNode(TestNodeId, doubleValue + 1.0); } else if (currentValue is string) { opcService.WriteNode(TestNodeId, $"Changed_{DateTime.Now:HHmmss}"); } // 等待数据变更通知或超时 Console.WriteLine("等待数据变更通知..."); bool notificationReceived = dataChangedEvent.Wait(5000); if (!notificationReceived) { Console.WriteLine("未收到数据变更通知,可能是值未实际变化或服务器未发送通知"); } // 测试移除订阅 Console.WriteLine("\n测试移除订阅..."); bool unsubscribed = opcService.RemoveSubscription(subscriptionKey); Console.WriteLine($"移除订阅结果: {(unsubscribed ? "成功" : "失败")}"); // 测试多节点订阅 Console.WriteLine("\n测试多节点订阅..."); string multiSubscriptionKey = "MultiNodeSubscription"; string[] nodeIds = new string[] { TestNodeId, TestNodeId2 }; bool multiSubscribed = opcService.AddSubscription(multiSubscriptionKey, nodeIds, (key, item, args) => { if (args.NotificationValue is MonitoredItemNotification notification) { Console.WriteLine($"多节点订阅通知 - 键: {key}, 节点: {item.DisplayName}, 值: {notification.Value.Value}"); } }); Console.WriteLine($"多节点订阅结果: {(multiSubscribed ? "成功" : "失败")}"); // 等待一段时间以观察可能的通知 await Task.Delay(3000); // 移除所有订阅 Console.WriteLine("\n移除所有订阅..."); opcService.RemoveAllSubscriptions(); Console.WriteLine("所有订阅已移除"); } else { Console.WriteLine($"订阅节点失败: {TestNodeId}"); } } catch (Exception ex) { Console.WriteLine($"订阅测试异常: {ex.Message}"); } } } }

image.png

工业应用场景

此OPC UA客户端库适用于多种工业场景:

  1. 制造执行系统(MES)集成:从PLC和控制系统采集数据,实现生产监控和质量跟踪
  2. 设备健康监控:通过订阅机制实时监测设备状态和预测性维护
  3. 工业数据采集与分析:收集工业过程数据用于分析和优化
  4. 设备控制:通过写入操作远程控制工业设备和执行器
  5. 历史数据存储:将OPC UA数据存入数据库,用于报表和趋势分析

结论

本文介绍的OpcUaService类提供了一个稳健的OPC UA通信框架,可用于开发各种工业物联网和自动化应用。通过封装复杂的OPC UA操作细节,提供简洁易用的API,同时实现了关键的可靠性和容错机制。

工业物联网通信需要兼顾性能、可靠性和安全性。本文展示的解决方案通过断线重连、订阅恢复和异常处理等机制,满足了工业环境下的高可靠性要求,是构建现代工业自动化系统的理想基础。

随着工业4.0的发展,基于OPC UA的通信将继续扮演关键角色,为工业设备互联互通提供稳固基础。掌握这一技术,将有助于工程师们开发出更加智能、高效的工业自动化解决方案。

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!