【导读】随着多核处理器成为主流,并行编程已成为提升应用性能的关键技术。本文深入介绍C#中的PLINQ(并行LINQ)技术,通过丰富的代码示例和详细解析,帮助开发者掌握这一强大的并行数据处理工具。无论你是初学者还是有经验的开发者,都能从中获益。
PLINQ(Parallel LINQ)是.NET Framework提供的并行数据处理库,它是LINQ(Language Integrated Query,语言集成查询)的并行扩展版本。PLINQ能够自动将数据处理操作分配到多个CPU核心上执行,充分利用现代多核处理器的计算能力,大幅提升数据处理性能。
将普通LINQ查询转换为并行查询非常简单,只需添加.AsParallel()方法调用:
C#namespace AppPLinq
{
internal class Program
{
static void Main(string[] args)
{
int[] numbers = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
// 普通LINQ查询
var normalQuery = numbers.Where(n => n % 2 == 0).Select(n => n * n);
// PLINQ并行查询 - 仅添加AsParallel()方法
var parallelQuery = numbers.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n);
foreach (var item in parallelQuery)
{
Console.WriteLine(item);
}
Console.ReadKey();
}
}
}

下面是一个处理百万级数据的完整示例,展示PLINQ的强大性能:
C#using System.Diagnostics;
namespace AppPLinq
{
internal class Program
{
static void Main(string[] args)
{
int[] numbers = Enumerable.Range(1, 10_000_000).ToArray();
// 测量普通LINQ查询的执行时间
Stopwatch normalTimer = Stopwatch.StartNew();
var normalResult = numbers
.Where(n => IsPrime(n)) // 筛选质数
.Select(n => n * n) // 计算平方
.ToList(); // 执行查询并收集结果
normalTimer.Stop();
Console.WriteLine($"普通LINQ查询耗时: {normalTimer.ElapsedMilliseconds}毫秒");
// 测量PLINQ查询的执行时间
Stopwatch parallelTimer = Stopwatch.StartNew();
var parallelResult = numbers
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount) // 根据CPU核心数调整
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Where(n => IsPrime(n))
.Select(n => n * n)
.ToList();
parallelTimer.Stop();
Console.WriteLine($"PLINQ查询耗时: {parallelTimer.ElapsedMilliseconds}毫秒");
// 验证结果数量
Console.WriteLine($"普通LINQ结果数量: {normalResult.Count}");
Console.WriteLine($"PLINQ结果数量: {parallelResult.Count}");
Console.ReadKey();
}
// 判断一个数是否为质数的辅助方法
static bool IsPrime(int number)
{
if (number <= 1) return false;
if (number <= 3) return true;
if (number % 2 == 0 || number % 3 == 0) return false;
// 使用6k±1优化的质数检测算法
int i = 5;
while (i * i <= number)
{
if (number % i == 0 || number % (i + 2) == 0)
return false;
i += 6;
}
return true;
}
}
}

运行结果分析:在多核处理器上,PLINQ版本通常比普通LINQ快3-8倍,具体取决于CPU核心数和任务复杂度。
在某些情况下,你可能需要限制PLINQ使用的线程数量。通过WithDegreeOfParallelism()方法可以精确控制并行度:
C#using System.Diagnostics;
namespace AppPLinq
{
internal class Program
{
static void Main(string[] args)
{
int[] numbers = Enumerable.Range(1, 10000).ToArray();
// 限制并行度为4(最多同时使用4个线程)
var parallelQuery = numbers.AsParallel()
.WithDegreeOfParallelism(4) // 限制最多使用4个线程
.Select(n =>
{
// 打印当前处理数字的线程ID
Console.WriteLine($"处理数字 {n} 的线程ID: {Thread.CurrentThread.ManagedThreadId}");
return n * n;
});
// 执行查询
parallelQuery.ToList();
Console.WriteLine("查询完成!");
Console.ReadKey();
}
}
}

