工业4.0和物联网时代,OPC UA (OPC Unified Architecture) 已成为工业自动化和设备通信的标准协议。本文将深入探讨如何在.NET环境中实现可靠的OPC UA通信,通过一个功能完备的OPC UA客户端库及其测试程序,展示工业环境下数据采集、监控和控制的实现方法。
OPC UA是一种独立于平台的、面向服务的架构,用于工业自动化领域的数据交换。它具有以下特点:
我们设计的OpcUaService类是对OPC UA客户端操作的封装,提供了以下核心功能:
C#// 匿名连接方式
public async Task<bool> ConnectServerAsync(string serverUrl)
{
try
{
ServerUrl = serverUrl;
_lastUsername = null;
_lastPassword = null;
_opcUaClient.UserIdentity = new UserIdentity(new AnonymousIdentityToken());
await _opcUaClient.ConnectServer(serverUrl);
IsConnected = true;
return true;
}
catch (Exception ex)
{
IsConnected = false;
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = $"连接失败: {ex.Message}"
});
return false;
}
}
// 用户名密码连接方式
public async Task<bool> ConnectServerAsync(string serverUrl, string username, string password)
{
// 实现代码...
}
读取操作支持多种灵活的方式:
C#// 读取单个节点
public DataValue ReadNode(string nodeId)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.ReadNode(new NodeId(nodeId));
}
// 泛型读取方法
public T ReadNode<T>(string nodeId)
{
// 实现代码...
}
// 异步读取
public async Task<T> ReadNodeAsync<T>(string nodeId)
{
// 实现代码...
}
// 批量读取
public List<DataValue> ReadNodes(string[] nodeIds)
{
// 实现代码...
}
C#// 写入单个节点
public bool WriteNode<T>(string nodeId, T value)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.WriteNode(nodeId, value);
}
// 批量写入
public bool WriteNodes(string[] nodeIds, object[] values)
{
// 实现代码...
}
订阅功能是实时监控系统的核心:
C#// 添加节点订阅
public bool AddSubscription(string key, string nodeId,
Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback)
{
return AddSubscription(key, nodeId, callback, 1000, 1000);
}
// 详细订阅实现
public bool AddSubscription(string key, string nodeId,
Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback,
int publishingInterval = 1000, int samplingInterval = 1000)
{
// 详细实现代码...
// 创建订阅和监控项
Subscription subscription = new Subscription(_opcUaClient.Session.DefaultSubscription);
subscription.PublishingEnabled = true;
subscription.PublishingInterval = publishingInterval;
MonitoredItem monitoredItem = new MonitoredItem
{
StartNodeId = new NodeId(nodeId),
AttributeId = Attributes.Value,
DisplayName = key,
SamplingInterval = samplingInterval
};
// 设置回调
monitoredItem.Notification += (item, args) => callback(key, item, args);
// 实现剩余代码...
}
工业环境中网络不稳定是常见问题,自动重连机制至关重要:
C#// 重连配置类
public class ReconnectionConfig
{
public bool EnableAutoReconnect { get; set; } = true;
public int ReconnectInterval { get; set; } = 5000;
public int MaxReconnectAttempts { get; set; } = 0;
public bool RestoreSubscriptionsAfterReconnect { get; set; } = true;
}
// 配置重连参数
public void ConfigureReconnection(ReconnectionConfig config)
{
_reconnectionConfig = config ?? new ReconnectionConfig();
}
// 重连实现
private async Task ReconnectAsync()
{
// 检查重连条件
if (IsConnected ||
(_reconnectionConfig.MaxReconnectAttempts > 0 &&
_reconnectAttempts >= _reconnectionConfig.MaxReconnectAttempts))
{
StopReconnection();
return;
}
_reconnectAttempts++;
try
{
// 尝试使用原有凭据重连
bool success;
if (!string.IsNullOrEmpty(_lastUsername))
{
success = await ConnectServerAsync(ServerUrl, _lastUsername, _lastPassword);
}
else
{
success = await ConnectServerAsync(ServerUrl);
}
if (success)
{
// 重连成功后恢复订阅
if (_reconnectionConfig.RestoreSubscriptionsAfterReconnect)
{
await RestoreSubscriptionsAsync();
}
StopReconnection();
}
// 其他重连逻辑...
}
catch (Exception ex)
{
// 异常处理...
}
}
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Opc.Ua.Client;
using Opc.Ua;
using OpcUaHelper;
namespace Rick.Core.Services.OpcUa
{
/// <summary>
/// 通用OPC UA操作类,封装OPC UA基本操作
/// </summary>
public class OpcUaService: IDisposable
{
#region 变量与属性
/// <summary>
/// OPC UA客户端实例
/// </summary>
private OpcUaClient _opcUaClient;
/// <summary>
/// 订阅字典,用于管理订阅
/// </summary>
private Dictionary<string, Subscription> _subscriptions = new Dictionary<string, Subscription>();
/// <summary>
/// 连接状态
/// </summary>
public bool IsConnected { get; private set; } = false;
/// <summary>
/// 服务器地址
/// </summary>
public string ServerUrl { get; private set; }
/// <summary>
/// OPC状态变化事件
/// </summary>
public event EventHandler<OpcStatusEventArgs> OpcStatusChanged;
#endregion
#region 构造函数和初始化
/// <summary>
/// 构造函数
/// </summary>
public OpcUaService()
{
_opcUaClient = new OpcUaClient();
_opcUaClient.OpcStatusChange += OpcUaClient_OpcStatusChange;
}
/// <summary>
/// OPC UA状态改变事件处理
/// </summary>
private void OpcUaClient_OpcStatusChange(object sender, OpcUaStatusEventArgs e)
{
bool previousState = IsConnected;
IsConnected = !e.Error;
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = IsConnected,
Message = e.Error ? "连接断开" : "连接成功"
});
// 如果连接状态从已连接变为断开,则启动重连
if (previousState && !IsConnected)
{
StartReconnection();
}
}
#endregion
#region 连接和断开
/// <summary>
/// 连接到OPC UA服务器(匿名方式)
/// </summary>
public async Task<bool> ConnectServerAsync(string serverUrl)
{
try
{
ServerUrl = serverUrl;
_lastUsername = null;
_lastPassword = null;
_opcUaClient.UserIdentity = new UserIdentity(new AnonymousIdentityToken());
await _opcUaClient.ConnectServer(serverUrl);
IsConnected = true;
return true;
}
catch (Exception ex)
{
IsConnected = false;
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = $"连接失败: {ex.Message}"
});
return false;
}
}
/// <summary>
/// 使用用户名密码连接到OPC UA服务器
/// </summary>
public async Task<bool> ConnectServerAsync(string serverUrl, string username, string password)
{
try
{
ServerUrl = serverUrl;
_lastUsername = username;
_lastPassword = password;
_opcUaClient.UserIdentity = new UserIdentity(username, password);
await _opcUaClient.ConnectServer(serverUrl);
IsConnected = true;
return true;
}
catch (Exception ex)
{
IsConnected = false;
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = $"连接失败: {ex.Message}"
});
return false;
}
}
/// <summary>
/// 断开与服务器的连接
/// </summary>
public void Disconnect()
{
try
{
StopReconnection();
if (_opcUaClient != null)
{
_opcUaClient.Disconnect();
}
IsConnected = false;
}
catch (Exception ex)
{
Console.WriteLine($"断开连接时发生错误: {ex.Message}");
}
}
#endregion
#region 读取操作
/// <summary>
/// 读取单个节点的值
/// </summary>
/// <param name="nodeId">节点ID,例如:"ns=2;s=Channel1.Device1.Tag1"</param>
/// <returns>数据值</returns>
public DataValue ReadNode(string nodeId)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.ReadNode(new NodeId(nodeId));
}
/// <summary>
/// 读取单个节点的值并转换为指定类型
/// </summary>
/// <typeparam name="T">值的类型</typeparam>
/// <param name="nodeId">节点ID</param>
/// <returns>指定类型的值</returns>
public T ReadNode<T>(string nodeId)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.ReadNode<T>(nodeId);
}
/// <summary>
/// 异步读取节点值
/// </summary>
/// <typeparam name="T">值的类型</typeparam>
/// <param name="nodeId">节点ID</param>
/// <returns>指定类型的值</returns>
public async Task<T> ReadNodeAsync<T>(string nodeId)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return await _opcUaClient.ReadNodeAsync<T>(nodeId);
}
/// <summary>
/// 批量读取多个节点的值
/// </summary>
/// <param name="nodeIds">节点ID列表</param>
/// <returns>数据值列表</returns>
public List<DataValue> ReadNodes(List<NodeId> nodeIds)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.ReadNodes(nodeIds.ToArray());
}
/// <summary>
/// 批量读取多个字符串节点ID的值
/// </summary>
/// <param name="nodeIds">字符串节点ID数组</param>
/// <returns>数据值列表</returns>
public List<DataValue> ReadNodes(string[] nodeIds)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
List<NodeId> nodes = new List<NodeId>();
foreach (var nodeId in nodeIds)
{
nodes.Add(new NodeId(nodeId));
}
return _opcUaClient.ReadNodes(nodes.ToArray());
}
/// <summary>
/// 批量读取相同类型的多个节点的值
/// </summary>
/// <typeparam name="T">值的类型</typeparam>
/// <param name="nodeIds">节点ID数组</param>
/// <returns>指定类型的值列表</returns>
public List<T> ReadNodes<T>(string[] nodeIds)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.ReadNodes<T>(nodeIds);
}
#endregion
#region 写入操作
/// <summary>
/// 写入单个节点的值
/// </summary>
/// <typeparam name="T">值的类型</typeparam>
/// <param name="nodeId">节点ID</param>
/// <param name="value">要写入的值</param>
/// <returns>写入是否成功</returns>
public bool WriteNode<T>(string nodeId, T value)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
return _opcUaClient.WriteNode(nodeId, value);
}
/// <summary>
/// 批量写入多个节点的值
/// </summary>
/// <param name="nodeIds">节点ID数组</param>
/// <param name="values">要写入的值数组</param>
/// <returns>写入是否成功</returns>
public bool WriteNodes(string[] nodeIds, object[] values)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
if (nodeIds.Length != values.Length)
throw new ArgumentException("节点ID数组和值数组的长度必须相同");
return _opcUaClient.WriteNodes(nodeIds, values);
}
#endregion
#region 订阅操作
/// <summary>
/// 添加节点订阅
/// </summary>
/// <param name="key">订阅的唯一标识符</param>
/// <param name="nodeId">要订阅的节点ID</param>
/// <param name="callback">数据变化时的回调</param>
/// <returns>订阅是否成功</returns>
public bool AddSubscription(string key, string nodeId, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback)
{
return AddSubscription(key, nodeId, callback, 1000, 1000);
}
/// <summary>
/// 添加多个节点的订阅
/// </summary>
/// <param name="key">订阅的唯一标识符</param>
/// <param name="nodeIds">要订阅的节点ID数组</param>
/// <param name="callback">数据变化时的回调</param>
/// <returns>订阅是否成功</returns>
public bool AddSubscription(string key, string[] nodeIds, Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback)
{
return AddSubscription(key, nodeIds, callback, 1000, 1000);
}
/// <summary>
/// 移除订阅
/// </summary>
/// <param name="key">订阅的唯一标识符</param>
/// <returns>移除是否成功</returns>
public bool RemoveSubscription(string key)
{
if (!IsConnected)
return false;
try
{
// 从备份的订阅信息中移除
_subscriptionInfos.Remove(key);
if (_subscriptions.TryGetValue(key, out Subscription subscription))
{
subscription.Delete(true);
_opcUaClient.Session.RemoveSubscription(subscription);
_subscriptions.Remove(key);
return true;
}
return false;
}
catch (Exception ex)
{
Console.WriteLine($"移除订阅时发生错误: {ex.Message}");
return false;
}
}
/// <summary>
/// 获取当前活跃的订阅数量
/// </summary>
/// <returns>订阅数量</returns>
public int GetActiveSubscriptionCount()
{
return _subscriptions.Count;
}
/// <summary>
/// 获取已保存的订阅配置数量
/// </summary>
/// <returns>订阅配置数量</returns>
public int GetSavedSubscriptionCount()
{
return _subscriptionInfos.Count;
}
/// <summary>
/// 获取所有订阅的Key列表
/// </summary>
/// <returns>订阅Key列表</returns>
public List<string> GetSubscriptionKeys()
{
return _subscriptionInfos.Keys.ToList();
}
/// <summary>
/// 暂停所有订阅(禁用发布但保留配置)
/// </summary>
public void PauseAllSubscriptions()
{
if (!IsConnected)
return;
try
{
foreach (var subscription in _subscriptions.Values)
{
subscription.PublishingEnabled = false;
subscription.ApplyChanges();
}
}
catch (Exception ex)
{
Console.WriteLine($"暂停所有订阅时发生错误: {ex.Message}");
}
}
/// <summary>
/// 恢复所有已暂停的订阅
/// </summary>
public void ResumeAllSubscriptions()
{
if (!IsConnected)
return;
try
{
foreach (var subscription in _subscriptions.Values)
{
subscription.PublishingEnabled = true;
subscription.ApplyChanges();
}
}
catch (Exception ex)
{
Console.WriteLine($"恢复所有暂停的订阅时发生错误: {ex.Message}");
}
}
/// <summary>
/// 移除所有订阅
/// </summary>
public void RemoveAllSubscriptions()
{
if (!IsConnected)
return;
try
{
// 清空备份的订阅信息
_subscriptionInfos.Clear();
foreach (var subscription in _subscriptions.Values)
{
subscription.Delete(true);
_opcUaClient.Session.RemoveSubscription(subscription);
}
_subscriptions.Clear();
}
catch (Exception ex)
{
Console.WriteLine($"移除所有订阅时发生错误: {ex.Message}");
}
}
/// <summary>
/// 添加节点订阅(带自定义参数)
/// </summary>
/// <param name="key">订阅的唯一标识符</param>
/// <param name="nodeId">要订阅的节点ID</param>
/// <param name="callback">数据变化时的回调</param>
/// <param name="publishingInterval">发布间隔(毫秒)</param>
/// <param name="samplingInterval">采样间隔(毫秒)</param>
/// <returns>订阅是否成功</returns>
public bool AddSubscription(string key, string nodeId,
Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback,
int publishingInterval = 1000, int samplingInterval = 1000)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
try
{
// 保存订阅信息,用于断线重连后恢复
var subInfo = new SubscriptionInfo
{
Key = key,
NodeIds = new[] { nodeId },
Callback = callback,
PublishingInterval = publishingInterval,
SamplingInterval = samplingInterval
};
_subscriptionInfos[key] = subInfo;
// 创建一个新的订阅
Subscription subscription = new Subscription(_opcUaClient.Session.DefaultSubscription);
subscription.PublishingEnabled = true;
subscription.PublishingInterval = publishingInterval;
subscription.KeepAliveCount = 10;
subscription.LifetimeCount = 30;
subscription.Priority = 100;
// 创建监控项
MonitoredItem monitoredItem = new MonitoredItem
{
StartNodeId = new NodeId(nodeId),
AttributeId = Attributes.Value,
DisplayName = key,
SamplingInterval = samplingInterval
};
// 设置回调
monitoredItem.Notification += (item, args) => callback(key, item, args);
// 添加监控项到订阅
subscription.AddItem(monitoredItem);
// 在客户端中创建订阅
_opcUaClient.Session.AddSubscription(subscription);
subscription.Create();
// 添加到字典
_subscriptions[key] = subscription;
return true;
}
catch (Exception ex)
{
Console.WriteLine($"添加订阅时发生错误: {ex.Message}");
return false;
}
}
/// <summary>
/// 添加多个节点的订阅(带自定义参数)
/// </summary>
/// <param name="key">订阅的唯一标识符</param>
/// <param name="nodeIds">要订阅的节点ID数组</param>
/// <param name="callback">数据变化时的回调</param>
/// <param name="publishingInterval">发布间隔(毫秒)</param>
/// <param name="samplingInterval">采样间隔(毫秒)</param>
/// <returns>订阅是否成功</returns>
public bool AddSubscription(string key, string[] nodeIds,
Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> callback,
int publishingInterval = 1000, int samplingInterval = 1000)
{
if (!IsConnected)
throw new InvalidOperationException("未连接到OPC UA服务器");
try
{
// 保存订阅信息,用于断线重连后恢复
var subInfo = new SubscriptionInfo
{
Key = key,
NodeIds = nodeIds,
Callback = callback,
PublishingInterval = publishingInterval,
SamplingInterval = samplingInterval
};
_subscriptionInfos[key] = subInfo;
// 创建一个新的订阅
Subscription subscription = new Subscription(_opcUaClient.Session.DefaultSubscription);
subscription.PublishingEnabled = true;
subscription.PublishingInterval = publishingInterval;
subscription.KeepAliveCount = 10;
subscription.LifetimeCount = 30;
subscription.Priority = 100;
// 遍历节点ID,为每个节点创建监控项
for (int i = 0; i < nodeIds.Length; i++)
{
MonitoredItem monitoredItem = new MonitoredItem
{
StartNodeId = new NodeId(nodeIds[i]),
AttributeId = Attributes.Value,
DisplayName = $"{key}_{i}",
SamplingInterval = samplingInterval
};
// 设置回调
monitoredItem.Notification += (item, args) => callback(key, item, args);
// 添加监控项到订阅
subscription.AddItem(monitoredItem);
}
// 在客户端中创建订阅
_opcUaClient.Session.AddSubscription(subscription);
subscription.Create();
// 添加到字典
_subscriptions[key] = subscription;
return true;
}
catch (Exception ex)
{
Console.WriteLine($"添加多节点订阅时发生错误: {ex.Message}");
return false;
}
}
#endregion
#region IDisposable支持
private bool disposedValue = false; // 要检测冗余调用
/// <summary>
/// 释放资源
/// </summary>
/// <param name="disposing">是否在释放托管资源</param>
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
{
if (disposing)
{
// 释放托管状态(托管对象)
StopReconnection();
RemoveAllSubscriptions();
Disconnect();
_opcUaClient = null;
}
// 释放未托管的资源(未托管的对象)并在以下内容中替代终结器
// 将大型字段设置为 null
disposedValue = true;
}
}
/// <summary>
/// 实现IDisposable接口的Dispose方法
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
#endregion
#region 断线重连
/// <summary>
/// 断线重连配置
/// </summary>
public class ReconnectionConfig
{
/// <summary>
/// 是否启用自动重连
/// </summary>
public bool EnableAutoReconnect { get; set; } = true;
/// <summary>
/// 重连尝试间隔(毫秒)
/// </summary>
public int ReconnectInterval { get; set; } = 5000;
/// <summary>
/// 最大重连尝试次数,设为0表示无限次尝试
/// </summary>
public int MaxReconnectAttempts { get; set; } = 0;
/// <summary>
/// 是否在重连后恢复订阅
/// </summary>
public bool RestoreSubscriptionsAfterReconnect { get; set; } = true;
}
private ReconnectionConfig _reconnectionConfig = new ReconnectionConfig();
private System.Threading.Timer _reconnectionTimer;
private int _reconnectAttempts = 0;
private string _lastUsername;
private string _lastPassword;
private bool _isReconnecting = false;
// 存储订阅信息,用于断线重连后恢复
private class SubscriptionInfo
{
public string Key { get; set; }
public string[] NodeIds { get; set; }
public Action<string, MonitoredItem, MonitoredItemNotificationEventArgs> Callback { get; set; }
public int PublishingInterval { get; set; } = 1000;
public int SamplingInterval { get; set; } = 1000;
}
private Dictionary<string, SubscriptionInfo> _subscriptionInfos = new Dictionary<string, SubscriptionInfo>();
/// <summary>
/// 配置断线重连参数
/// </summary>
/// <param name="config">重连配置</param>
public void ConfigureReconnection(ReconnectionConfig config)
{
_reconnectionConfig = config ?? new ReconnectionConfig();
}
/// <summary>
/// 启动断线重连
/// </summary>
private void StartReconnection()
{
if (!_reconnectionConfig.EnableAutoReconnect || _isReconnecting)
return;
_isReconnecting = true;
_reconnectAttempts = 0;
// 停止已有的重连定时器
_reconnectionTimer?.Dispose();
// 创建新的重连定时器
_reconnectionTimer = new System.Threading.Timer(
async (state) => await ReconnectAsync(),
null,
_reconnectionConfig.ReconnectInterval,
_reconnectionConfig.ReconnectInterval);
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = "连接断开,开始尝试重连..."
});
}
/// <summary>
/// 尝试重新连接
/// </summary>
private async Task ReconnectAsync()
{
// 如果已经连接或已达到最大尝试次数,则停止重连
if (IsConnected ||
(_reconnectionConfig.MaxReconnectAttempts > 0 &&
_reconnectAttempts >= _reconnectionConfig.MaxReconnectAttempts))
{
StopReconnection();
return;
}
_reconnectAttempts++;
try
{
bool success;
// 使用上次连接的凭据进行重连
if (!string.IsNullOrEmpty(_lastUsername))
{
success = await ConnectServerAsync(ServerUrl, _lastUsername, _lastPassword);
}
else
{
success = await ConnectServerAsync(ServerUrl);
}
if (success)
{
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = true,
Message = $"重连成功,共尝试 {_reconnectAttempts} 次"
});
// 重连成功后恢复订阅
if (_reconnectionConfig.RestoreSubscriptionsAfterReconnect)
{
await RestoreSubscriptionsAsync();
}
StopReconnection();
}
else if (_reconnectionConfig.MaxReconnectAttempts > 0 &&
_reconnectAttempts >= _reconnectionConfig.MaxReconnectAttempts)
{
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = $"重连失败,已达到最大尝试次数: {_reconnectionConfig.MaxReconnectAttempts}"
});
StopReconnection();
}
else
{
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = $"重连尝试 {_reconnectAttempts} 失败,将在 {_reconnectionConfig.ReconnectInterval / 1000} 秒后重试"
});
}
}
catch (Exception ex)
{
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = false,
Message = $"重连尝试 {_reconnectAttempts} 出错: {ex.Message}"
});
}
}
/// <summary>
/// 恢复所有订阅
/// </summary>
private async Task RestoreSubscriptionsAsync()
{
try
{
// 清空当前的订阅集合,因为断线后原有的订阅对象已经失效
_subscriptions.Clear();
int successCount = 0;
int failureCount = 0;
// 从备份的订阅信息中恢复所有订阅
foreach (var info in _subscriptionInfos.Values)
{
try
{
bool success;
if (info.NodeIds.Length == 1)
{
// 单节点订阅
success = AddSubscription(
info.Key,
info.NodeIds[0],
info.Callback,
info.PublishingInterval,
info.SamplingInterval);
}
else
{
// 多节点订阅
success = AddSubscription(
info.Key,
info.NodeIds,
info.Callback,
info.PublishingInterval,
info.SamplingInterval);
}
if (success)
successCount++;
else
failureCount++;
}
catch (Exception ex)
{
failureCount++;
Console.WriteLine($"恢复订阅 {info.Key} 时出错: {ex.Message}");
}
}
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = true,
Message = $"已恢复 {successCount} 个订阅,{failureCount} 个恢复失败,共 {_subscriptionInfos.Count} 个"
});
}
catch (Exception ex)
{
OpcStatusChanged?.Invoke(this, new OpcStatusEventArgs
{
IsConnected = true,
Message = $"恢复订阅时发生错误: {ex.Message}"
});
}
}
/// <summary>
/// 停止重连过程
/// </summary>
private void StopReconnection()
{
_reconnectionTimer?.Dispose();
_reconnectionTimer = null;
_isReconnecting = false;
}
#endregion
}
/// <summary>
/// OPC状态事件参数
/// </summary>
public class OpcStatusEventArgs : EventArgs
{
/// <summary>
/// 是否已连接
/// </summary>
public bool IsConnected { get; set; }
/// <summary>
/// 状态消息
/// </summary>
public string Message { get; set; }
}
}
测试程序OpcUaServiceTest演示了如何使用OpcUaService类进行各种OPC UA操作。
C#private static async Task TestConnection(OpcUaService opcService)
{
Console.WriteLine("\n测试连接到OPC UA服务器...");
try
{
// 尝试匿名连接
bool connected = await opcService.ConnectServerAsync(ServerUrl);
if (connected)
{
Console.WriteLine($"成功连接到服务器: {ServerUrl}");
}
else
{
// 匿名连接失败,尝试用户名密码连接
Console.WriteLine("连接失败,尝试使用用户名和密码连接...");
// 获取用户输入的凭据
Console.Write("请输入用户名: ");
string username = Console.ReadLine();
Console.Write("请输入密码: ");
string password = Console.ReadLine();
connected = await opcService.ConnectServerAsync(ServerUrl, username, password);
// 输出连接结果
}
}
catch (Exception ex)
{
Console.WriteLine($"连接测试异常: {ex.Message}");
}
}
C#private static async Task TestReadOperations(OpcUaService opcService)
{
Console.WriteLine("\n测试读取操作...");
try
{
// 测试单个节点读取
Console.WriteLine($"读取节点: {TestNodeId}");
DataValue value = opcService.ReadNode(TestNodeId);
Console.WriteLine($"节点值: {value.Value}, 状态: {value.StatusCode}, 时间戳: {value.SourceTimestamp}");
// 测试泛型读取
Console.WriteLine($"\n泛型读取节点: {TestNodeId}");
var genericValue = opcService.ReadNode<object>(TestNodeId);
Console.WriteLine($"泛型读取值: {genericValue}");
// 测试异步读取
Console.WriteLine($"\n异步读取节点: {TestNodeId}");
var asyncValue = await opcService.ReadNodeAsync<object>(TestNodeId);
Console.WriteLine($"异步读取值: {asyncValue}");
// 测试批量读取
Console.WriteLine("\n批量读取多个节点");
string[] nodeIds = new string[] { TestNodeId, TestNodeId2 };
List<DataValue> values = opcService.ReadNodes(nodeIds);
for (int i = 0; i < nodeIds.Length; i++)
{
Console.WriteLine($"节点 {nodeIds[i]} 值: {values[i].Value}");
}
}
catch (Exception ex)
{
Console.WriteLine($"读取操作测试异常: {ex.Message}");
}
}
C#private static async Task TestWriteOperations(OpcUaService opcService)
{
Console.WriteLine("\n测试写入操作...");
try
{
// 先读取当前值
var currentValue = opcService.ReadNode<object>(TestNodeId);
Console.WriteLine($"节点 {TestNodeId} 当前值: {currentValue}");
// 根据数据类型写入不同的值
bool writeSuccess = false;
if (currentValue is bool boolValue)
{
// 布尔值取反
writeSuccess = opcService.WriteNode(TestNodeId, !boolValue);
// 输出信息...
}
else if (currentValue is int intValue)
{
// 整数值加1
writeSuccess = opcService.WriteNode(TestNodeId, intValue + 1);
// 输出信息...
}
// 其他类型处理...
if (writeSuccess)
{
// 再次读取验证写入
await Task.Delay(500);
var newValue = opcService.ReadNode<object>(TestNodeId);
Console.WriteLine($"写入后节点 {TestNodeId} 的新值: {newValue}");
}
// 测试批量写入
// 实现代码...
}
catch (Exception ex)
{
Console.WriteLine($"写入操作测试异常: {ex.Message}");
}
}
C#private static async Task TestSubscription(OpcUaService opcService)
{
Console.WriteLine("\n测试数据变更订阅...");
try
{
// 创建事件等待句柄
var dataChangedEvent = new ManualResetEventSlim(false);
// 添加订阅
string subscriptionKey = "TestSubscription";
bool subscribed = opcService.AddSubscription(subscriptionKey, TestNodeId,
(key, item, args) =>
{
if (args.NotificationValue is MonitoredItemNotification notification)
{
Console.WriteLine($"数据变更通知 - 键: {key}, 节点: {item.DisplayName}, " +
$"值: {notification.Value.Value}, 时间: {notification.Value.SourceTimestamp}");
dataChangedEvent.Set();
}
});
if (subscribed)
{
// 通过修改值触发订阅通知
// 实现代码...
// 测试多节点订阅
Console.WriteLine("\n测试多节点订阅...");
string multiSubscriptionKey = "MultiNodeSubscription";
string[] nodeIds = new string[] { TestNodeId, TestNodeId2 };
bool multiSubscribed = opcService.AddSubscription(multiSubscriptionKey, nodeIds,
(key, item, args) =>
{
// 回调实现...
});
// 等待通知和清理订阅
// 实现代码...
}
}
catch (Exception ex)
{
Console.WriteLine($"订阅测试异常: {ex.Message}");
}
}
测试程序中展示了如何配置断线重连功能:
C#// 在Main方法中
opcService.ConfigureReconnection(new ReconnectionConfig
{
EnableAutoReconnect = true,
ReconnectInterval = 3000, // 3秒
MaxReconnectAttempts = 0, // 无限尝试
RestoreSubscriptionsAfterReconnect = true // 重连后恢复订阅
});
// 注册状态变化事件
opcService.OpcStatusChanged += (sender, e) =>
{
Console.WriteLine($"OPC UA 状态变化: {e.Message}, 连接状态: {e.IsConnected}");
};
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua;
using Opc.Ua.Client;
using Rick.Core.Services.OpcUa;
using static Rick.Core.Services.OpcUa.OpcUaService;
namespace OpcUaServiceTest
{
class Program
{
// OPC UA 服务器地址,请根据实际环境修改
private const string ServerUrl = "opc.tcp://127.0.0.1:49320";
// 测试节点ID,请根据您的OPC UA服务器修改
private const string TestNodeId = "ns=2;s=LMES.W1.Test1";
private const string TestNodeId2 = "ns=2;s=LMES.W1.Test2";
static async Task Main(string[] args)
{
Console.WriteLine("OPC UA 服务测试程序");
Console.WriteLine("===================");
// 创建OPC UA服务实例
using (OpcUaService opcService = new OpcUaService())
{
opcService.ConfigureReconnection(new ReconnectionConfig
{
EnableAutoReconnect = true,
ReconnectInterval = 3000, // 3 seconds
MaxReconnectAttempts = 0, // Try 10 times then give up
RestoreSubscriptionsAfterReconnect = true // 重连后恢复订阅
});
// 注册状态变化事件
opcService.OpcStatusChanged += (sender, e) =>
{
Console.WriteLine($"OPC UA 状态变化: {e.Message}, 连接状态: {e.IsConnected}");
};
// 测试连接
await TestConnection(opcService);
if (opcService.IsConnected)
{
// 测试读取操作
await TestReadOperations(opcService);
// 测试写入操作
await TestWriteOperations(opcService);
// 测试订阅操作
await TestSubscription(opcService);
Console.WriteLine("\n按任意键退出程序...");
Console.ReadKey();
}
}
}
private static async Task TestConnection(OpcUaService opcService)
{
Console.WriteLine("\n测试连接到OPC UA服务器...");
try
{
// 尝试匿名连接
bool connected = await opcService.ConnectServerAsync(ServerUrl);
if (connected)
{
Console.WriteLine($"成功连接到服务器: {ServerUrl}");
}
else
{
Console.WriteLine("连接失败,尝试使用用户名和密码连接...");
// 如果匿名连接失败,尝试使用用户名和密码连接
Console.Write("请输入用户名: ");
string username = Console.ReadLine();
Console.Write("请输入密码: ");
string password = Console.ReadLine();
connected = await opcService.ConnectServerAsync(ServerUrl, username, password);
if (connected)
{
Console.WriteLine($"使用凭据成功连接到服务器: {ServerUrl}");
}
else
{
Console.WriteLine("连接失败,请检查服务器地址和凭据");
}
}
}
catch (Exception ex)
{
Console.WriteLine($"连接测试异常: {ex.Message}");
}
}
private static async Task TestReadOperations(OpcUaService opcService)
{
Console.WriteLine("\n测试读取操作...");
try
{
// 测试单个节点读取
Console.WriteLine($"读取节点: {TestNodeId}");
DataValue value = opcService.ReadNode(TestNodeId);
Console.WriteLine($"节点值: {value.Value}, 状态: {value.StatusCode}, 时间戳: {value.SourceTimestamp}");
// 测试泛型读取
Console.WriteLine($"\n泛型读取节点: {TestNodeId}");
var genericValue = opcService.ReadNode<object>(TestNodeId);
Console.WriteLine($"泛型读取值: {genericValue}");
// 测试异步读取
Console.WriteLine($"\n异步读取节点: {TestNodeId}");
var asyncValue = await opcService.ReadNodeAsync<object>(TestNodeId);
Console.WriteLine($"异步读取值: {asyncValue}");
// 测试批量读取
Console.WriteLine("\n批量读取多个节点");
string[] nodeIds = new string[] { TestNodeId, TestNodeId2 };
List<DataValue> values = opcService.ReadNodes(nodeIds);
for (int i = 0; i < nodeIds.Length; i++)
{
Console.WriteLine($"节点 {nodeIds[i]} 值: {values[i].Value}");
}
}
catch (Exception ex)
{
Console.WriteLine($"读取操作测试异常: {ex.Message}");
}
}
private static async Task TestWriteOperations(OpcUaService opcService)
{
Console.WriteLine("\n测试写入操作...");
try
{
// 先读取当前值
var currentValue = opcService.ReadNode<object>(TestNodeId);
Console.WriteLine($"节点 {TestNodeId} 当前值: {currentValue}");
// 根据当前值类型尝试写入新值
bool writeSuccess = false;
if (currentValue is bool boolValue)
{
// 布尔值取反
writeSuccess = opcService.WriteNode(TestNodeId, !boolValue);
Console.WriteLine($"写入布尔值 {!boolValue} 到节点 {TestNodeId}");
}
else if (currentValue is int intValue)
{
// 整数值加1
writeSuccess = opcService.WriteNode(TestNodeId, intValue + 1);
Console.WriteLine($"写入整数值 {intValue + 1} 到节点 {TestNodeId}");
}
else if (currentValue is double doubleValue)
{
// 浮点值加1.5
writeSuccess = opcService.WriteNode(TestNodeId, doubleValue + 1.5);
Console.WriteLine($"写入浮点值 {doubleValue + 1.5} 到节点 {TestNodeId}");
}
else if (currentValue is string)
{
// 字符串值添加时间戳
string newValue = $"Test_{DateTime.Now:yyyyMMdd_HHmmss}";
writeSuccess = opcService.WriteNode(TestNodeId, newValue);
Console.WriteLine($"写入字符串值 \"{newValue}\" 到节点 {TestNodeId}");
}
else
{
Console.WriteLine($"不支持的数据类型: {currentValue?.GetType().Name ?? "null"}");
}
if (writeSuccess)
{
Console.WriteLine("写入成功");
// 再次读取以验证写入结果
await Task.Delay(500); // 等待一会儿确保写入完成
var newValue = opcService.ReadNode<object>(TestNodeId);
Console.WriteLine($"写入后节点 {TestNodeId} 的新值: {newValue}");
}
else
{
Console.WriteLine("写入失败");
}
// 测试批量写入
if (currentValue != null)
{
Console.WriteLine("\n测试批量写入");
string[] nodeIds = new string[] { TestNodeId, TestNodeId2 };
object[] values = new object[] { currentValue, currentValue };
bool batchWriteSuccess = opcService.WriteNodes(nodeIds, values);
Console.WriteLine($"批量写入结果: {(batchWriteSuccess ? "成功" : "失败")}");
}
}
catch (Exception ex)
{
Console.WriteLine($"写入操作测试异常: {ex.Message}");
}
}
private static async Task TestSubscription(OpcUaService opcService)
{
Console.WriteLine("\n测试数据变更订阅...");
try
{
// 创建一个事件重置事件,用于等待数据变更
var dataChangedEvent = new ManualResetEventSlim(false);
// 添加单个节点订阅
string subscriptionKey = "TestSubscription";
bool subscribed = opcService.AddSubscription(subscriptionKey, TestNodeId,
(key, item, args) =>
{
if (args.NotificationValue is MonitoredItemNotification notification)
{
Console.WriteLine($"数据变更通知 - 键: {key}, 节点: {item.DisplayName}, 值: {notification.Value.Value}, 时间: {notification.Value.SourceTimestamp}");
dataChangedEvent.Set();
}
});
if (subscribed)
{
Console.WriteLine($"成功订阅节点: {TestNodeId}");
Console.WriteLine("\n尝试修改节点值以触发订阅通知...");
// 读取当前值
var currentValue = opcService.ReadNode<object>(TestNodeId);
// 尝试写入新值以触发订阅通知
if (currentValue is bool boolValue)
{
opcService.WriteNode(TestNodeId, !boolValue);
}
else if (currentValue is int intValue)
{
opcService.WriteNode(TestNodeId, intValue + 1);
}
else if (currentValue is double doubleValue)
{
opcService.WriteNode(TestNodeId, doubleValue + 1.0);
}
else if (currentValue is string)
{
opcService.WriteNode(TestNodeId, $"Changed_{DateTime.Now:HHmmss}");
}
// 等待数据变更通知或超时
Console.WriteLine("等待数据变更通知...");
bool notificationReceived = dataChangedEvent.Wait(5000);
if (!notificationReceived)
{
Console.WriteLine("未收到数据变更通知,可能是值未实际变化或服务器未发送通知");
}
// 测试移除订阅
Console.WriteLine("\n测试移除订阅...");
bool unsubscribed = opcService.RemoveSubscription(subscriptionKey);
Console.WriteLine($"移除订阅结果: {(unsubscribed ? "成功" : "失败")}");
// 测试多节点订阅
Console.WriteLine("\n测试多节点订阅...");
string multiSubscriptionKey = "MultiNodeSubscription";
string[] nodeIds = new string[] { TestNodeId, TestNodeId2 };
bool multiSubscribed = opcService.AddSubscription(multiSubscriptionKey, nodeIds,
(key, item, args) =>
{
if (args.NotificationValue is MonitoredItemNotification notification)
{
Console.WriteLine($"多节点订阅通知 - 键: {key}, 节点: {item.DisplayName}, 值: {notification.Value.Value}");
}
});
Console.WriteLine($"多节点订阅结果: {(multiSubscribed ? "成功" : "失败")}");
// 等待一段时间以观察可能的通知
await Task.Delay(3000);
// 移除所有订阅
Console.WriteLine("\n移除所有订阅...");
opcService.RemoveAllSubscriptions();
Console.WriteLine("所有订阅已移除");
}
else
{
Console.WriteLine($"订阅节点失败: {TestNodeId}");
}
}
catch (Exception ex)
{
Console.WriteLine($"订阅测试异常: {ex.Message}");
}
}
}
}

此OPC UA客户端库适用于多种工业场景:
本文介绍的OpcUaService类提供了一个稳健的OPC UA通信框架,可用于开发各种工业物联网和自动化应用。通过封装复杂的OPC UA操作细节,提供简洁易用的API,同时实现了关键的可靠性和容错机制。
工业物联网通信需要兼顾性能、可靠性和安全性。本文展示的解决方案通过断线重连、订阅恢复和异常处理等机制,满足了工业环境下的高可靠性要求,是构建现代工业自动化系统的理想基础。
随着工业4.0的发展,基于OPC UA的通信将继续扮演关键角色,为工业设备互联互通提供稳固基础。掌握这一技术,将有助于工程师们开发出更加智能、高效的工业自动化解决方案。
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!