2025-11-18
C#
00

目录

through a complete practical case study, thoroughly solving these technical challenges.
🎯 System Architecture Design: Three-Layer Data Pipeline
Core Architecture Concept
🔥 Core Technology Implementation Details
1️⃣ Smart Data Generator: Simulating Real Sensors
2️⃣ High-Performance Data Processor: Channel + Async Stream Processing
3️⃣ Smart Quality Control: Multi-dimensional Data Validation
4️⃣ Real-time Data Aggregation: Sliding Window Algorithm
📊 UI Interface: Real-time Visualization Display
ScottPlot Integration Implementation
⚡ Core Performance Optimization Techniques
1️⃣ Memory Management Optimization
2️⃣ Asynchronous Operation Best Practices
🛠️ Real-World Application Scenarios
🔍 Common Issues and Solutions
Q1: Backpressure Issues in Channel Usage?
Q2: Performance Bottlenecks with Large Data Volumes?
🎉 Summary and Future Outlook
practical content!

C# Data Acquisition System in Action: Building High-Performance Real-Time Data Processing Pipeline from Scratch

In the modern Industry 4.0 era, data acquisition systems have become the core infrastructure for enterprise digital transformation. Whether it's sensor monitoring on factory production lines, IoT device status collection, or real-time transaction data processing in financial systems, an efficient and stable data acquisition system is crucial.

However, many C# developers often encounter these pain points when building such systems: high data processing latency, excessive memory usage, improper anomalous data handling, and lack of effective quality control mechanisms. This article will teach you how to build a professional-grade data acquisition system using C# through a complete practical case study, thoroughly solving these technical challenges.

🎯 System Architecture Design: Three-Layer Data Pipeline

Core Architecture Concept

We adopt the Producer-Consumer Pattern combined with Channel asynchronous communication to build a three-layer data processing pipeline:

Markdown
Data Generation Layer → Data Processing Layer → Data Aggregation Layer → UI Display Layer

image.png

The advantages of this design include:

  • Strong Decoupling: Each layer has a single responsibility, making it easy to maintain and extend
  • Excellent Performance: Channel-based asynchronous processing supporting high concurrency
  • Fault Tolerance: Single-layer failures don't affect the entire system operation

🔥 Core Technology Implementation Details

1️⃣ Smart Data Generator: Simulating Real Sensors