最佳实践:通常不需要手动设置并行度,.NET运行时会根据系统负载自动选择最优值。只有在特殊情况下(如资源受限的环境)才需要手动设置。
默认情况下,PLINQ不保证处理结果的顺序与原始集合相同。如果需要保持顺序,可以使用AsOrdered()方法:
C#using System.Diagnostics;
namespace AppPLinq
{
internal class Program
{
static void Main(string[] args)
{
int[] numbers = Enumerable.Range(1, 100).ToArray();
Console.WriteLine("不保序的PLINQ结果:");
var unorderedResults = numbers.AsParallel()
.Select(n => n * 10)
.ToArray();
Console.WriteLine(string.Join(", ", unorderedResults));
Console.WriteLine("\n保序的PLINQ结果:");
var orderedResults = numbers.AsParallel()
.AsOrdered() // 保持元素顺序
.Select(n => n * 10)
.ToArray();
Console.WriteLine(string.Join(", ", orderedResults));
Console.ReadKey();
}
}
}
性能考虑:保持顺序会带来一定的性能开销,因为需要额外的协调工作来确保结果按正确顺序返回。
PLINQ中的异常处理与普通LINQ不同。由于并行执行可能导致多个异常同时发生,PLINQ使用AggregateException来收集所有异常:
C#using System;
using System.Linq;
class Program
{
static void Main(string[] args)
{
// 包含可能导致异常的数据
int[] numbers = { 10, 20, 0, 30, 0, 40 }; // 0会导致除法异常
try
{
var results = numbers.AsParallel()
.Select(n =>
{
// 尝试执行可能引发异常的操作
Console.WriteLine($"处理数字: {n}");
return 100 / n; // 可能的除零异常
})
.ToArray(); // 执行查询
Console.WriteLine("计算结果: " + string.Join(", ", results));
}
catch (AggregateException ex)
{
// 处理聚合异常
Console.WriteLine($"捕获到 {ex.InnerExceptions.Count} 个并行处理异常:");
foreach (var innerEx in ex.InnerExceptions)
{
Console.WriteLine($"- {innerEx.GetType().Name}: {innerEx.Message}");
}
}
}
}

注意:PLINQ会在首次遇到异常时尝试停止处理,但由于并行本质,可能已经启动的其他并行任务仍会继续执行并可能引发更多异常。
PLINQ在处理大型数据集的分析和聚合操作时特别有用:
C#using System.Diagnostics;
namespace AppPLinq
{
// 销售数据类
class SaleRecord
{
public string Category { get; set; }
public decimal Amount { get; set; }
public DateTime Date { get; set; }
}
internal class Program
{
static void Main(string[] args)
{
// 模拟大量销售数据
var sales = GenerateSalesData(1_000_000);
// 使用PLINQ按产品类别分组并计算总销售额
var categorySales = sales.AsParallel()
.GroupBy(sale => sale.Category)
.Select(group => new
{
Category = group.Key,
TotalSales = group.Sum(sale => sale.Amount),
AverageAmount = group.Average(sale => sale.Amount),
Count = group.Count()
})
.OrderByDescending(result => result.TotalSales)
.ToList();
// 显示结果
Console.WriteLine("产品类别销售统计:");
Console.WriteLine("----------------------------------------");
Console.WriteLine("类别名称 总销售额 平均金额 订单数");
Console.WriteLine("----------------------------------------");
foreach (var result in categorySales)
{
Console.WriteLine($"{result.Category,-12} {result.TotalSales,12:C} {result.AverageAmount,12:C} {result.Count,10}");
}
Console.ReadKey();
}
// 生成模拟销售数据
static List<SaleRecord> GenerateSalesData(int count)
{
string[] categories = { "电子产品", "服装", "食品", "家居", "图书" };
Random random = new Random(42); // 固定种子以获得可重复的结果
return Enumerable.Range(1, count)
.Select(_ => new SaleRecord
{
Category = categories[random.Next(categories.Length)],
Amount = (decimal)(random.Next(1000, 100000) / 100.0),
Date = DateTime.Now.AddDays(-random.Next(365))
})
.ToList();
}
}
}

通过本文的学习,我们全面了解了C#中PLINQ的工作原理、使用方法和优化技巧。PLINQ作为.NET平台的并行数据处理利器,能够帮助开发者轻松实现高性能的数据处理程序,充分发挥现代多核处理器的性能优势。
关键要点总结:
.AsParallel()即可将普通LINQ查询转为并行处理无论你是处理大数据集、执行复杂计算还是优化应用程序性能,PLINQ都是一个值得掌握的强大工具。希望本文能帮助你在实际项目中合理应用PLINQ,编写高效的并行数据处理程序。
关键词:C#并行编程、PLINQ、AsParallel、高性能数据处理、多核编程、.NET并行编程
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!