编辑
2025-10-10
C#
00

目录

概述
使用Parallel.ForEach进行并行处理
手动分块处理方式
使用生产者-消费者模式
注意事项
总结

概述

在处理大型数据集时,单线程处理往往效率低下。通过将数据分割成多个小块并利用多线程并行处理,我们可以显著提高程序的性能。本文将详细介绍几种实现方式。

使用Parallel.ForEach进行并行处理

最简单的实现方式是使用C#内置的Parallel.ForEach方法。

C#
namespace AppParallel { internal class Program { static object lockObject = new object(); static void Main(string[] args) { // 创建示例数据 var largeList = Enumerable.Range(1, 1000000).ToList(); // 设置并行选项 var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount // 使用处理器核心数量的线程 }; try { Parallel.ForEach(largeList, parallelOptions, (number) => { // 这里是对每个元素的处理逻辑 var result = ComplexCalculation(number); // 注意:如果需要收集结果,要考虑线程安全 lock (lockObject) { // 进行线程安全的结果收集 Console.WriteLine(result); } }); } catch (AggregateException ae) { // 处理并行处理中的异常 foreach (var ex in ae.InnerExceptions) { Console.WriteLine($"Error: {ex.Message}"); } } } private static int ComplexCalculation(int number) { // 模拟复杂计算 Thread.Sleep(100); return number * 2; } } }

image.png

手动分块处理方式

有时我们需要更精细的控制,可以手动将数据分块并分配给不同的线程。

C#
namespace AppParallel { internal class Program { static void Main(string[] args) { var largeList = Enumerable.Range(1, 1000000).ToList(); ProcessByChunks(largeList, 1000); // 每1000个元素一个块 } public static void ProcessByChunks<T>(List<T> largeList, int chunkSize) { // 计算需要多少个分块 int chunksCount = (int)Math.Ceiling((double)largeList.Count / chunkSize); var tasks = new List<Task>(); for (int i = 0; i < chunksCount; i++) { // 获取当前分块的数据 var chunk = largeList .Skip(i * chunkSize) .Take(chunkSize) .ToList(); // 创建新任务处理当前分块 var task = Task.Run(() => ProcessChunk(chunk)); tasks.Add(task); } // 等待所有任务完成 Task.WaitAll(tasks.ToArray()); } private static void ProcessChunk<T>(List<T> chunk) { foreach (var item in chunk) { // 处理每个元素 ProcessItem(item); } } private static void ProcessItem<T>(T item) { // 具体的处理逻辑 Console.WriteLine($"Processing item: {item} on thread: {Task.CurrentId}"); } } }

image.png

使用生产者-消费者模式

对于更复杂的场景,我们可以使用生产者-消费者模式,这样可以更好地控制内存使用和处理流程。

C#
public class ProducerConsumerExample { private readonly BlockingCollection<int> _queue; private readonly int _producerCount; private readonly int _consumerCount; private readonly CancellationTokenSource _cts; public ProducerConsumerExample(int queueCapacity = 1000) { _queue = new BlockingCollection<int>(queueCapacity); _producerCount = 1; _consumerCount = Environment.ProcessorCount; _cts = new CancellationTokenSource(); } public async Task ProcessDataAsync(List<int> largeList) { // 创建生产者任务 var producerTask = Task.Run(() => Producer(largeList)); // 创建消费者任务 var consumerTasks = Enumerable.Range(0, _consumerCount) .Select(_ => Task.Run(() => Consumer())) .ToList(); // 等待所有生产者完成 await producerTask; // 标记队列已完成 _queue.CompleteAdding(); // 等待所有消费者完成 await Task.WhenAll(consumerTasks); } private void Producer(List<int> items) { try { foreach (var item in items) { if (_cts.Token.IsCancellationRequested) break; _queue.Add(item); } } catch (Exception ex) { Console.WriteLine($"Producer error: {ex.Message}"); _cts.Cancel(); } } private void Consumer() { try { foreach (var item in _queue.GetConsumingEnumerable()) { if (_cts.Token.IsCancellationRequested) break; // 处理数据 ProcessItem(item); } } catch (Exception ex) { Console.WriteLine($"Consumer error: {ex.Message}"); _cts.Cancel(); } } private void ProcessItem(int item) { // 具体的处理逻辑 Thread.Sleep(100); // 模拟耗时操作 Console.WriteLine($"Processed item {item} on thread {Task.CurrentId}"); } } // 使用示例 static async Task Main(string[] args) { var processor = new ProducerConsumerExample(); var largeList = Enumerable.Range(1, 10000).ToList(); await processor.ProcessDataAsync(largeList); }

image.png

注意事项

  1. 合适的分块大小
    • 分块不要太小,否则线程切换开销会抵消并行处理的优势
    • 也不要太大,否则会影响负载均衡
    • 建议从1000-5000个元素每块开始测试
  2. 异常处理
    • 务必妥善处理并行处理中的异常
    • 使用try-catch包装每个任务
    • 考虑使用CancellationToken来优雅终止所有任务
  3. 资源管理
    • 注意内存使用,避免同时加载过多数据
    • 合理控制线程数量,通常不超过处理器核心数的2倍
    • 使用using语句管理IDisposable资源
  4. 线程安全
    • 访问共享资源时确保使用适当的同步机制
    • 考虑使用线程安全的集合类
    • 避免过度锁定导致性能下降

总结

并行处理大数据列表是提高程序性能的有效方式,但需要根据具体场景选择合适的实现方式。本文介绍的三种方法各有特点:

  • Parallel.ForEach: 适合简单场景,实现简单
  • 手动分块处理:提供更多控制,适合中等复杂度场景
  • 生产者-消费者模式:适合复杂场景,可以更好地控制资源使用

在实际应用中,建议先进行性能测试,根据数据量大小和处理复杂度选择合适的实现方式。同时要注意异常处理和资源管理,确保程序的稳定性和可靠性。

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!