在现代工业4.0时代,数据采集系统已成为企业数字化转型的核心基础设施。无论是工厂生产线的传感器监控、物联网设备的状态采集,还是金融系统的实时交易数据处理,一个高效、稳定的数据采集系统都至关重要。
然而,许多C#开发者在构建此类系统时常常遇到这些痛点:数据处理延迟高、内存占用过大、异常数据处理不当、缺乏有效的质量控制机制。本文将通过一个完整的实战案例,教你如何用C#构建一个专业级的数据采集系统,彻底解决这些技术难题。
我们采用生产者-消费者模式结合Channel异步通信,构建了一个三层数据处理管道:
Markdown数据生成层 → 数据处理层 → 数据聚合层 → UI展示层

这种设计的优势在于:
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AppDataAcquisitionSystem
{
public class DataGenerator
{
private readonly Random _random;
private double _baseValue;
private double _trend;
public DataGenerator()
{
_random = new Random();
_baseValue = 50.0;
_trend = 0.0;
}
public async Task<DataPoint> GenerateDataPointAsync()
{
// 模拟真实的传感器数据:基础值 + 趋势 + 噪声 + 偶尔的异常值
var noise = _random.NextDouble() * 2 - 1; // -1 到 1 的噪声
var spike = _random.NextDouble() < 0.05 ? _random.NextDouble() * 20 - 10 : 0; // 5%概率出现异常值
_trend += (_random.NextDouble() - 0.5) * 0.1;
_trend = Math.Max(-2, Math.Min(2, _trend));
_baseValue += _trend;
_baseValue = Math.Max(0, Math.Min(100, _baseValue)); // 限制基础值范围
var value = _baseValue + noise + spike;
return new DataPoint
{
Timestamp = DateTime.Now,
Value = value,
Quality = DataQuality.Good,
Source = "Sensor-001"
};
}
}
}
💡 实战技巧:
_trend变量实现数据的自然波动,更贴近真实传感器C#public class DataProcessor
{
private readonly Channel<DataPoint> _rawDataChannel;
private readonly Channel<DataPoint> _processedDataChannel;
private readonly Channel<AggregatedData> _aggregatedDataChannel;
public async Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
// 🚀 并行处理:数据处理和聚合同时进行
var processingTask = ProcessDataAsync(cancellationToken);
var aggregationTask = AggregateDataAsync(cancellationToken);
await Task.WhenAll(processingTask, aggregationTask);
}
private async Task ProcessDataAsync(CancellationToken cancellationToken)
{
await foreach (var dataPoint in _rawDataChannel.Reader.ReadAllAsync(cancellationToken))
{
var processedPoint = ProcessSingleDataPoint(dataPoint);
if (processedPoint != null)
{
await _processedDataChannel.Writer.WriteAsync(processedPoint, cancellationToken);
}
}
}
}
🔧 关键技术点:
IAsyncEnumerable让数据流处理更加优雅C#private DataQuality PerformQualityCheck(DataPoint dataPoint)
{
// 🛡️ 基础数值检查
if (double.IsNaN(dataPoint.Value) || double.IsInfinity(dataPoint.Value))
return DataQuality.Bad;
// 📊 数值范围验证
if (Math.Abs(dataPoint.Value) > 1000)
return DataQuality.Bad;
// ⚡ 变化率检查 - 防止突变数据
if (_lastValue != null)
{
var timeDiff = (dataPoint.Timestamp - _lastValue.Timestamp).TotalSeconds;
if (timeDiff > 0)
{
var changeRate = Math.Abs(dataPoint.Value - _lastValue.Value) / timeDiff;
if (changeRate > 100) // 变化率阈值
return DataQuality.Uncertain;
}
}
return DataQuality.Good;
}
C#private async Task AggregateDataAsync(CancellationToken cancellationToken)
{
await foreach (var dataPoint in _processedDataChannel.Reader.ReadAllAsync(cancellationToken))
{
_windowBuffer.Add(dataPoint);
if (_windowBuffer.Count >= _config.WindowSize)
{
var aggregated = CalculateAggregatedData(_windowBuffer);
await _aggregatedDataChannel.Writer.WriteAsync(aggregated, cancellationToken);
// 🎯 滑动窗口优化:保留部分历史数据
var keepCount = _config.WindowSize / 2;
_windowBuffer.RemoveRange(0, _windowBuffer.Count - keepCount);
}
}
}
💡 滑动窗口优势:
C#using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using ScottPlot;
using Timer = System.Threading.Timer;
namespace AppDataAcquisitionSystem
{
public partial class Form1 : Form
{
private DataGenerator _dataGenerator;
private DataProcessor _dataProcessor;
private AcquisitionConfig _config;
private Timer _acquisitionTimer;
private CancellationTokenSource _cancellationTokenSource;
// 数据存储
private List<DataPoint> _realTimeData;
private List<AggregatedData> _aggregatedData;
// 图表数据
private List<double> _realTimeValues;
private List<DateTime> _realTimeTimes;
private List<double> _aggregatedValues;
private List<DateTime> _aggregatedTimes;
private const int MaxDisplayPoints = 100;
public Form1()
{
InitializeComponent();
InitializeSystem();
InitializePlots();
AttachEventHandlers();
}
private void InitializeSystem()
{
_realTimeData = new List<DataPoint>();
_aggregatedData = new List<AggregatedData>();
_realTimeValues = new List<double>();
_realTimeTimes = new List<DateTime>();
_aggregatedValues = new List<double>();
_aggregatedTimes = new List<DateTime>();
_dataGenerator = new DataGenerator();
UpdateConfiguration();
}
private void InitializePlots()
{
plotRealTime.Font = new Font("SimSun", 12);
// 配置实时数据图表
plotRealTime.Plot.Title("实时数据");
plotRealTime.Plot.Font.Set("SimSun");
plotRealTime.Plot.XLabel("时间");
plotRealTime.Plot.YLabel("数值");
// 配置聚合数据图表
plotAggregated.Font = new Font("SimSun", 12);
plotAggregated.Plot.Title("聚合数据");
plotAggregated.Plot.Font.Set("SimSun");
plotAggregated.Plot.XLabel("时间");
plotAggregated.Plot.YLabel("聚合值");
}
private void AttachEventHandlers()
{
btnStart.Click += BtnStart_Click;
btnStop.Click += BtnStop_Click;
btnClear.Click += BtnClear_Click;
// 配置变更事件
nudSampleRate.ValueChanged += (s, e) => UpdateConfiguration();
nudNoiseThreshold.ValueChanged += (s, e) => UpdateConfiguration();
nudWindowSize.ValueChanged += (s, e) => UpdateConfiguration();
chkQualityCheck.CheckedChanged += (s, e) => UpdateConfiguration();
chkDenoising.CheckedChanged += (s, e) => UpdateConfiguration();
cmbAggregationType.SelectedIndexChanged += (s, e) => UpdateConfiguration();
}
private void UpdateConfiguration()
{
_config = new AcquisitionConfig
{
SampleRateMs = (int)nudSampleRate.Value,
NoiseThreshold = (double)nudNoiseThreshold.Value,
WindowSize = (int)nudWindowSize.Value,
EnableQualityCheck = chkQualityCheck.Checked,
EnableDenoising = chkDenoising.Checked,
AggregationType = (AggregationType)cmbAggregationType.SelectedIndex
};
}
private async void BtnStart_Click(object sender, EventArgs e)
{
try
{
btnStart.Enabled = false;
btnStop.Enabled = true;
UpdateConfiguration();
_dataProcessor = new DataProcessor(_config);
_cancellationTokenSource = new CancellationTokenSource();
// 启动数据处理
var processingTask = _dataProcessor.StartProcessingAsync(_cancellationTokenSource.Token);
// 启动数据读取任务
var readingTask = StartDataReadingAsync(_cancellationTokenSource.Token);
// 启动数据采集定时器
_acquisitionTimer = new Timer(OnDataAcquisitionTimer, null, 0, _config.SampleRateMs);
UpdateStatus("系统已启动");
await Task.WhenAny(processingTask, readingTask);
}
catch (Exception ex)
{
UpdateStatus($"启动失败: {ex.Message}");
btnStart.Enabled = true;
btnStop.Enabled = false;
}
}
private void BtnStop_Click(object sender, EventArgs e)
{
StopAcquisition();
}
private void BtnClear_Click(object sender, EventArgs e)
{
_realTimeData.Clear();
_aggregatedData.Clear();
_realTimeValues.Clear();
_realTimeTimes.Clear();
_aggregatedValues.Clear();
_aggregatedTimes.Clear();
plotRealTime.Plot.Clear();
plotAggregated.Plot.Clear();
plotRealTime.Refresh();
plotAggregated.Refresh();
UpdateStatus("数据已清除");
}
private void StopAcquisition()
{
try
{
_acquisitionTimer?.Dispose();
_cancellationTokenSource?.Cancel();
_dataProcessor?.StopProcessing();
btnStart.Enabled = true;
btnStop.Enabled = false;
UpdateStatus("系统已停止");
}
catch (Exception ex)
{
UpdateStatus($"停止失败: {ex.Message}");
}
}
private async void OnDataAcquisitionTimer(object state)
{
try
{
var dataPoint = await _dataGenerator.GenerateDataPointAsync();
await _dataProcessor.AddRawDataAsync(dataPoint);
}
catch (Exception ex)
{
BeginInvoke(new Action(() => UpdateStatus($"数据采集错误: {ex.Message}")));
}
}
private async Task StartDataReadingAsync(CancellationToken cancellationToken)
{
// 启动处理后数据读取
var processedTask = ReadProcessedDataAsync(cancellationToken);
var aggregatedTask = ReadAggregatedDataAsync(cancellationToken);
await Task.WhenAll(processedTask, aggregatedTask);
}
private async Task ReadProcessedDataAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var dataPoint in _dataProcessor.ProcessedDataReader.ReadAllAsync(cancellationToken))
{
BeginInvoke(new Action(() =>
{
_realTimeData.Add(dataPoint);
_realTimeValues.Add(dataPoint.Value);
_realTimeTimes.Add(dataPoint.Timestamp);
// 限制显示点数
if (_realTimeValues.Count > MaxDisplayPoints)
{
_realTimeValues.RemoveAt(0);
_realTimeTimes.RemoveAt(0);
}
UpdateRealTimePlot();
UpdateStatus($"实时数据: {dataPoint.Value:F2} (质量: {dataPoint.Quality})");
}));
}
}
catch (OperationCanceledException)
{
// 正常取消
}
}
private async Task ReadAggregatedDataAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var aggregatedData in _dataProcessor.AggregatedDataReader.ReadAllAsync(cancellationToken))
{
BeginInvoke(new Action(() =>
{
_aggregatedData.Add(aggregatedData);
_aggregatedValues.Add(aggregatedData.Value);
_aggregatedTimes.Add(aggregatedData.WindowEnd);
// 限制显示点数
if (_aggregatedValues.Count > MaxDisplayPoints)
{
_aggregatedValues.RemoveAt(0);
_aggregatedTimes.RemoveAt(0);
}
UpdateAggregatedPlot();
UpdateStatus($"聚合数据 ({aggregatedData.Type}): {aggregatedData.Value:F2} (样本数: {aggregatedData.SampleCount})");
}));
}
}
catch (OperationCanceledException)
{
// 正常取消
}
}
private void UpdateRealTimePlot()
{
if (_realTimeValues.Count == 0) return;
plotRealTime.Plot.Clear();
var times = _realTimeTimes.Select(t => t.ToOADate()).ToArray();
var values = _realTimeValues.ToArray();
var scatter = plotRealTime.Plot.Add.Scatter(times, values);
scatter.Color = Colors.Blue;
scatter.LineWidth = 2;
plotRealTime.Plot.Axes.DateTimeTicksBottom();
plotRealTime.Plot.Axes.AutoScale();
plotRealTime.Refresh();
}
private void UpdateAggregatedPlot()
{
if (_aggregatedValues.Count == 0) return;
plotAggregated.Plot.Clear();
var times = _aggregatedTimes.Select(t => t.ToOADate()).ToArray();
var values = _aggregatedValues.ToArray();
var scatter = plotAggregated.Plot.Add.Scatter(times, values);
scatter.Color = Colors.Red;
scatter.LineWidth = 3;
plotAggregated.Plot.Axes.DateTimeTicksBottom();
plotAggregated.Plot.Axes.AutoScale();
plotAggregated.Refresh();
}
private void UpdateStatus(string message)
{
if (InvokeRequired)
{
BeginInvoke(new Action(() => UpdateStatus(message)));
return;
}
txtStatus.AppendText($"[{DateTime.Now:HH:mm:ss}] {message}\r\n");
txtStatus.ScrollToCaret();
}
protected override void OnFormClosing(FormClosingEventArgs e)
{
StopAcquisition();
base.OnFormClosing(e);
}
}
}

