In the modern Industry 4.0 era, data acquisition systems have become the core infrastructure for enterprise digital transformation. Whether it's sensor monitoring on factory production lines, IoT device status collection, or real-time transaction data processing in financial systems, an efficient and stable data acquisition system is crucial.
However, many C# developers often encounter these pain points when building such systems: high data processing latency, excessive memory usage, improper anomalous data handling, and lack of effective quality control mechanisms. This article will teach you how to build a professional-grade data acquisition system using C# through a complete practical case study, thoroughly solving these technical challenges.
We adopt the Producer-Consumer Pattern combined with Channel asynchronous communication to build a three-layer data processing pipeline:
MarkdownData Generation Layer → Data Processing Layer → Data Aggregation Layer → UI Display Layer

The advantages of this design include:
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AppDataAcquisitionSystem
{
public class DataGenerator
{
private readonly Random _random;
private double _baseValue;
private double _trend;
public DataGenerator()
{
_random = new Random();
_baseValue = 50.0;
_trend = 0.0;
}
public async Task<DataPoint> GenerateDataPointAsync()
{
// Simulate real sensor data: base value + trend + noise + occasional anomalies
var noise = _random.NextDouble() * 2 - 1; // Noise from -1 to 1
var spike = _random.NextDouble() < 0.05 ? _random.NextDouble() * 20 - 10 : 0; // 5% probability of anomalous values
_trend += (_random.NextDouble() - 0.5) * 0.1;
_trend = Math.Max(-2, Math.Min(2, _trend));
_baseValue += _trend;
_baseValue = Math.Max(0, Math.Min(100, _baseValue)); // Limit base value range
var value = _baseValue + noise + spike;
return new DataPoint
{
Timestamp = DateTime.Now,
Value = value,
Quality = DataQuality.Good,
Source = "Sensor-001"
};
}
}
}
💡 Practical Tips:
_trend variable implements natural data fluctuation, closer to real sensorsC#public class DataProcessor
{
private readonly Channel<DataPoint> _rawDataChannel;
private readonly Channel<DataPoint> _processedDataChannel;
private readonly Channel<AggregatedData> _aggregatedDataChannel;
public async Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
// 🚀 Parallel processing: data processing and aggregation run simultaneously
var processingTask = ProcessDataAsync(cancellationToken);
var aggregationTask = AggregateDataAsync(cancellationToken);
await Task.WhenAll(processingTask, aggregationTask);
}
private async Task ProcessDataAsync(CancellationToken cancellationToken)
{
await foreach (var dataPoint in _rawDataChannel.Reader.ReadAllAsync(cancellationToken))
{
var processedPoint = ProcessSingleDataPoint(dataPoint);
if (processedPoint != null)
{
await _processedDataChannel.Writer.WriteAsync(processedPoint, cancellationToken);
}
}
}
}
🔧 Key Technical Points:
IAsyncEnumerable makes data stream processing more elegantC#private DataQuality PerformQualityCheck(DataPoint dataPoint)
{
// 🛡️ Basic numeric checks
if (double.IsNaN(dataPoint.Value) || double.IsInfinity(dataPoint.Value))
return DataQuality.Bad;
// 📊 Numeric range validation
if (Math.Abs(dataPoint.Value) > 1000)
return DataQuality.Bad;
// ⚡ Change rate check - prevent sudden data changes
if (_lastValue != null)
{
var timeDiff = (dataPoint.Timestamp - _lastValue.Timestamp).TotalSeconds;
if (timeDiff > 0)
{
var changeRate = Math.Abs(dataPoint.Value - _lastValue.Value) / timeDiff;
if (changeRate > 100) // Change rate threshold
return DataQuality.Uncertain;
}
}
return DataQuality.Good;
}
C#private async Task AggregateDataAsync(CancellationToken cancellationToken)
{
await foreach (var dataPoint in _processedDataChannel.Reader.ReadAllAsync(cancellationToken))
{
_windowBuffer.Add(dataPoint);
if (_windowBuffer.Count >= _config.WindowSize)
{
var aggregated = CalculateAggregatedData(_windowBuffer);
await _aggregatedDataChannel.Writer.WriteAsync(aggregated, cancellationToken);
// 🎯 Sliding window optimization: retain partial historical data
var keepCount = _config.WindowSize / 2;
_windowBuffer.RemoveRange(0, _windowBuffer.Count - keepCount);
}
}
}
💡 Sliding Window Advantages:
C#using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;
using ScottPlot;
using Timer = System.Threading.Timer;
namespace AppDataAcquisitionSystem
{
public partial class Form1 : Form
{
private DataGenerator _dataGenerator;
private DataProcessor _dataProcessor;
private AcquisitionConfig _config;
private Timer _acquisitionTimer;
private CancellationTokenSource _cancellationTokenSource;
// Data storage
private List<DataPoint> _realTimeData;
private List<AggregatedData> _aggregatedData;
// Chart data
private List<double> _realTimeValues;
private List<DateTime> _realTimeTimes;
private List<double> _aggregatedValues;
private List<DateTime> _aggregatedTimes;
private const int MaxDisplayPoints = 100;
public Form1()
{
InitializeComponent();
InitializeSystem();
InitializePlots();
AttachEventHandlers();
}
private void InitializeSystem()
{
_realTimeData = new List<DataPoint>();
_aggregatedData = new List<AggregatedData>();
_realTimeValues = new List<double>();
_realTimeTimes = new List<DateTime>();
_aggregatedValues = new List<double>();
_aggregatedTimes = new List<DateTime>();
_dataGenerator = new DataGenerator();
UpdateConfiguration();
}
private void InitializePlots()
{
plotRealTime.Font = new Font("Arial", 12);
// Configure real-time data chart
plotRealTime.Plot.Title("Real-time Data");
plotRealTime.Plot.Font.Set("Arial");
plotRealTime.Plot.XLabel("Time");
plotRealTime.Plot.YLabel("Value");
// Configure aggregated data chart
plotAggregated.Font = new Font("Arial", 12);
plotAggregated.Plot.Title("Aggregated Data");
plotAggregated.Plot.Font.Set("Arial");
plotAggregated.Plot.XLabel("Time");
plotAggregated.Plot.YLabel("Aggregated Value");
}
private void AttachEventHandlers()
{
btnStart.Click += BtnStart_Click;
btnStop.Click += BtnStop_Click;
btnClear.Click += BtnClear_Click;
// Configuration change events
nudSampleRate.ValueChanged += (s, e) => UpdateConfiguration();
nudNoiseThreshold.ValueChanged += (s, e) => UpdateConfiguration();
nudWindowSize.ValueChanged += (s, e) => UpdateConfiguration();
chkQualityCheck.CheckedChanged += (s, e) => UpdateConfiguration();
chkDenoising.CheckedChanged += (s, e) => UpdateConfiguration();
cmbAggregationType.SelectedIndexChanged += (s, e) => UpdateConfiguration();
}
private void UpdateConfiguration()
{
_config = new AcquisitionConfig
{
SampleRateMs = (int)nudSampleRate.Value,
NoiseThreshold = (double)nudNoiseThreshold.Value,
WindowSize = (int)nudWindowSize.Value,
EnableQualityCheck = chkQualityCheck.Checked,
EnableDenoising = chkDenoising.Checked,
AggregationType = (AggregationType)cmbAggregationType.SelectedIndex
};
}
private async void BtnStart_Click(object sender, EventArgs e)
{
try
{
btnStart.Enabled = false;
btnStop.Enabled = true;
UpdateConfiguration();
_dataProcessor = new DataProcessor(_config);
_cancellationTokenSource = new CancellationTokenSource();
// Start data processing
var processingTask = _dataProcessor.StartProcessingAsync(_cancellationTokenSource.Token);
// Start data reading task
var readingTask = StartDataReadingAsync(_cancellationTokenSource.Token);
// Start data acquisition timer
_acquisitionTimer = new Timer(OnDataAcquisitionTimer, null, 0, _config.SampleRateMs);
UpdateStatus("System started");
await Task.WhenAny(processingTask, readingTask);
}
catch (Exception ex)
{
UpdateStatus($"Startup failed: {ex.Message}");
btnStart.Enabled = true;
btnStop.Enabled = false;
}
}
private void BtnStop_Click(object sender, EventArgs e)
{
StopAcquisition();
}
private void BtnClear_Click(object sender, EventArgs e)
{
_realTimeData.Clear();
_aggregatedData.Clear();
_realTimeValues.Clear();
_realTimeTimes.Clear();
_aggregatedValues.Clear();
_aggregatedTimes.Clear();
plotRealTime.Plot.Clear();
plotAggregated.Plot.Clear();
plotRealTime.Refresh();
plotAggregated.Refresh();
UpdateStatus("Data cleared");
}
private void StopAcquisition()
{
try
{
_acquisitionTimer?.Dispose();
_cancellationTokenSource?.Cancel();
_dataProcessor?.StopProcessing();
btnStart.Enabled = true;
btnStop.Enabled = false;
UpdateStatus("System stopped");
}
catch (Exception ex)
{
UpdateStatus($"Stop failed: {ex.Message}");
}
}
private async void OnDataAcquisitionTimer(object state)
{
try
{
var dataPoint = await _dataGenerator.GenerateDataPointAsync();
await _dataProcessor.AddRawDataAsync(dataPoint);
}
catch (Exception ex)
{
BeginInvoke(new Action(() => UpdateStatus($"Data acquisition error: {ex.Message}")));
}
}
private async Task StartDataReadingAsync(CancellationToken cancellationToken)
{
// Start processed data reading
var processedTask = ReadProcessedDataAsync(cancellationToken);
var aggregatedTask = ReadAggregatedDataAsync(cancellationToken);
await Task.WhenAll(processedTask, aggregatedTask);
}
private async Task ReadProcessedDataAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var dataPoint in _dataProcessor.ProcessedDataReader.ReadAllAsync(cancellationToken))
{
BeginInvoke(new Action(() =>
{
_realTimeData.Add(dataPoint);
_realTimeValues.Add(dataPoint.Value);
_realTimeTimes.Add(dataPoint.Timestamp);
// Limit display points
if (_realTimeValues.Count > MaxDisplayPoints)
{
_realTimeValues.RemoveAt(0);
_realTimeTimes.RemoveAt(0);
}
UpdateRealTimePlot();
UpdateStatus($"Real-time data: {dataPoint.Value:F2} (Quality: {dataPoint.Quality})");
}));
}
}
catch (OperationCanceledException)
{
// Normal cancellation
}
}
private async Task ReadAggregatedDataAsync(CancellationToken cancellationToken)
{
try
{
await foreach (var aggregatedData in _dataProcessor.AggregatedDataReader.ReadAllAsync(cancellationToken))
{
BeginInvoke(new Action(() =>
{
_aggregatedData.Add(aggregatedData);
_aggregatedValues.Add(aggregatedData.Value);
_aggregatedTimes.Add(aggregatedData.WindowEnd);
// Limit display points
if (_aggregatedValues.Count > MaxDisplayPoints)
{
_aggregatedValues.RemoveAt(0);
_aggregatedTimes.RemoveAt(0);
}
UpdateAggregatedPlot();
UpdateStatus($"Aggregated data ({aggregatedData.Type}): {aggregatedData.Value:F2} (Sample count: {aggregatedData.SampleCount})");
}));
}
}
catch (OperationCanceledException)
{
// Normal cancellation
}
}
private void UpdateRealTimePlot()
{
if (_realTimeValues.Count == 0) return;
plotRealTime.Plot.Clear();
var times = _realTimeTimes.Select(t => t.ToOADate()).ToArray();
var values = _realTimeValues.ToArray();
var scatter = plotRealTime.Plot.Add.Scatter(times, values);
scatter.Color = Colors.Blue;
scatter.LineWidth = 2;
plotRealTime.Plot.Axes.DateTimeTicksBottom();
plotRealTime.Plot.Axes.AutoScale();
plotRealTime.Refresh();
}
private void UpdateAggregatedPlot()
{
if (_aggregatedValues.Count == 0) return;
plotAggregated.Plot.Clear();
var times = _aggregatedTimes.Select(t => t.ToOADate()).ToArray();
var values = _aggregatedValues.ToArray();
var scatter = plotAggregated.Plot.Add.Scatter(times, values);
scatter.Color = Colors.Red;
scatter.LineWidth = 3;
plotAggregated.Plot.Axes.DateTimeTicksBottom();
plotAggregated.Plot.Axes.AutoScale();
plotAggregated.Refresh();
}
private void UpdateStatus(string message)
{
if (InvokeRequired)
{
BeginInvoke(new Action(() => UpdateStatus(message)));
return;
}
txtStatus.AppendText($"[{DateTime.Now:HH:mm:ss}] {message}\r\n");
txtStatus.ScrollToCaret();
}
protected override void OnFormClosing(FormClosingEventArgs e)
{
StopAcquisition();
base.OnFormClosing(e);
}
}
}