C#
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace AppDataAcquisitionSystem { public class DataGenerator { private readonly Random _random; private double _baseValue; private double _trend; public DataGenerator() { _random = new Random(); _baseValue = 50.0; _trend = 0.0; } public async Task<DataPoint> GenerateDataPointAsync() { // Simulate real sensor data: base value + trend + noise + occasional anomalies var noise = _random.NextDouble() * 2 - 1; // Noise from -1 to 1 var spike = _random.NextDouble() < 0.05 ? _random.NextDouble() * 20 - 10 : 0; // 5% probability of anomalous values _trend += (_random.NextDouble() - 0.5) * 0.1; _trend = Math.Max(-2, Math.Min(2, _trend)); _baseValue += _trend; _baseValue = Math.Max(0, Math.Min(100, _baseValue)); // Limit base value range var value = _baseValue + noise + spike; return new DataPoint { Timestamp = DateTime.Now, Value = value, Quality = DataQuality.Good, Source = "Sensor-001" }; } } }

💡 Practical Tips:

  • The _trend variable implements natural data fluctuation, closer to real sensors
  • Anomaly probability controlled at 5%, simulating occasional anomalies in real environments
  • Data range limitation prevents numerical overflow issues

2️⃣ High-Performance Data Processor: Channel + Async Stream Processing

C#
public class DataProcessor { private readonly Channel<DataPoint> _rawDataChannel; private readonly Channel<DataPoint> _processedDataChannel; private readonly Channel<AggregatedData> _aggregatedDataChannel; public async Task StartProcessingAsync(CancellationToken cancellationToken = default) { // 🚀 Parallel processing: data processing and aggregation run simultaneously var processingTask = ProcessDataAsync(cancellationToken); var aggregationTask = AggregateDataAsync(cancellationToken); await Task.WhenAll(processingTask, aggregationTask); } private async Task ProcessDataAsync(CancellationToken cancellationToken) { await foreach (var dataPoint in _rawDataChannel.Reader.ReadAllAsync(cancellationToken)) { var processedPoint = ProcessSingleDataPoint(dataPoint); if (processedPoint != null) { await _processedDataChannel.Writer.WriteAsync(processedPoint, cancellationToken); } } } }

🔧 Key Technical Points:

  • Channel Advantages: Compared to traditional queues, Channel provides better async support and backpressure control
  • Async Stream Processing: IAsyncEnumerable makes data stream processing more elegant
  • Parallel Design: Processing and aggregation run simultaneously, maximizing system throughput

3️⃣ Smart Quality Control: Multi-dimensional Data Validation

C#
private DataQuality PerformQualityCheck(DataPoint dataPoint) { // 🛡️ Basic numeric checks if (double.IsNaN(dataPoint.Value) || double.IsInfinity(dataPoint.Value)) return DataQuality.Bad; // 📊 Numeric range validation if (Math.Abs(dataPoint.Value) > 1000) return DataQuality.Bad; // ⚡ Change rate check - prevent sudden data changes if (_lastValue != null) { var timeDiff = (dataPoint.Timestamp - _lastValue.Timestamp).TotalSeconds; if (timeDiff > 0) { var changeRate = Math.Abs(dataPoint.Value - _lastValue.Value) / timeDiff; if (changeRate > 100) // Change rate threshold return DataQuality.Uncertain; } } return DataQuality.Good; }

4️⃣ Real-time Data Aggregation: Sliding Window Algorithm

C#
private async Task AggregateDataAsync(CancellationToken cancellationToken) { await foreach (var dataPoint in _processedDataChannel.Reader.ReadAllAsync(cancellationToken)) { _windowBuffer.Add(dataPoint); if (_windowBuffer.Count >= _config.WindowSize) { var aggregated = CalculateAggregatedData(_windowBuffer); await _aggregatedDataChannel.Writer.WriteAsync(aggregated, cancellationToken); // 🎯 Sliding window optimization: retain partial historical data var keepCount = _config.WindowSize / 2; _windowBuffer.RemoveRange(0, _windowBuffer.Count - keepCount); } } }

💡 Sliding Window Advantages:

  • Maintains data continuity, avoiding window boundary effects
  • Memory usage optimization, preventing infinite buffer growth
  • Perfect balance between real-time performance and accuracy

📊 UI Interface: Real-time Visualization Display

ScottPlot Integration Implementation

C#
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; using ScottPlot; using Timer = System.Threading.Timer; namespace AppDataAcquisitionSystem { public partial class Form1 : Form { private DataGenerator _dataGenerator; private DataProcessor _dataProcessor; private AcquisitionConfig _config; private Timer _acquisitionTimer; private CancellationTokenSource _cancellationTokenSource; // Data storage private List<DataPoint> _realTimeData; private List<AggregatedData> _aggregatedData; // Chart data private List<double> _realTimeValues; private List<DateTime> _realTimeTimes; private List<double> _aggregatedValues; private List<DateTime> _aggregatedTimes; private const int MaxDisplayPoints = 100; public Form1() { InitializeComponent(); InitializeSystem(); InitializePlots(); AttachEventHandlers(); } private void InitializeSystem() { _realTimeData = new List<DataPoint>(); _aggregatedData = new List<AggregatedData>(); _realTimeValues = new List<double>(); _realTimeTimes = new List<DateTime>(); _aggregatedValues = new List<double>(); _aggregatedTimes = new List<DateTime>(); _dataGenerator = new DataGenerator(); UpdateConfiguration(); } private void InitializePlots() { plotRealTime.Font = new Font("Arial", 12); // Configure real-time data chart plotRealTime.Plot.Title("Real-time Data"); plotRealTime.Plot.Font.Set("Arial"); plotRealTime.Plot.XLabel("Time"); plotRealTime.Plot.YLabel("Value"); // Configure aggregated data chart plotAggregated.Font = new Font("Arial", 12); plotAggregated.Plot.Title("Aggregated Data"); plotAggregated.Plot.Font.Set("Arial"); plotAggregated.Plot.XLabel("Time"); plotAggregated.Plot.YLabel("Aggregated Value"); } private void AttachEventHandlers() { btnStart.Click += BtnStart_Click; btnStop.Click += BtnStop_Click; btnClear.Click += BtnClear_Click; // Configuration change events nudSampleRate.ValueChanged += (s, e) => UpdateConfiguration(); nudNoiseThreshold.ValueChanged += (s, e) => UpdateConfiguration(); nudWindowSize.ValueChanged += (s, e) => UpdateConfiguration(); chkQualityCheck.CheckedChanged += (s, e) => UpdateConfiguration(); chkDenoising.CheckedChanged += (s, e) => UpdateConfiguration(); cmbAggregationType.SelectedIndexChanged += (s, e) => UpdateConfiguration(); } private void UpdateConfiguration() { _config = new AcquisitionConfig { SampleRateMs = (int)nudSampleRate.Value, NoiseThreshold = (double)nudNoiseThreshold.Value, WindowSize = (int)nudWindowSize.Value, EnableQualityCheck = chkQualityCheck.Checked, EnableDenoising = chkDenoising.Checked, AggregationType = (AggregationType)cmbAggregationType.SelectedIndex }; } private async void BtnStart_Click(object sender, EventArgs e) { try { btnStart.Enabled = false; btnStop.Enabled = true; UpdateConfiguration(); _dataProcessor = new DataProcessor(_config); _cancellationTokenSource = new CancellationTokenSource(); // Start data processing var processingTask = _dataProcessor.StartProcessingAsync(_cancellationTokenSource.Token); // Start data reading task var readingTask = StartDataReadingAsync(_cancellationTokenSource.Token); // Start data acquisition timer _acquisitionTimer = new Timer(OnDataAcquisitionTimer, null, 0, _config.SampleRateMs); UpdateStatus("System started"); await Task.WhenAny(processingTask, readingTask); } catch (Exception ex) { UpdateStatus($"Startup failed: {ex.Message}"); btnStart.Enabled = true; btnStop.Enabled = false; } } private void BtnStop_Click(object sender, EventArgs e) { StopAcquisition(); } private void BtnClear_Click(object sender, EventArgs e) { _realTimeData.Clear(); _aggregatedData.Clear(); _realTimeValues.Clear(); _realTimeTimes.Clear(); _aggregatedValues.Clear(); _aggregatedTimes.Clear(); plotRealTime.Plot.Clear(); plotAggregated.Plot.Clear(); plotRealTime.Refresh(); plotAggregated.Refresh(); UpdateStatus("Data cleared"); } private void StopAcquisition() { try { _acquisitionTimer?.Dispose(); _cancellationTokenSource?.Cancel(); _dataProcessor?.StopProcessing(); btnStart.Enabled = true; btnStop.Enabled = false; UpdateStatus("System stopped"); } catch (Exception ex) { UpdateStatus($"Stop failed: {ex.Message}"); } } private async void OnDataAcquisitionTimer(object state) { try { var dataPoint = await _dataGenerator.GenerateDataPointAsync(); await _dataProcessor.AddRawDataAsync(dataPoint); } catch (Exception ex) { BeginInvoke(new Action(() => UpdateStatus($"Data acquisition error: {ex.Message}"))); } } private async Task StartDataReadingAsync(CancellationToken cancellationToken) { // Start processed data reading var processedTask = ReadProcessedDataAsync(cancellationToken); var aggregatedTask = ReadAggregatedDataAsync(cancellationToken); await Task.WhenAll(processedTask, aggregatedTask); } private async Task ReadProcessedDataAsync(CancellationToken cancellationToken) { try { await foreach (var dataPoint in _dataProcessor.ProcessedDataReader.ReadAllAsync(cancellationToken)) { BeginInvoke(new Action(() => { _realTimeData.Add(dataPoint); _realTimeValues.Add(dataPoint.Value); _realTimeTimes.Add(dataPoint.Timestamp); // Limit display points if (_realTimeValues.Count > MaxDisplayPoints) { _realTimeValues.RemoveAt(0); _realTimeTimes.RemoveAt(0); } UpdateRealTimePlot(); UpdateStatus($"Real-time data: {dataPoint.Value:F2} (Quality: {dataPoint.Quality})"); })); } } catch (OperationCanceledException) { // Normal cancellation } } private async Task ReadAggregatedDataAsync(CancellationToken cancellationToken) { try { await foreach (var aggregatedData in _dataProcessor.AggregatedDataReader.ReadAllAsync(cancellationToken)) { BeginInvoke(new Action(() => { _aggregatedData.Add(aggregatedData); _aggregatedValues.Add(aggregatedData.Value); _aggregatedTimes.Add(aggregatedData.WindowEnd); // Limit display points if (_aggregatedValues.Count > MaxDisplayPoints) { _aggregatedValues.RemoveAt(0); _aggregatedTimes.RemoveAt(0); } UpdateAggregatedPlot(); UpdateStatus($"Aggregated data ({aggregatedData.Type}): {aggregatedData.Value:F2} (Sample count: {aggregatedData.SampleCount})"); })); } } catch (OperationCanceledException) { // Normal cancellation } } private void UpdateRealTimePlot() { if (_realTimeValues.Count == 0) return; plotRealTime.Plot.Clear(); var times = _realTimeTimes.Select(t => t.ToOADate()).ToArray(); var values = _realTimeValues.ToArray(); var scatter = plotRealTime.Plot.Add.Scatter(times, values); scatter.Color = Colors.Blue; scatter.LineWidth = 2; plotRealTime.Plot.Axes.DateTimeTicksBottom(); plotRealTime.Plot.Axes.AutoScale(); plotRealTime.Refresh(); } private void UpdateAggregatedPlot() { if (_aggregatedValues.Count == 0) return; plotAggregated.Plot.Clear(); var times = _aggregatedTimes.Select(t => t.ToOADate()).ToArray(); var values = _aggregatedValues.ToArray(); var scatter = plotAggregated.Plot.Add.Scatter(times, values); scatter.Color = Colors.Red; scatter.LineWidth = 3; plotAggregated.Plot.Axes.DateTimeTicksBottom(); plotAggregated.Plot.Axes.AutoScale(); plotAggregated.Refresh(); } private void UpdateStatus(string message) { if (InvokeRequired) { BeginInvoke(new Action(() => UpdateStatus(message))); return; } txtStatus.AppendText($"[{DateTime.Now:HH:mm:ss}] {message}\r\n"); txtStatus.ScrollToCaret(); } protected override void OnFormClosing(FormClosingEventArgs e) { StopAcquisition(); base.OnFormClosing(e); } } }

image.png

⚡ Core Performance Optimization Techniques

1️⃣ Memory Management Optimization

C#
private const int MaxDisplayPoints = 100; // Limit display points to prevent memory leaks if (_realTimeValues.Count > MaxDisplayPoints) { _realTimeValues.RemoveAt(0); _realTimeTimes.RemoveAt(0); }

2️⃣ Asynchronous Operation Best Practices

C#
// ✅ Correct cancellation token usage _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); // ✅ Graceful resource cleanup protected override void OnFormClosing(FormClosingEventArgs e) { StopAcquisition(); // Ensure all async tasks stop properly base.OnFormClosing(e); }

🛠️ Real-World Application Scenarios

This data acquisition system can be widely applied to:

  1. Industrial IoT: Real-time sensor data monitoring
  2. Financial Systems: Stock prices and trading data collection
  3. Environmental Monitoring: Temperature, humidity, and air quality data
  4. Performance Monitoring: Server and application performance metrics collection

🔍 Common Issues and Solutions

Q1: Backpressure Issues in Channel Usage?

A: Use bounded Channels with reasonable capacity limits:

C#
var options = new BoundedChannelOptions(1000) { FullMode = BoundedChannelFullMode.Wait, SingleReader = true, SingleWriter = false }; _channel = Channel.CreateBounded<DataPoint>(options);

Q2: Performance Bottlenecks with Large Data Volumes?

A: Adopt batch processing strategy:

C#
private readonly List<DataPoint> _batchBuffer = new List<DataPoint>(); private const int BatchSize = 50; // Batch processing for improved efficiency if (_batchBuffer.Count >= BatchSize) { await ProcessBatchAsync(_batchBuffer); _batchBuffer.Clear(); }

🎉 Summary and Future Outlook

Through this practical case study, we've mastered three core technologies for building efficient data acquisition systems:

  1. Channel Asynchronous Communication: Achieving high concurrency, low-latency data transmission
  2. Multi-layer Processing Pipeline: Complete workflow including quality control, noise reduction, and data aggregation
  3. Real-time Visualization Display: Perfect combination of ScottPlot and WinForms

This architecture not only solves the performance bottlenecks of traditional data acquisition systems but also provides powerful extensibility. You can easily add new data sources, processing algorithms, or visualization components.


💬 Interactive Discussion:

  1. What technical challenges have you encountered with data acquisition in your projects?
  2. Do you have better strategies for handling anomalous data?

If this article helps your C# development work, please share it with more colleagues to help us improve .NET technology skills together! Follow me for more C# practical content!

🔖 Collection-grade code templates are ready, recommend bookmarking for future use!

相关信息

通过网盘分享的文件:AppDataAcquisitionSystem.zip 链接: https://pan.baidu.com/s/1YMZl9u6rMGwIalC0_ohB9g?pwd=jrkg 提取码: jrkg --来自百度网盘超级会员v9的分享:::

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!