C#private const int MaxDisplayPoints = 100;
// 限制显示点数,防止内存泄漏
if (_realTimeValues.Count > MaxDisplayPoints)
{
_realTimeValues.RemoveAt(0);
_realTimeTimes.RemoveAt(0);
}
C#// ✅ 正确的取消令牌使用
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// ✅ 优雅的资源清理
protected override void OnFormClosing(FormClosingEventArgs e)
{
StopAcquisition(); // 确保所有异步任务正确停止
base.OnFormClosing(e);
}
这套数据采集系统可以广泛应用于:
A: 使用有界Channel并设置合理的容量限制:
C#var options = new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
};
_channel = Channel.CreateBounded<DataPoint>(options);
A: 采用批处理策略:
C#private readonly List<DataPoint> _batchBuffer = new List<DataPoint>();
private const int BatchSize = 50;
// 批量处理提高效率
if (_batchBuffer.Count >= BatchSize)
{
await ProcessBatchAsync(_batchBuffer);
_batchBuffer.Clear();
}
通过本文的实战案例,我们掌握了构建高效数据采集系统的三大核心技术:
这套架构不仅解决了传统数据采集系统的性能瓶颈,还提供了强大的扩展性。你可以轻松添加新的数据源、处理算法或可视化组件。
💬 互动讨论:
如果这篇文章对你的C#开发工作有帮助,请转发给更多同行,让我们一起提升.NET技术水平!关注我,获取更多C#实战干货!
🔖 收藏级代码模板已整理完毕,建议收藏备用!
相关信息
通过网盘分享的文件:AppDataAcquisitionSystem.zip 链接: https://pan.baidu.com/s/1YMZl9u6rMGwIalC0_ohB9g?pwd=jrkg 提取码: jrkg --来自百度网盘超级会员v9的分享
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!