在工业物联网时代,传感器数据采集系统面临着巨大挑战:如何在高频率数据写入的同时保证系统稳定性?
想象一下,你的工厂有数十个传感器,每秒产生上千条数据,传统的实时写入数据库方式会让你的系统瞬间崩溃。今天,我将通过一个完整的C#项目,展示如何优雅地解决这个问题,构建一个能够处理高频数据的工业级监控系统。
本文将深入剖析批量写入策略、异步处理机制和SQLite性能优化,让你的数据采集系统既稳定又高效!
在实际的工业应用中,传感器数据采集系统经常遇到以下问题:
我们的解决方案采用了生产者-消费者模式,通过以下关键技术实现高效的数据处理:
Markdown传感器数据 → 内存队列 → 批量处理线程 → SQLite数据库
↓ ↓ ↓
实时显示 ← UI更新线程 ← 数据统计查询
C#public class SensorData
{
public string SensorId { get; set; } // 传感器ID
public double Value { get; set; } // 测量值
public DateTime Timestamp { get; set; } // 时间戳
public string Unit { get; set; } // 单位
}
public class SensorConfig
{
public string Name { get; set; } // 传感器名称
public string Unit { get; set; } // 单位
public double MinValue { get; set; } // 最小值
public double MaxValue { get; set; } // 最大值
}
C#public class HighFrequencyDataManager : IDisposable
{
private readonly ConcurrentQueue<SensorData> _dataQueue;
private readonly Task _batchProcessingTask;
private readonly CancellationTokenSource _cancellationTokenSource;
private int _batchSize;
private readonly int _flushInterval;
public HighFrequencyDataManager(string databasePath, int batchSize = 1000, int flushIntervalMs = 1000)
{
_batchSize = batchSize;
_flushInterval = flushIntervalMs;
_connectionString = $"Data Source={databasePath};";
_dataQueue = new ConcurrentQueue<SensorData>();
_cancellationTokenSource = new CancellationTokenSource();
InitializeDatabase();
// 启动后台批量处理任务
_batchProcessingTask = Task.Run(BatchProcessingLoop, _cancellationTokenSource.Token);
}
}
这是整个系统的核心,实现了高效的数据批量处理:
C#private async Task BatchProcessingLoop()
{
var batchData = new List<SensorData>();
var lastFlushTime = DateTime.Now;
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
try
{
// 从队列中取出数据,直到达到批量大小
while (_dataQueue.TryDequeue(out var data) && batchData.Count < _batchSize)
{
batchData.Add(data);
}
// 智能刷新策略:达到批量大小或超过时间间隔
var shouldFlush = batchData.Count >= _batchSize ||
(batchData.Count > 0 && (DateTime.Now - lastFlushTime).TotalMilliseconds >= _flushInterval);
if (shouldFlush)
{
await SaveBatchToDatabase(batchData);
batchData.Clear();
lastFlushTime = DateTime.Now;
}
await Task.Delay(10, _cancellationTokenSource.Token);
}
catch (Exception ex)
{
Console.WriteLine($"批量处理错误: {ex.Message}");
await Task.Delay(1000, _cancellationTokenSource.Token);
}
}
}
关键技巧解析:
C#private async Task SaveBatchToDatabase(List<SensorData> batchData)
{
if (batchData.Count == 0) return;
using (var connection = new SqliteConnection(_connectionString))
{
await connection.OpenAsync();
using (var transaction = connection.BeginTransaction())
{
try
{
// SQLite参数限制处理
const int maxParametersPerBatch = 999;
const int parametersPerRow = 4;
var maxRowsPerBatch = maxParametersPerBatch / parametersPerRow;
// 分批插入,避免参数过多
for (int i = 0; i < batchData.Count; i += maxRowsPerBatch)
{
var batchSize = Math.Min(maxRowsPerBatch, batchData.Count - i);
var batch = batchData.GetRange(i, batchSize);
// 动态构建批量插入语句
var valueClause = string.Join(", ",
Enumerable.Range(0, batch.Count).Select(j =>
$"(@SensorId{j}, @Value{j}, @Timestamp{j}, @Unit{j})"));
var insertCommand = $@"
INSERT INTO SensorData (SensorId, Value, Timestamp, Unit)
VALUES {valueClause}";
using (var command = new SqliteCommand(insertCommand, connection, transaction))
{
// 批量绑定参数
for (int j = 0; j < batch.Count; j++)
{
var data = batch[j];
command.Parameters.AddWithValue($"@SensorId{j}", data.SensorId);
command.Parameters.AddWithValue($"@Value{j}", data.Value);
command.Parameters.AddWithValue($"@Timestamp{j}", data.Timestamp);
command.Parameters.AddWithValue($"@Unit{j}", data.Unit ?? string.Empty);
}
await command.ExecuteNonQueryAsync();
}
}
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
}
性能优化要点:
C#private void SensorSimulationTimer_Tick(object sender, EventArgs e)
{
var dataList = new List<SensorData>();
// 模拟多传感器数据生成
foreach (var sensor in _sensors)
{
var data = new SensorData
{
SensorId = sensor.Key,
Value = Math.Round(_random.NextDouble() *
(sensor.Value.MaxValue - sensor.Value.MinValue) +
sensor.Value.MinValue, 2),
Timestamp = DateTime.Now,
Unit = sensor.Value.Unit
};
dataList.Add(data);
}
// 异步写入数据管理器
_dataManager.AddDataBatch(dataList);
_totalDataCount += dataList.Count;
// 更新UI显示
UpdateRealtimeData(dataList);
}
C#private void UpdateRealtimeData(List<SensorData> dataList)
{
// 跨线程操作UI的标准做法
if (InvokeRequired)
{
Invoke(new Action<List<SensorData>>(UpdateRealtimeData), dataList);
return;
}
// 格式化显示数据
var displayData = dataList.Select(d => new
{
SensorId = d.SensorId,
SensorName = _sensors.ContainsKey(d.SensorId) ? _sensors[d.SensorId].Name : d.SensorId,
Value = d.Value.ToString("F2"),
Unit = d.Unit,
Timestamp = d.Timestamp.ToString("yyyy-MM-dd HH:mm:ss.fff")
}).ToList();
dataGridView.DataSource = displayData;
}
C#private void UiUpdateTimer_Tick(object sender, EventArgs e)
{
// 显示总数据量
countLabel.Text = _totalDataCount.ToString("N0");
// 计算写入速率
var elapsed = DateTime.Now - _startTime;
var rate = elapsed.TotalSeconds > 0 ? _totalDataCount / elapsed.TotalSeconds : 0;
rateLabel.Text = $"{rate:F1} 条/秒";
// 显示运行时间
timeLabel.Text = elapsed.ToString(@"hh\:mm\:ss");
}
C#public SensorStatistics GetStatistics(string sensorId, DateTime startTime, DateTime endTime)
{
using (var connection = new SqliteConnection(_connectionString))
{
connection.Open();
var query = @"
SELECT
COUNT(*) as Count,
AVG(Value) as AvgValue,
MAX(Value) as MaxValue,
MIN(Value) as MinValue
FROM SensorData
WHERE SensorId = @SensorId
AND Timestamp >= @StartTime
AND Timestamp <= @EndTime";
using (var command = new SqliteCommand(query, connection))
{
command.Parameters.AddWithValue("@SensorId", sensorId);
command.Parameters.AddWithValue("@StartTime", startTime);
command.Parameters.AddWithValue("@EndTime", endTime);
using (var reader = command.ExecuteReader())
{
if (reader.Read())
{
return new SensorStatistics
{
Count = (int)reader.GetInt64("Count"),
AvgValue = reader.IsDBNull("AvgValue") ? 0 : reader.GetDouble("AvgValue"),
MaxValue = reader.IsDBNull("MaxValue") ? 0 : reader.GetDouble("MaxValue"),
MinValue = reader.IsDBNull("MinValue") ? 0 : reader.GetDouble("MinValue")
};
}
}
}
}
return new SensorStatistics();
}
C#using Timer = System.Windows.Forms.Timer;
namespace AppSqliteHighFrequencyData
{
public partial class Form1 : Form
{
private HighFrequencyDataManager _dataManager;
private Timer _sensorSimulationTimer;
private Timer _uiUpdateTimer;
private Random _random = new Random();
private long _totalDataCount = 0;
private DateTime _startTime = DateTime.Now;
// 传感器配置
private readonly Dictionary<string, SensorConfig> _sensors = new Dictionary<string, SensorConfig>
{
["TEMP01"] = new SensorConfig { Name = "温度传感器1", Unit = "°C", MinValue = 20, MaxValue = 40 },
["TEMP02"] = new SensorConfig { Name = "温度传感器2", Unit = "°C", MinValue = 18, MaxValue = 42 },
["HUMID01"] = new SensorConfig { Name = "湿度传感器1", Unit = "%", MinValue = 30, MaxValue = 80 },
["PRESS01"] = new SensorConfig { Name = "压力传感器1", Unit = "Pa", MinValue = 1000, MaxValue = 1200 }
};
public Form1()
{
InitializeComponent();
InitializeDataManager();
InitializeUI();
}
private void InitializeDataManager()
{
var databasePath = "sensor_data.db";
_dataManager = new HighFrequencyDataManager(databasePath, 1000, 1000);
}
private void InitializeUI()
{
sensorCombo.Items.AddRange(_sensors.Keys.ToArray());
sensorCombo.SelectedIndex = 0;
startTimePicker.Value = DateTime.Now.AddHours(-1);
endTimePicker.Value = DateTime.Now;
_uiUpdateTimer = new Timer { Interval = 1000, Enabled = true };
_uiUpdateTimer.Tick += UiUpdateTimer_Tick;
UpdateStatus("已停止", Color.Red);
}
private void StartButton_Click(object sender, EventArgs e)
{
startButton.Enabled = false;
stopButton.Enabled = true;
var frequency = (int)frequencyNumeric.Value;
var interval = 1000 / frequency;
_startTime = DateTime.Now;
_totalDataCount = 0;
var batchSize = (int)batchSizeNumeric.Value;
_dataManager.UpdateBatchSize(batchSize);
_sensorSimulationTimer = new Timer { Interval = interval, Enabled = true };
_sensorSimulationTimer.Tick += SensorSimulationTimer_Tick;
UpdateStatus("运行中", Color.Green);
}
private void StopButton_Click(object sender, EventArgs e)
{
startButton.Enabled = true;
stopButton.Enabled = false;
_sensorSimulationTimer?.Stop();
_sensorSimulationTimer?.Dispose();
_sensorSimulationTimer = null;
UpdateStatus("已停止", Color.Red);
}
private void SensorSimulationTimer_Tick(object sender, EventArgs e)
{
var dataList = new List<SensorData>();
foreach (var sensor in _sensors)
{
var data = new SensorData
{
SensorId = sensor.Key,
Value = Math.Round(_random.NextDouble() * (sensor.Value.MaxValue - sensor.Value.MinValue) + sensor.Value.MinValue, 2),
Timestamp = DateTime.Now,
Unit = sensor.Value.Unit
};
dataList.Add(data);
}
_dataManager.AddDataBatch(dataList);
_totalDataCount += dataList.Count;
UpdateRealtimeData(dataList);
}
private void UpdateRealtimeData(List<SensorData> dataList)
{
if (InvokeRequired)
{
Invoke(new Action<List<SensorData>>(UpdateRealtimeData), dataList);
return;
}
var displayData = dataList.Select(d => new
{
SensorId = d.SensorId,
SensorName = _sensors.ContainsKey(d.SensorId) ? _sensors[d.SensorId].Name : d.SensorId,
Value = d.Value.ToString("F2"),
Unit = d.Unit,
Timestamp = d.Timestamp.ToString("yyyy-MM-dd HH:mm:ss.fff")
}).ToList();
dataGridView.DataSource = displayData;
}
private void UiUpdateTimer_Tick(object sender, EventArgs e)
{
countLabel.Text = _totalDataCount.ToString("N0");
var elapsed = DateTime.Now - _startTime;
var rate = elapsed.TotalSeconds > 0 ? _totalDataCount / elapsed.TotalSeconds : 0;
rateLabel.Text = $"{rate:F1} 条/秒";
timeLabel.Text = elapsed.ToString(@"hh\:mm\:ss");
}
private async void QueryButton_Click(object sender, EventArgs e)
{
queryButton.Enabled = false;
statsLabel.Text = "查询中...";
var sensorId = sensorCombo.SelectedItem.ToString();
var startTime = startTimePicker.Value;
var endTime = endTimePicker.Value;
try
{
var stats = await Task.Run(() => _dataManager.GetStatistics(sensorId, startTime, endTime));
statsLabel.Text = $"数据条数: {stats.Count:N0}, 平均值: {stats.AvgValue:F2}, 最大值: {stats.MaxValue:F2}, 最小值: {stats.MinValue:F2}";
}
catch (Exception ex)
{
statsLabel.Text = $"查询出错: {ex.Message}";
}
finally
{
queryButton.Enabled = true;
}
}
private void UpdateStatus(string status, Color color)
{
statusLabel.Text = status;
statusLabel.ForeColor = color;
}
protected override void OnFormClosing(FormClosingEventArgs e)
{
_sensorSimulationTimer?.Stop();
_sensorSimulationTimer?.Dispose();
_uiUpdateTimer?.Stop();
_uiUpdateTimer?.Dispose();
_dataManager?.Dispose();
base.OnFormClosing(e);
}
}
}
C#using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Data.Sqlite;
namespace AppSqliteHighFrequencyData
{
public class HighFrequencyDataManager : IDisposable
{
private readonly string _connectionString;
private readonly ConcurrentQueue<SensorData> _dataQueue;
private readonly Task _batchProcessingTask;
private readonly System.Threading.CancellationTokenSource _cancellationTokenSource;
private int _batchSize;
private readonly int _flushInterval;
public HighFrequencyDataManager(string databasePath, int batchSize = 1000, int flushIntervalMs = 1000)
{
_batchSize = batchSize;
_flushInterval = flushIntervalMs;
_connectionString = $"Data Source={databasePath};";
_dataQueue = new ConcurrentQueue<SensorData>();
_cancellationTokenSource = new System.Threading.CancellationTokenSource();
InitializeDatabase();
_batchProcessingTask = Task.Run(BatchProcessingLoop, _cancellationTokenSource.Token);
}
public void UpdateBatchSize(int newBatchSize)
{
_batchSize = newBatchSize;
}
private void InitializeDatabase()
{
using (var connection = new SqliteConnection(_connectionString))
{
connection.Open();
var createTableCommand = @"
CREATE TABLE IF NOT EXISTS SensorData (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
SensorId TEXT NOT NULL,
Value REAL NOT NULL,
Timestamp DATETIME NOT NULL,
Unit TEXT
);
CREATE INDEX IF NOT EXISTS IX_SensorData_SensorId_Timestamp
ON SensorData(SensorId, Timestamp);
";
using (var command = new SqliteCommand(createTableCommand, connection))
{
command.ExecuteNonQuery();
}
}
}
public void AddDataBatch(List<SensorData> dataList)
{
foreach (var data in dataList)
{
_dataQueue.Enqueue(data);
}
}
private async Task BatchProcessingLoop()
{
var batchData = new List<SensorData>();
var lastFlushTime = DateTime.Now;
while (!_cancellationTokenSource.Token.IsCancellationRequested)
{
try
{
while (_dataQueue.TryDequeue(out var data) && batchData.Count < _batchSize)
{
batchData.Add(data);
}
var shouldFlush = batchData.Count >= _batchSize ||
(batchData.Count > 0 && (DateTime.Now - lastFlushTime).TotalMilliseconds >= _flushInterval);
if (shouldFlush)
{
await SaveBatchToDatabase(batchData);
batchData.Clear();
lastFlushTime = DateTime.Now;
}
await Task.Delay(10, _cancellationTokenSource.Token);
}
catch (Exception ex)
{
Console.WriteLine($"批量处理错误: {ex.Message}");
await Task.Delay(1000, _cancellationTokenSource.Token);
}
}
if (batchData.Count > 0)
{
await SaveBatchToDatabase(batchData);
}
}
private async Task SaveBatchToDatabase(List<SensorData> batchData)
{
if (batchData.Count == 0) return;
using (var connection = new SqliteConnection(_connectionString))
{
await connection.OpenAsync();
using (var transaction = connection.BeginTransaction())
{
try
{
const int maxParametersPerBatch = 999; // SQLite parameter limit
const int parametersPerRow = 4;
var maxRowsPerBatch = maxParametersPerBatch / parametersPerRow;
for (int i = 0; i < batchData.Count; i += maxRowsPerBatch)
{
var batchSize = Math.Min(maxRowsPerBatch, batchData.Count - i);
var batch = batchData.GetRange(i, batchSize);
var valueClause = string.Join(", ",
Enumerable.Range(0, batch.Count).Select(j =>
$"(@SensorId{j}, @Value{j}, @Timestamp{j}, @Unit{j})"));
var insertCommand = $@"
INSERT INTO SensorData (SensorId, Value, Timestamp, Unit)
VALUES {valueClause}";
using (var command = new SqliteCommand(insertCommand, connection, transaction))
{
for (int j = 0; j < batch.Count; j++)
{
var data = batch[j];
command.Parameters.AddWithValue($"@SensorId{j}", data.SensorId);
command.Parameters.AddWithValue($"@Value{j}", data.Value);
command.Parameters.AddWithValue($"@Timestamp{j}", data.Timestamp);
command.Parameters.AddWithValue($"@Unit{j}", data.Unit ?? string.Empty);
}
await command.ExecuteNonQueryAsync();
}
}
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
}
public SensorStatistics GetStatistics(string sensorId, DateTime startTime, DateTime endTime)
{
using (var connection = new SqliteConnection(_connectionString))
{
connection.Open();
var query = @"
SELECT
COUNT(*) as Count,
AVG(Value) as AvgValue,
MAX(Value) as MaxValue,
MIN(Value) as MinValue
FROM SensorData
WHERE SensorId = @SensorId
AND Timestamp >= @StartTime
AND Timestamp <= @EndTime";
using (var command = new SqliteCommand(query, connection))
{
command.Parameters.AddWithValue("@SensorId", sensorId);
command.Parameters.AddWithValue("@StartTime", startTime);
command.Parameters.AddWithValue("@EndTime", endTime);
using (var reader = command.ExecuteReader())
{
if (reader.Read())
{
return new SensorStatistics
{
Count = (int)reader.GetInt64("Count"),
AvgValue = reader.IsDBNull("AvgValue") ? 0 : reader.GetDouble("AvgValue"),
MaxValue = reader.IsDBNull("MaxValue") ? 0 : reader.GetDouble("MaxValue"),
MinValue = reader.IsDBNull("MinValue") ? 0 : reader.GetDouble("MinValue")
};
}
}
}
}
return new SensorStatistics();
}
public void Dispose()
{
_cancellationTokenSource?.Cancel();
_batchProcessingTask?.Wait(5000);
_cancellationTokenSource?.Dispose();
}
}
}
C#using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AppSqliteHighFrequencyData
{
public class SensorData
{
public string SensorId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
public string Unit { get; set; }
}
public class SensorConfig
{
public string Name { get; set; }
public string Unit { get; set; }
public double MinValue { get; set; }
public double MaxValue { get; set; }
}
public class SensorStatistics
{
public int Count { get; set; }
public double AvgValue { get; set; }
public double MaxValue { get; set; }
public double MinValue { get; set; }
}
}
忘记Dispose资源
C#// ❌ 错误做法:资源泄漏
public void BadExample()
{
var timer = new Timer();
// 忘记释放timer
}
// ✅ 正确做法:实现IDisposable
protected override void OnFormClosing(FormClosingEventArgs e)
{
_sensorSimulationTimer?.Stop();
_sensorSimulationTimer?.Dispose();
_uiUpdateTimer?.Stop();
_uiUpdateTimer?.Dispose();
_dataManager?.Dispose();
base.OnFormClosing(e);
}
直接在UI线程进行数据库操作
C#// ❌ 错误做法:阻塞UI线程
private void BadQueryButton_Click(object sender, EventArgs e)
{
var stats = _dataManager.GetStatistics(sensorId, startTime, endTime);
statsLabel.Text = stats.ToString();
}
// ✅ 正确做法:异步操作
private async void QueryButton_Click(object sender, EventArgs e)
{
queryButton.Enabled = false;
try
{
var stats = await Task.Run(() => _dataManager.GetStatistics(sensorId, startTime, endTime));
statsLabel.Text = $"数据条数: {stats.Count:N0}...";
}
finally
{
queryButton.Enabled = true;
}
}
通过本文的详细解析,我们实现了一个高性能、稳定可靠的传感器数据采集系统。核心要点包括:
这套方案不仅适用于传感器监控,还可以扩展到日志采集、实时分析等多个场景。在实际项目中,你还可以考虑添加数据压缩、分库分表、集群部署等高级特性。
你在项目中遇到过哪些高频数据处理的挑战? 欢迎在评论区分享你的经验,让我们一起探讨更多优化方案!
如果这篇文章对你有帮助,请转发给更多需要的同行。关注我,获取更多C#开发实战技巧!
相关信息
通过网盘分享的文件:AppSqliteHighFrequencyData.zip 链接: https://pan.baidu.com/s/1IT91akmqHL66CkDkoUJkhw?pwd=nnjq 提取码: nnjq --来自百度网盘超级会员v9的分享
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!