编辑
2026-03-08
C#
00

目录

💡 为什么选择Worker Services + Channels?
🔍 传统方案的痛点分析
⚡ Channels的核心优势
🛠️ 完整实战方案
📋 方案一:基础队列实现
🎯 定义后台任务队列接口
🔧 Worker Service消费者实现
📦 方案二:服务注册配置
🎮 方案三:控制器中的使用实践
🧪 方案四:性能压测验证
🛡️ 高级特性:优雅关闭与背压控制
🔄 优雅关闭机制
⚖️ 背压控制策略
🎯 技术选型决策指南
✅ 适合使用Worker Services + Channels的场景
❌ 建议使用第三方库的场景
💎 核心要点总结

还在为后台任务处理而苦恼吗?支付处理、邮件发送、报表生成、数据同步......这些耗时操作如果在主线程执行,用户体验会极其糟糕。

大多数开发者的第一反应是引入HangfireQuartzAzure Functions等第三方库。但你知道吗?.NET 9已经为我们提供了生产级别的原生解决方案

本文将深入探讨如何使用Worker Services + Channels构建高性能的后台任务系统,让你的应用响应如飞,同时告别对第三方依赖的困扰。

💡 为什么选择Worker Services + Channels?

🔍 传统方案的痛点分析

传统的.NET后台队列方案(如BlockingCollection、自定义队列)存在诸多局限:

  • 手动锁管理:容易出现死锁和竞态条件
  • 线程饥饿风险:资源分配不均衡
  • 缺乏背压控制:系统过载时无法有效限流
  • 异步支持困难:与现代async/await模式不匹配

⚡ Channels的核心优势

System.Threading.Channels是.NET Core中的隐藏宝石:

  • 线程安全:内置并发控制,无需手动加锁
  • 高性能:零分配设计,媲体Kestrel内核
  • 背压支持:自动流量控制,防止内存泄漏
  • 异步优先:完美配合async/await模式

💡 专家提示:Channels被广泛应用于Kestrel、gRPC、SignalR、EF Core等微软核心组件中,这足以证明其生产环境的可靠性。

🛠️ 完整实战方案

📋 方案一:基础队列实现

🎯 定义后台任务队列接口