C#private const int MaxDisplayPoints = 100;
// Limit display points to prevent memory leaks
if (_realTimeValues.Count > MaxDisplayPoints)
{
_realTimeValues.RemoveAt(0);
_realTimeTimes.RemoveAt(0);
}
C#// ✅ Correct cancellation token usage
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// ✅ Graceful resource cleanup
protected override void OnFormClosing(FormClosingEventArgs e)
{
StopAcquisition(); // Ensure all async tasks stop properly
base.OnFormClosing(e);
}
This data acquisition system can be widely applied to:
A: Use bounded Channels with reasonable capacity limits:
C#var options = new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = false
};
_channel = Channel.CreateBounded<DataPoint>(options);
A: Adopt batch processing strategy:
C#private readonly List<DataPoint> _batchBuffer = new List<DataPoint>();
private const int BatchSize = 50;
// Batch processing for improved efficiency
if (_batchBuffer.Count >= BatchSize)
{
await ProcessBatchAsync(_batchBuffer);
_batchBuffer.Clear();
}
Through this practical case study, we've mastered three core technologies for building efficient data acquisition systems:
This architecture not only solves the performance bottlenecks of traditional data acquisition systems but also provides powerful extensibility. You can easily add new data sources, processing algorithms, or visualization components.
💬 Interactive Discussion:
If this article helps your C# development work, please share it with more colleagues to help us improve .NET technology skills together! Follow me for more C# practical content!
🔖 Collection-grade code templates are ready, recommend bookmarking for future use!
相关信息
通过网盘分享的文件:AppDataAcquisitionSystem.zip 链接: https://pan.baidu.com/s/1YMZl9u6rMGwIalC0_ohB9g?pwd=jrkg 提取码: jrkg --来自百度网盘超级会员v9的分享:::
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!