c#
using System.Threading.Channels; namespace AppBackgroundTask { public interface IBackgroundTaskQueue { ValueTask QueueAsync(Func<CancellationToken, Task> workItem); ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken); } public class BackgroundTaskQueue : IBackgroundTaskQueue { private readonly Channel<Func<CancellationToken, Task>> _queue; public BackgroundTaskQueue(int capacity = 100) { var options = new BoundedChannelOptions(capacity) { SingleReader = false, // 支持多消费者 SingleWriter = false, // 支持多生产者 FullMode = BoundedChannelFullMode.Wait // 队列满时等待 }; _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(options); } public async ValueTask QueueAsync(Func<CancellationToken, Task> workItem) => await _queue.Writer.WriteAsync(workItem); public async ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) => await _queue.Reader.ReadAsync(cancellationToken); } // 后台服务,负责处理队列中的任务 public class BackgroundWorkerService { private readonly IBackgroundTaskQueue _taskQueue; private readonly CancellationTokenSource _cancellationTokenSource; private Task _backgroundTask; public BackgroundWorkerService(IBackgroundTaskQueue taskQueue) { _taskQueue = taskQueue; _cancellationTokenSource = new CancellationTokenSource(); } public void Start() { _backgroundTask = ProcessTasksAsync(_cancellationTokenSource.Token); Console.WriteLine("后台服务已启动"); } public async Task StopAsync() { _cancellationTokenSource.Cancel(); if (_backgroundTask != null) { await _backgroundTask; } Console.WriteLine("后台服务已停止"); } private async Task ProcessTasksAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { // 从队列中取出任务并执行 var workItem = await _taskQueue.DequeueAsync(cancellationToken); await workItem(cancellationToken); } catch (OperationCanceledException) { // 正常取消,退出循环 break; } catch (Exception ex) { Console.WriteLine($"执行任务时发生错误: {ex.Message}"); } } } } internal class Program { static async Task Main(string[] args) { // 创建任务队列 var backgroundTaskQueue = new BackgroundTaskQueue(); // 创建并启动后台服务 var workerService = new BackgroundWorkerService(backgroundTaskQueue); workerService.Start(); // 添加一些示例任务到队列 await QueueSampleTasks(backgroundTaskQueue); // 等待用户输入后停止服务 Console.WriteLine("按任意键停止服务..."); Console.ReadKey(); await workerService.StopAsync(); } private static async Task QueueSampleTasks(IBackgroundTaskQueue taskQueue) { // 添加任务1:模拟文件处理 await taskQueue.QueueAsync(async (cancellationToken) => { Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] 开始处理文件..."); await Task.Delay(2000, cancellationToken); // 模拟耗时操作 Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] 文件处理完成"); }); // 添加任务2:模拟发送邮件 await taskQueue.QueueAsync(async (cancellationToken) => { Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] 开始发送邮件..."); await Task.Delay(1000, cancellationToken); Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] 邮件发送完成"); }); // 添加任务3:模拟数据备份 await taskQueue.QueueAsync(async (cancellationToken) => { Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] 开始数据备份..."); await Task.Delay(3000, cancellationToken); Console.WriteLine($"[{DateTime.Now:HH:mm:ss}] 数据备份完成"); }); Console.WriteLine("所有任务已添加到队列"); } } }

image.png

⚠️ 常见坑点提醒

  • BoundedChannelFullMode.Wait确保在队列满时等待而非丢弃任务
  • 合理设置capacity值,过小影响性能,过大消耗内存

🔧 Worker Service消费者实现

c#
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading.Channels; namespace AppBackgroundTask { public interface IBackgroundTaskQueue { ValueTask QueueAsync(Func<CancellationToken, Task> workItem); ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken); } public class BackgroundTaskQueue : IBackgroundTaskQueue { private readonly Channel<Func<CancellationToken, Task>> _queue; public BackgroundTaskQueue(int capacity = 100) { var options = new BoundedChannelOptions(capacity) { SingleReader = false, // 支持多消费者 SingleWriter = false, // 支持多生产者 FullMode = BoundedChannelFullMode.Wait // 队列满时等待 }; _queue = Channel.CreateBounded<Func<CancellationToken, Task>>(options); } public async ValueTask QueueAsync(Func<CancellationToken, Task> workItem) => await _queue.Writer.WriteAsync(workItem); public async ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) => await _queue.Reader.ReadAsync(cancellationToken); } public class BackgroundWorker : BackgroundService { private readonly IBackgroundTaskQueue _taskQueue; private readonly ILogger<BackgroundWorker> _logger; public BackgroundWorker(IBackgroundTaskQueue taskQueue, ILogger<BackgroundWorker> logger) { _taskQueue = taskQueue; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("🚀 后台工作服务已启动"); while (!stoppingToken.IsCancellationRequested) { try { var workItem = await _taskQueue.DequeueAsync(stoppingToken); await workItem(stoppingToken); } catch (OperationCanceledException) { // 正常关闭,不记录错误 break; } catch (Exception ex) { _logger.LogError(ex, "❌ 后台任务执行失败"); } } _logger.LogInformation("🛑 后台工作服务正在停止..."); } } // 业务服务,用于添加任务到队列 public class TaskService { private readonly IBackgroundTaskQueue _taskQueue; private readonly ILogger<TaskService> _logger; public TaskService(IBackgroundTaskQueue taskQueue, ILogger<TaskService> logger) { _taskQueue = taskQueue; _logger = logger; } public async Task ProcessFileAsync(string fileName) { await _taskQueue.QueueAsync(async (cancellationToken) => { _logger.LogInformation("📁 开始处理文件: {FileName}", fileName); await Task.Delay(2000, cancellationToken); // 模拟文件处理 _logger.LogInformation("✅ 文件处理完成: {FileName}", fileName); }); _logger.LogInformation("📋 文件处理任务已加入队列: {FileName}", fileName); } public async Task SendEmailAsync(string recipient, string subject) { await _taskQueue.QueueAsync(async (cancellationToken) => { _logger.LogInformation("📧 开始发送邮件 - 收件人: {Recipient}, 主题: {Subject}", recipient, subject); await Task.Delay(1000, cancellationToken); // 模拟发送邮件 _logger.LogInformation("✅ 邮件发送完成: {Recipient}", recipient); }); _logger.LogInformation("📋 邮件发送任务已加入队列"); } public async Task BackupDataAsync(string dataName) { await _taskQueue.QueueAsync(async (cancellationToken) => { _logger.LogInformation("💾 开始数据备份: {DataName}", dataName); await Task.Delay(3000, cancellationToken); // 模拟数据备份 _logger.LogInformation("✅ 数据备份完成: {DataName}", dataName); }); _logger.LogInformation("📋 数据备份任务已加入队列: {DataName}", dataName); } } internal class Program { static async Task Main(string[] args) { // 创建主机构建器 var host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // 注册服务 services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>(); services.AddScoped<TaskService>(); services.AddHostedService<BackgroundWorker>(); }) .ConfigureLogging(logging => { logging.ClearProviders(); logging.AddConsole(); }) .Build(); // 启动主机(后台服务会自动启动) _ = host.RunAsync(); // 获取业务服务并添加一些示例任务 using var scope = host.Services.CreateScope(); var taskService = scope.ServiceProvider.GetRequiredService<TaskService>(); // 添加示例任务 await AddSampleTasks(taskService); // 等待用户输入 Console.WriteLine("\n按 'q' 退出程序..."); while (Console.ReadKey().KeyChar != 'q') { Console.WriteLine("\n按 'q' 退出程序..."); } // 停止主机 await host.StopAsync(); } private static async Task AddSampleTasks(TaskService taskService) { Console.WriteLine("🎯 开始添加示例任务...\n"); // 添加文件处理任务 await taskService.ProcessFileAsync("report.pdf"); // 添加邮件发送任务 await taskService.SendEmailAsync("user@example.com", "月度报告"); // 添加数据备份任务 await taskService.BackupDataAsync("用户数据"); Console.WriteLine("\n📢 所有任务已添加完成!"); } } }

image.png

📦 方案二:服务注册配置

Program.cs中注册服务:

c#
var builder = WebApplication.CreateBuilder(args); // 注册后台任务队列(单例模式) builder.Services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>(); // 注册Worker Service builder.Services.AddHostedService<BackgroundWorker>(); var app = builder.Build();

🎮 方案三:控制器中的使用实践

c#
[ApiController] [Route("api/[controller]")] public class PaymentController : ControllerBase { private readonly IBackgroundTaskQueue _taskQueue; public PaymentController(IBackgroundTaskQueue taskQueue) { _taskQueue = taskQueue; } [HttpPost("process")] public async Task<IActionResult> ProcessPayment([FromBody] PaymentRequest request) { // 立即入队,快速响应用户 await _taskQueue.QueueAsync(async token => { // 模拟耗时的支付处理逻辑 await ProcessPaymentInternal(request, token); Console.WriteLine($"💰 支付处理完成:{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss}"); }); return Accepted(new { Message = "支付请求已接收,正在处理中..." }); } private async Task ProcessPaymentInternal(PaymentRequest request, CancellationToken token) { // 实际的支付处理逻辑 await Task.Delay(2000, token); // 模拟API调用延迟 // 调用第三方支付接口 // 更新数据库状态 // 发送确认邮件等 } }

🏆 实际应用场景说明

  • 电商系统:订单处理、库存更新、发送确认邮件
  • 内容管理:图片压缩、视频转码、搜索索引更新
  • 数据分析:报表生成、统计计算、数据导出

🧪 方案四:性能压测验证

想要测试系统承载能力?试试批量任务:

c#
[HttpPost("stress-test")] public async Task<IActionResult> StressTest() { var tasks = new List<Task>(); for (int i = 0; i < 1000; i++) { var jobId = i; tasks.Add(_taskQueue.QueueAsync(async token => { Console.WriteLine($"🔄 任务 {jobId} 开始执行..."); await Task.Delay(500, token); Console.WriteLine($"✅ 任务 {jobId} 执行完成"); }).AsTask()); } await Task.WhenAll(tasks); return Ok(new { Message = "1000个任务已全部入队" }); }

🛡️ 高级特性:优雅关闭与背压控制

🔄 优雅关闭机制

c#
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading.Channels; namespace AppBackgroundTask { public interface IEnhancedBackgroundTaskQueue { ValueTask QueueAsync(Func<IServiceProvider, CancellationToken, Task> workItem); IAsyncEnumerable<Func<IServiceProvider, CancellationToken, Task>> ReadAllAsync(CancellationToken cancellationToken); } public class EnhancedBackgroundTaskQueue : IEnhancedBackgroundTaskQueue { private readonly Channel<Func<IServiceProvider, CancellationToken, Task>> _queue; public EnhancedBackgroundTaskQueue(int capacity = 100) { var options = new BoundedChannelOptions(capacity) { SingleReader = false, SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }; _queue = Channel.CreateBounded<Func<IServiceProvider, CancellationToken, Task>>(options); } public async ValueTask QueueAsync(Func<IServiceProvider, CancellationToken, Task> workItem) => await _queue.Writer.WriteAsync(workItem); public IAsyncEnumerable<Func<IServiceProvider, CancellationToken, Task>> ReadAllAsync(CancellationToken cancellationToken) => _queue.Reader.ReadAllAsync(cancellationToken); } public class EnhancedBackgroundWorker : BackgroundService { private readonly IEnhancedBackgroundTaskQueue _taskQueue; private readonly IServiceProvider _serviceProvider; private readonly ILogger<EnhancedBackgroundWorker> _logger; public EnhancedBackgroundWorker( IEnhancedBackgroundTaskQueue taskQueue, IServiceProvider serviceProvider, ILogger<EnhancedBackgroundWorker> logger) { _taskQueue = taskQueue; _serviceProvider = serviceProvider; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("🚀 增强后台工作服务已启动"); await foreach (var workItem in _taskQueue.ReadAllAsync(stoppingToken)) { try { using var scope = _serviceProvider.CreateScope(); await workItem(scope.ServiceProvider, stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { // 优雅关闭:完成当前任务后退出 _logger.LogInformation("🔄 收到关闭信号,等待当前任务完成..."); break; } catch (Exception ex) { _logger.LogError(ex, "❌ 后台任务执行失败"); } } _logger.LogInformation("🛑 增强后台工作服务已停止"); } } // 示例数据服务 public interface IDataService { Task ProcessDataAsync(string data); } public class DataService : IDataService { private readonly ILogger<DataService> _logger; public DataService(ILogger<DataService> logger) { _logger = logger; } public async Task ProcessDataAsync(string data) { _logger.LogInformation("🔍 DataService 正在处理数据: {Data}", data); await Task.Delay(1000); // 模拟处理时间 _logger.LogInformation("✅ DataService 数据处理完成: {Data}", data); } } // 示例邮件服务 public interface IEmailService { Task SendEmailAsync(string recipient, string message); } public class EmailService : IEmailService { private readonly ILogger<EmailService> _logger; public EmailService(ILogger<EmailService> logger) { _logger = logger; } public async Task SendEmailAsync(string recipient, string message) { _logger.LogInformation("📧 EmailService 正在发送邮件给: {Recipient}", recipient); await Task.Delay(800); // 模拟发送时间 _logger.LogInformation("✅ EmailService 邮件发送完成: {Message}", message); } } // 增强任务服务,展示依赖注入的使用 public class EnhancedTaskService { private readonly IEnhancedBackgroundTaskQueue _taskQueue; private readonly ILogger<EnhancedTaskService> _logger; public EnhancedTaskService(IEnhancedBackgroundTaskQueue taskQueue, ILogger<EnhancedTaskService> logger) { _taskQueue = taskQueue; _logger = logger; } public async Task QueueDataProcessingAsync(string data) { await _taskQueue.QueueAsync(async (serviceProvider, cancellationToken) => { var dataService = serviceProvider.GetRequiredService<IDataService>(); await dataService.ProcessDataAsync(data); }); _logger.LogInformation("📋 数据处理任务已加入队列: {Data}", data); } public async Task QueueEmailSendingAsync(string recipient, string message) { await _taskQueue.QueueAsync(async (serviceProvider, cancellationToken) => { var emailService = serviceProvider.GetRequiredService<IEmailService>(); await emailService.SendEmailAsync(recipient, message); }); _logger.LogInformation("📋 邮件发送任务已加入队列: {Recipient}", recipient); } public async Task QueueComplexTaskAsync(string taskName) { await _taskQueue.QueueAsync(async (serviceProvider, cancellationToken) => { // 在任务中使用多个服务 var dataService = serviceProvider.GetRequiredService<IDataService>(); var emailService = serviceProvider.GetRequiredService<IEmailService>(); var logger = serviceProvider.GetRequiredService<ILogger<EnhancedTaskService>>(); logger.LogInformation("🔄 开始复杂任务: {TaskName}", taskName); // 先处理数据 await dataService.ProcessDataAsync($"{taskName}-data"); // 再发送通知邮件 await emailService.SendEmailAsync("admin@example.com", $"任务 {taskName} 已完成"); logger.LogInformation("✅ 复杂任务完成: {TaskName}", taskName); }); _logger.LogInformation("📋 复杂任务已加入队列: {TaskName}", taskName); } } internal class Program { static async Task Main(string[] args) { var host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // 注册队列和后台服务 services.AddSingleton<IEnhancedBackgroundTaskQueue, EnhancedBackgroundTaskQueue>(); services.AddHostedService<EnhancedBackgroundWorker>(); // 注册业务服务 services.AddScoped<IDataService, DataService>(); services.AddScoped<IEmailService, EmailService>(); services.AddScoped<EnhancedTaskService>(); }) .ConfigureLogging(logging => { logging.ClearProviders(); logging.AddConsole(); }) .Build(); // 启动主机 _ = host.RunAsync(); // 添加示例任务 using var scope = host.Services.CreateScope(); var taskService = scope.ServiceProvider.GetRequiredService<EnhancedTaskService>(); await AddSampleTasks(taskService); // 等待用户输入 Console.WriteLine("\n按 'q' 退出程序..."); while (Console.ReadKey().KeyChar != 'q') { Console.WriteLine("\n按 'q' 退出程序..."); } // 停止主机 await host.StopAsync(); } private static async Task AddSampleTasks(EnhancedTaskService taskService) { Console.WriteLine("🎯 开始添加增强任务...\n"); // 添加数据处理任务 await taskService.QueueDataProcessingAsync("用户行为数据"); // 添加邮件发送任务 await taskService.QueueEmailSendingAsync("user@example.com", "欢迎消息"); // 添加复杂任务(使用多个服务) await taskService.QueueComplexTaskAsync("月报生成"); Console.WriteLine("\n📢 所有增强任务已添加完成!"); } } }

image.png

⚖️ 背压控制策略

c#
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System.Threading.Channels; namespace AppBackgroundTask { public interface IBackgroundTaskQueue { ValueTask QueueAsync(Func<CancellationToken, Task> workItem); ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken); } public class BackpressureTaskQueue : IBackgroundTaskQueue { private readonly SemaphoreSlim _semaphore; private readonly Channel<Func<CancellationToken, Task>> _queue; private readonly ILogger<BackpressureTaskQueue> _logger; public BackpressureTaskQueue(int maxConcurrency = 10, ILogger<BackpressureTaskQueue> logger = null) { _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency); _logger = logger; var options = new UnboundedChannelOptions { SingleReader = false, SingleWriter = false, AllowSynchronousContinuations = false }; _queue = Channel.CreateUnbounded<Func<CancellationToken, Task>>(options); } public async ValueTask QueueAsync(Func<CancellationToken, Task> workItem) { _logger?.LogInformation("⏳ 等待获取执行许可 (当前并发限制)"); await _semaphore.WaitAsync(); // 背压控制 _logger?.LogInformation("✅ 获得执行许可,任务加入队列"); await _queue.Writer.WriteAsync(async token => { try { await workItem(token); } finally { _semaphore.Release(); // 释放信号量 _logger?.LogInformation("🔓 释放执行许可"); } }); } public async ValueTask<Func<CancellationToken, Task>> DequeueAsync(CancellationToken cancellationToken) => await _queue.Reader.ReadAsync(cancellationToken); } public class BackpressureWorker : BackgroundService { private readonly IBackgroundTaskQueue _taskQueue; private readonly ILogger<BackpressureWorker> _logger; public BackpressureWorker(IBackgroundTaskQueue taskQueue, ILogger<BackpressureWorker> logger) { _taskQueue = taskQueue; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("🚀 背压控制工作服务已启动"); while (!stoppingToken.IsCancellationRequested) { try { var workItem = await _taskQueue.DequeueAsync(stoppingToken); // 不阻塞地启动任务(让信号量控制并发) _ = Task.Run(async () => { try { await workItem(stoppingToken); } catch (Exception ex) { _logger.LogError(ex, "❌ 后台任务执行失败"); } }, stoppingToken); } catch (OperationCanceledException) { break; } } _logger.LogInformation("🛑 背压控制工作服务已停止"); } } // 示例重型任务服务 public class HeavyTaskService { private readonly IBackgroundTaskQueue _taskQueue; private readonly ILogger<HeavyTaskService> _logger; public HeavyTaskService(IBackgroundTaskQueue taskQueue, ILogger<HeavyTaskService> logger) { _taskQueue = taskQueue; _logger = logger; } public async Task ProcessLargeFileAsync(string fileName, int processingTime) { await _taskQueue.QueueAsync(async (cancellationToken) => { _logger.LogInformation("📁 开始处理大文件: {FileName} (预计{Time}秒)", fileName, processingTime); await Task.Delay(processingTime * 1000, cancellationToken); _logger.LogInformation("✅ 大文件处理完成: {FileName}", fileName); }); _logger.LogInformation("📋 大文件处理任务已排队: {FileName}", fileName); } public async Task GenerateReportAsync(string reportName, int complexity) { await _taskQueue.QueueAsync(async (cancellationToken) => { _logger.LogInformation("📊 开始生成报告: {ReportName} (复杂度{Complexity})", reportName, complexity); await Task.Delay(complexity * 500, cancellationToken); _logger.LogInformation("✅ 报告生成完成: {ReportName}", reportName); }); _logger.LogInformation("📋 报告生成任务已排队: {ReportName}", reportName); } public async Task BackupDatabaseAsync(string dbName, int sizeGB) { await _taskQueue.QueueAsync(async (cancellationToken) => { _logger.LogInformation("💾 开始数据库备份: {DbName} ({SizeGB}GB)", dbName, sizeGB); await Task.Delay(sizeGB * 200, cancellationToken); // 模拟按大小的备份时间 _logger.LogInformation("✅ 数据库备份完成: {DbName}", dbName); }); _logger.LogInformation("📋 数据库备份任务已排队: {DbName}", dbName); } } internal class Program { static async Task Main(string[] args) { var host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { // 注册背压控制队列,限制最大并发为3 services.AddSingleton<IBackgroundTaskQueue>(provider => { var logger = provider.GetService<ILogger<BackpressureTaskQueue>>(); return new BackpressureTaskQueue(maxConcurrency: 3, logger); }); services.AddHostedService<BackpressureWorker>(); services.AddScoped<HeavyTaskService>(); }) .ConfigureLogging(logging => { logging.ClearProviders(); logging.AddConsole(); }) .Build(); // 启动主机 _ = host.RunAsync(); // 添加大量重型任务来测试背压控制 using var scope = host.Services.CreateScope(); var taskService = scope.ServiceProvider.GetRequiredService<HeavyTaskService>(); await SimulateHeavyWorkload(taskService); // 等待用户输入 Console.WriteLine("\n按 'q' 退出程序..."); while (Console.ReadKey().KeyChar != 'q') { Console.WriteLine("\n按 'q' 退出程序..."); } await host.StopAsync(); } private static async Task SimulateHeavyWorkload(HeavyTaskService taskService) { Console.WriteLine("🎯 开始添加重型任务(测试背压控制)...\n"); // 快速添加多个重型任务 var tasks = new List<Task> { taskService.ProcessLargeFileAsync("video1.mp4", 3), taskService.ProcessLargeFileAsync("video2.mp4", 4), taskService.ProcessLargeFileAsync("video3.mp4", 2), taskService.GenerateReportAsync("月度报告", 6), taskService.GenerateReportAsync("年度报告", 8), taskService.BackupDatabaseAsync("用户数据库", 5), taskService.BackupDatabaseAsync("订单数据库", 3), taskService.ProcessLargeFileAsync("backup.zip", 2) }; // 并发添加所有任务 await Task.WhenAll(tasks); Console.WriteLine("\n📢 所有重型任务已提交!观察并发控制效果..."); Console.WriteLine("💡 注意:最多只有3个任务会同时执行"); } } }

image.png

🎯 技术选型决策指南

✅ 适合使用Worker Services + Channels的场景

  • 内部任务处理:无需复杂的任务调度界面
  • 自包含服务:本地部署或容器化应用
  • 微服务架构:配合消息总线使用
  • 性能敏感:对延迟和吞吐量有严格要求

❌ 建议使用第三方库的场景

  • 复杂调度需求:需要Cron表达式、延迟任务
  • 管理界面:需要任务监控、重试、暂停等功能
  • 持久化要求:任务需要跨应用重启保持
  • 分布式场景:多实例间的任务协调

🔥 专家建议:对于90%的企业应用场景,Worker Services + Channels已经足够强大且简洁。

💎 核心要点总结

经过深入的技术分析和实战验证,我们可以得出三个核心要点:

  1. 原生方案的威力:.NET 9的Worker Services + Channels组合提供了企业级的后台任务处理能力,性能媲美甚至超越许多第三方库
  2. 简化的架构设计:无需引入额外依赖,减少了系统复杂性和潜在的兼容性问题,同时降低了学习和维护成本
  3. 生产级的可靠性:微软内部服务大量使用这套方案,经过了海量流量的验证,稳定性和性能都值得信赖

你在项目中是如何处理后台任务的?是否遇到过性能瓶颈或稳定性问题?

欢迎在评论区分享你的经验,或者提出在实际应用中遇到的挑战。让我们一起探讨更多.NET后台任务处理的最佳实践!

如果这篇文章对你有帮助,请点赞并转发给更多需要的同行开发者! 🚀


本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!