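Your single-machine Worker runs fine, until traffic picks up and it starts gasping for air: the queue backs up, memory alerts fire, and tasks time out. Add more machines? The code was never designed for distribution, and changing it feels like tearing the house down to rebuild it.
This is not an isolated case. In real projects, the bottleneck of a single-machine Worker is usually not badly written code; it is an architecture that never left a door open for scaling.
I hit exactly this in an order-processing system. On a single machine, tasks started getting dropped once peak load reached about 800 QPS. We added two more machines, but with no coordination mechanism the same batch of tasks was processed three times, and customer complaints came straight in.
By the end of this article, you will know: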
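A single-machine Worker's processing capacity is bounded by the server's CPU core count, memory, and network bandwidth. Take a typical image-processing Worker: one core needs 120 ms on average per image, so an 8-core machine has a theoretical ceiling of 8 × 1000 / 120 ≈ 67 images per second. Once peak traffic exceeds that, the queue grows without bound.
Single-machine processing model (test environment: 8 cores, 16 GB RAM, .NET 8)
Peak throughput: ~67 tasks/s
Queue backlog tipping point: 500 tasks
Memory pressure point: RSS grows by about 40% once more than 2,000 tasks are queued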
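The ceiling above is plain arithmetic, and the same arithmetic shows why the queue then grows without bound: whenever arrivals outpace the service rate, the backlog grows linearly with time. A minimal sketch, assuming CPU-bound tasks, one task per core at a time, and a hypothetical steady arrival rate; `CapacityMath` and its methods are illustrative names, not part of any library:

```csharp
using System;

// Back-of-the-envelope capacity model for a single-machine Worker.
// Assumption: tasks are CPU-bound and each core processes one task at a time.
public static class CapacityMath
{
    // Theoretical ceiling: cores * (1000 ms / per-task latency in ms)
    public static double MaxThroughput(int cores, double msPerTask) =>
        cores * 1000.0 / msPerTask;

    // Backlog after `seconds` when arrivals outpace service (linear growth, no upper bound)
    public static double BacklogAfter(double arrivalsPerSec, double servicePerSec, double seconds) =>
        Math.Max(0, arrivalsPerSec - servicePerSec) * seconds;

    // Seconds until the backlog reaches `threshold` (infinite if the Worker keeps up)
    public static double SecondsToBacklog(double arrivalsPerSec, double servicePerSec, double threshold)
    {
        double growth = arrivalsPerSec - servicePerSec;
        return growth <= 0 ? double.PositiveInfinity : threshold / growth;
    }
}
```

With a hypothetical arrival rate of 100 tasks/s, the ~67 tasks/s ceiling leaves a surplus of about 33 tasks/s, so the 500-task tipping point is reached in roughly 15 seconds; below the ceiling, the backlog never builds up at all.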
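Single machine down = entire pipeline stalled. There is no failover and no task redelivery, so the business simply stops. In scenarios with strict availability requirements, such as finance and e-commerce, that is unacceptable.
When you scale out to multiple Worker instances, the hardest problem is not "how do I run a few more processes" but "how do I get them to coordinate". Without a shared state layer, you run into:
These three walls are the core obstacles a single-machine Worker has to break through, one by one, on the way to becoming distributed.
In this scenario Redis does not act as a "database" but as a distributed coordination layer. Several of its properties fit the needs of a Worker pipeline naturally: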
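Atomic operations: commands such as LPUSH and BRPOP are atomic, so when several Workers grab tasks at the same time there is no race condition. This is the foundation for avoiding duplicate consumption.
Blocking consumption: BRPOP can block while waiting for data, so a Worker does not need to poll; this saves CPU, and latency is lower as well (typically under 1 ms). One caveat for .NET: StackExchange.Redis, the client used in the examples below, deliberately does not expose blocking commands such as BRPOP, because they would stall its shared, multiplexed connection. Its ListRightPopAsync sends a plain, non-blocking RPOP.
Stream data structure: XADD/XREADGROUP, introduced in Redis 5.0, bring consumer-group semantics: explicit acknowledgement (XACK), a per-group list of delivered but unacknowledged messages, reclaiming and redelivering those messages (XCLAIM), and tracking of consumption progress. That makes Streams a strong foundation for reliable pipelines.
Lightweight: compared with Kafka or RabbitMQ, Redis is far simpler to operate, which makes it very friendly to small and mid-sized teams.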
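Where a blocking pop is not an option (the C# examples in this article issue plain RPOP through StackExchange.Redis's ListRightPopAsync), an empty queue has to be polled. A fixed sleep works; a common refinement is exponential backoff: poll again immediately while work keeps arriving, and wait longer after each consecutive empty poll. The sketch below shows only the delay calculation. `PollBackoff` is a hypothetical helper, and the 50 ms to 1 s range used in the example is an assumption to tune, not a figure from the original text:

```csharp
using System;

// Hypothetical helper: how long a polling Worker waits after N consecutive empty polls.
// Exponential backoff keeps latency low while busy and CPU/network use low while idle.
public static class PollBackoff
{
    public static TimeSpan NextDelay(int consecutiveEmptyPolls, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        // The last poll returned work: poll again right away
        if (consecutiveEmptyPolls <= 0) return TimeSpan.Zero;

        // Double the delay per empty poll; cap the exponent so Math.Pow cannot overflow
        int exponent = Math.Min(consecutiveEmptyPolls - 1, 20);
        double ms = baseDelay.TotalMilliseconds * Math.Pow(2, exponent);
        return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelay.TotalMilliseconds));
    }
}
```

In a worker loop, reset the counter to 0 whenever a pop returns a task, increment it on every empty pop, and pass the result to `Task.Delay(..., stoppingToken)`. With a 50 ms base and a 1 s cap, the waits go 50, 100, 200 ms and so on, levelling off at 1 s.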
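This is the lowest-cost starting point for the migration. It suits teams that already have a single-machine Worker and need to scale out quickly.
Core idea: replace the local in-memory queue with a Redis List shared by all Worker instances. Because a Redis list pop (RPOP, or its blocking form BRPOP) is atomic, each task is handed to only one Worker.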
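What matters is that "is there a task?" and "take it" happen as one atomic step. The sketch below is an in-memory analogy under that assumption, not Redis code: .NET's `ConcurrentQueue<T>.TryDequeue` stands in for RPOP, and `SharedQueueDemo` is a hypothetical name. Several concurrent consumers drain one shared queue, and the result records how often each item was processed:

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// In-memory analogy of Approach 1 (not Redis): one shared queue, several workers,
// and an atomic "pop" (ConcurrentQueue.TryDequeue plays the role of RPOP).
public static class SharedQueueDemo
{
    // Returns how many times each task ID was processed across all workers.
    public static ConcurrentDictionary<int, int> Run(int taskCount, int workerCount)
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(1, taskCount));
        var processed = new ConcurrentDictionary<int, int>();

        var workers = Enumerable.Range(0, workerCount).Select(w => Task.Run(() =>
        {
            // Each successful TryDequeue hands the item to exactly one caller
            while (queue.TryDequeue(out var taskId))
                processed.AddOrUpdate(taskId, 1, (key, count) => count + 1);
        })).ToArray();

        Task.WaitAll(workers);
        return processed;
    }
}
```

Running it with 10,000 items and 8 workers leaves every count at exactly 1. The analogy also shows the approach's weakness: as with RPOP, an item leaves the queue the moment it is taken, so a consumer that crashes mid-task loses it.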
```csharp
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace AppWorkerRedis
{
    public class MyTask
    {
        public string Id { get; set; } = Guid.NewGuid().ToString();
        public string Name { get; set; } = string.Empty;
        public string Payload { get; set; } = string.Empty;
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    }

    // Task producer
    public class TaskProducer
    {
        private readonly IDatabase _db;
        private const string QueueKey = "worker:task:queue";

        public TaskProducer(IConnectionMultiplexer redis)
        {
            _db = redis.GetDatabase();
        }

        public async Task EnqueueAsync<T>(T task) where T : class
        {
            var payload = JsonSerializer.Serialize(task);
            // LPUSH adds the task at the head of the list; Workers pop from the tail (FIFO)
            await _db.ListLeftPushAsync(QueueKey, payload);
        }
    }

    // Worker consumer (safe to deploy as multiple instances)
    public class TaskWorker : BackgroundService
    {
        private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };
        private readonly IDatabase _db;
        private const string QueueKey = "worker:task:queue";
        // How long to wait before polling again when the queue is empty
        private readonly TimeSpan _idleDelay = TimeSpan.FromMilliseconds(200);

        public TaskWorker(IConnectionMultiplexer redis)
        {
            _db = redis.GetDatabase();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // RPOP is atomic, so with many instances each task goes to only one Worker.
                // StackExchange.Redis sends a non-blocking RPOP here (it does not expose BRPOP),
                // so back off briefly on an empty queue instead of spinning the CPU.
                var result = await _db.ListRightPopAsync(QueueKey);
                if (result.IsNull)
                {
                    try { await Task.Delay(_idleDelay, stoppingToken); }
                    catch (OperationCanceledException) { break; }
                    continue;
                }

                try
                {
                    var task = JsonSerializer.Deserialize<MyTask>(result.ToString(), JsonOptions)
                        ?? throw new JsonException("Payload deserialized to null.");
                    await ProcessAsync(task, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutting down: the in-flight task is lost (Approach 3 fixes this)
                    break;
                }
                catch (Exception ex)
                {
                    // Basic version: log the failure, no retry (Approach 3 addresses this)
                    Console.WriteLine($"[Worker] Task failed: {ex.Message}");
                }
            }
        }

        private async Task ProcessAsync(MyTask task, CancellationToken ct)
        {
            // Real business logic goes here
            await Task.Delay(100, ct); // simulate processing time
            Console.WriteLine($"[Worker] Processed task: {task.Id}");
        }
    }
}
```

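The limitation of this approach: once a task has been popped, it exists only in the Worker's memory, so if that Worker crashes, the task is lost. For workloads that cannot tolerate loss, upgrade to Approach 3.
In real business, a "task" is often not atomic. Take order processing: validate → deduct stock → create shipment → send notification. Each step can take a different amount of time and may need to scale on its own.
This calls for a multi-stage pipeline: each stage gets its own Redis queue, and Workers pass intermediate results to one another through those queues.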
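Stripped of Redis, the design is a chain of queues and stage functions, where a stage that returns null stops the item from flowing on. The in-memory sketch below makes that shape explicit. It rests on simplifying assumptions: hypothetical `Order` and `MiniPipeline` types, plain `Queue<T>` instead of Redis lists, and stages drained one after another rather than run as concurrent Workers:

```csharp
using System;
using System.Collections.Generic;

// In-memory sketch of the multi-stage idea (not Redis): each stage reads from its own
// input queue and writes to the next stage's queue; returning null drops the item.
public sealed record Order(string Id, int Quantity, bool Valid = false, string? ShipmentId = null);

public static class MiniPipeline
{
    public static List<Order> Run(IEnumerable<Order> orders, params Func<Order, Order?>[] stages)
    {
        // queues[i] is the input of stage i; the last queue collects finished items
        var queues = new Queue<Order>[stages.Length + 1];
        for (int i = 0; i < queues.Length; i++) queues[i] = new Queue<Order>();
        foreach (var o in orders) queues[0].Enqueue(o);

        // Drain stage by stage; in the Redis version every stage is a separate, concurrent Worker
        for (int i = 0; i < stages.Length; i++)
        {
            while (queues[i].TryDequeue(out var item))
            {
                var result = stages[i](item);
                if (result is not null) queues[i + 1].Enqueue(result);
            }
        }
        return new List<Order>(queues[^1]);
    }
}
```

For example, a validation stage `o => o.Quantity > 0 ? o with { Valid = true } : null` followed by a shipment stage drops a zero-quantity order at the first step, and only the valid orders come out of the last queue, still in submission order.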
```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace AppWorkerRedis
{
    // ─── Order context (the data carrier that flows through the whole pipeline) ───
    public class OrderContext
    {
        public string OrderId { get; set; } = Guid.NewGuid().ToString();
        public string ProductId { get; set; } = string.Empty;
        public int Quantity { get; set; }
        public string UserId { get; set; } = string.Empty;

        // Status flags written by each stage
        public bool IsValid { get; set; }
        public bool StockDeducted { get; set; }
        public string? ShipmentId { get; set; }
        public bool Notified { get; set; }
    }

    // ─── Generic pipeline stage base class ───
    public abstract class PipelineStage<TInput, TOutput> : BackgroundService
    {
        private static readonly JsonSerializerOptions JsonOptions = new() { PropertyNameCaseInsensitive = true };

        private readonly IConnectionMultiplexer _redis;
        private readonly string _inputQueue;
        private readonly string _outputQueue; // empty = final stage, nothing to forward

        protected PipelineStage(IConnectionMultiplexer redis, string inputQueue, string outputQueue)
        {
            _redis = redis;
            _inputQueue = inputQueue;
            _outputQueue = outputQueue;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var db = _redis.GetDatabase();
            Console.WriteLine($"[{GetType().Name}] Started, listening on '{_inputQueue}'...");

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Non-blocking RPOP: atomic, so concurrent instances never receive the same item
                    var raw = await db.ListRightPopAsync(_inputQueue);
                    if (raw.IsNull)
                    {
                        // Queue empty: wait 200 ms before retrying to avoid spinning the CPU
                        await Task.Delay(200, stoppingToken);
                        continue;
                    }

                    TInput? input;
                    try
                    {
                        input = JsonSerializer.Deserialize<TInput>(raw.ToString(), JsonOptions);
                    }
                    catch (JsonException ex)
                    {
                        // Malformed payload: log it and drop it
                        Console.WriteLine($"[{GetType().Name}] Deserialize error: {ex.Message}");
                        continue;
                    }
                    if (input is null) continue;

                    var output = await ProcessStageAsync(input, stoppingToken);

                    // Forward the result to the next stage's input queue (unless this is the last stage)
                    if (output != null && !string.IsNullOrEmpty(_outputQueue))
                    {
                        await db.ListLeftPushAsync(_outputQueue, JsonSerializer.Serialize(output));
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    // The popped item is lost here: list-based pipelines have no ACK (see Approach 3)
                    Console.WriteLine($"[{GetType().Name}] Error: {ex.Message}");
                    try { await Task.Delay(500, stoppingToken); }
                    catch (OperationCanceledException) { break; }
                }
            }

            Console.WriteLine($"[{GetType().Name}] Stopped.");
        }

        // Return null to stop the item from flowing to the next stage
        protected abstract Task<TOutput?> ProcessStageAsync(TInput input, CancellationToken ct);
    }

    // ─── Queue key constants (centralized to avoid magic strings) ───
    public static class QueueKeys
    {
        public const string ValidateIn = "pipeline:validate:in";
        public const string ValidateOut = "pipeline:validate:out"; // = DeductStock input
        public const string DeductOut = "pipeline:deduct:out";     // = CreateShipment input
        public const string ShipmentOut = "pipeline:shipment:out"; // = Notify input
        // Notify is the final stage and has no output queue
    }

    // ─── Stage 1: validation ───
    public class ValidateStage : PipelineStage<OrderContext, OrderContext>
    {
        public ValidateStage(IConnectionMultiplexer redis)
            : base(redis, QueueKeys.ValidateIn, QueueKeys.ValidateOut) { }

        protected override async Task<OrderContext?> ProcessStageAsync(OrderContext ctx, CancellationToken ct)
        {
            await Task.Delay(30, ct);
            // Simple rule: quantity must be greater than 0
            if (ctx.Quantity <= 0)
            {
                Console.WriteLine($"[Validate] Order {ctx.OrderId} INVALID, dropped.");
                return null; // null = do not forward
            }
            ctx.IsValid = true;
            Console.WriteLine($"[Validate] Order {ctx.OrderId} passed validation.");
            return ctx;
        }
    }

    // ─── Stage 2: stock deduction ───
    public class DeductStockStage : PipelineStage<OrderContext, OrderContext>
    {
        public DeductStockStage(IConnectionMultiplexer redis)
            : base(redis, QueueKeys.ValidateOut, QueueKeys.DeductOut) { }

        protected override async Task<OrderContext?> ProcessStageAsync(OrderContext ctx, CancellationToken ct)
        {
            await Task.Delay(50, ct);
            ctx.StockDeducted = true;
            Console.WriteLine($"[DeductStock] Order {ctx.OrderId} stock deducted.");
            return ctx;
        }
    }

    // ─── Stage 3: shipment creation ───
    public class CreateShipmentStage : PipelineStage<OrderContext, OrderContext>
    {
        public CreateShipmentStage(IConnectionMultiplexer redis)
            : base(redis, QueueKeys.DeductOut, QueueKeys.ShipmentOut) { }

        protected override async Task<OrderContext?> ProcessStageAsync(OrderContext ctx, CancellationToken ct)
        {
            await Task.Delay(80, ct);
            ctx.ShipmentId = $"SHIP-{Guid.NewGuid().ToString("N")[..8].ToUpper()}";
            Console.WriteLine($"[CreateShipment] Order {ctx.OrderId} → ShipmentId: {ctx.ShipmentId}");
            return ctx;
        }
    }

    // ─── Stage 4: notification (final stage, no output queue) ───
    public class NotifyStage : PipelineStage<OrderContext, OrderContext>
    {
        public NotifyStage(IConnectionMultiplexer redis)
            : base(redis, QueueKeys.ShipmentOut, string.Empty) { }

        protected override async Task<OrderContext?> ProcessStageAsync(OrderContext ctx, CancellationToken ct)
        {
            await Task.Delay(20, ct);
            ctx.Notified = true;
            Console.WriteLine(
                $"[Notify] Order {ctx.OrderId} completed. " +
                $"Valid={ctx.IsValid}, StockDeducted={ctx.StockDeducted}, " +
                $"ShipmentId={ctx.ShipmentId}, Notified={ctx.Notified}");
            return null; // final stage: nothing to push to a next queue
        }
    }

    // ─── Order producer ───
    public class OrderProducer
    {
        private readonly IDatabase _db;

        public OrderProducer(IConnectionMultiplexer redis)
        {
            _db = redis.GetDatabase();
        }

        public async Task SubmitOrderAsync(OrderContext order)
        {
            var payload = JsonSerializer.Serialize(order);
            await _db.ListLeftPushAsync(QueueKeys.ValidateIn, payload);
            Console.WriteLine($"[Producer] Submitted order: {order.OrderId} | Product: {order.ProductId} x{order.Quantity}");
        }
    }

    internal class Program
    {
        static async Task Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .ConfigureServices((context, services) =>
                {
                    // Redis connection (singleton)
                    services.AddSingleton<IConnectionMultiplexer>(sp =>
                    {
                        var config = new ConfigurationOptions
                        {
                            EndPoints = { "localhost:6379" },
                            Password = "123456", // demo only; load from configuration in production
                            AbortOnConnectFail = false,
                            ConnectRetry = 3,
                            ConnectTimeout = 5000
                        };
                        return ConnectionMultiplexer.Connect(config);
                    });

                    // Order producer
                    services.AddSingleton<OrderProducer>();

                    // The four stages, each an independent BackgroundService.
                    // To scale one stage on its own, run it in a separate process that registers only that stage.
                    services.AddHostedService<ValidateStage>();
                    services.AddHostedService<DeductStockStage>();
                    services.AddHostedService<CreateShipmentStage>();
                    services.AddHostedService<NotifyStage>();
                })
                .Build();

            // Demo: submit test orders once the host has started
            _ = Task.Run(async () =>
            {
                await Task.Delay(800); // give every stage time to start
                var producer = host.Services.GetRequiredService<OrderProducer>();

                // Valid orders
                for (int i = 1; i <= 4; i++)
                {
                    await producer.SubmitOrderAsync(new OrderContext
                    {
                        OrderId = $"ORD-{i:D3}",
                        ProductId = $"SKU-{i * 100}",
                        Quantity = i,
                        UserId = $"user-{i}"
                    });
                    await Task.Delay(300);
                }

                // Invalid order (Quantity = 0, dropped by the Validate stage)
                await producer.SubmitOrderAsync(new OrderContext
                {
                    OrderId = "ORD-BAD",
                    ProductId = "SKU-999",
                    Quantity = 0,
                    UserId = "user-bad"
                });
            });

            await host.RunAsync();
        }
    }
}
```

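The advantage of this design is that each stage can scale independently. If "create shipment" is the bottleneck, you only need to run more CreateShipmentStage instances (for example, separate processes that register only that stage), and the other stages are unaffected.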
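Scaling one stage helps only until another stage becomes the slowest. A rough model, assuming each instance handles one item at a time and using the demo's simulated delays (30, 50, 80 and 20 ms) as stand-ins for real processing times; `PipelineCapacity` is a hypothetical name:

```csharp
using System;
using System.Linq;

// Throughput of a linear pipeline = the capacity of its slowest stage.
// A stage with k instances, each taking t ms per item, handles k * 1000 / t items per second.
public static class PipelineCapacity
{
    public static double Throughput(double[] msPerItem, int[] instances)
    {
        if (msPerItem.Length != instances.Length)
            throw new ArgumentException("One instance count per stage is required.");
        return msPerItem.Select((t, i) => instances[i] * 1000.0 / t).Min();
    }
}
```

With one instance per stage, the pipeline tops out at 12.5 orders/s, limited by CreateShipment (80 ms). A second CreateShipmentStage lifts it to 20 orders/s, at which point DeductStock (50 ms) becomes the bottleneck, so a third CreateShipmentStage would not help until DeductStock is scaled too.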
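This is the approach that is truly fit for production. It fixes the pain points shared by Approaches 1 and 2: task loss and the lack of an acknowledgement (ACK) mechanism.
Redis Stream consumer groups provide Kafka-like semantics: a consumed message is not deleted immediately but waits for the consumer to ACK it explicitly. Every message that has been delivered but not yet acknowledged is recorded in the group's PEL (Pending Entries List). If a consumer crashes, its messages stay in the PEL, and another consumer can claim them (XCLAIM) and process them again.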
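To make the ACK/PEL life cycle concrete, here is a deliberately tiny, single-threaded model of a consumer group. It is a teaching aid under simplifying assumptions (integer IDs, no idle timers, no persistence, no concurrency) and is neither Redis nor StackExchange.Redis; `ToyConsumerGroup` and its methods are hypothetical names that only mirror XADD, XREADGROUP with `>`, XACK and XCLAIM:

```csharp
using System.Collections.Generic;
using System.Linq;

// Toy, single-threaded model of a Redis Stream consumer group (NOT a Redis client):
// delivered-but-unacknowledged entries live in a pending list (the PEL) until acknowledged.
public sealed class ToyConsumerGroup
{
    private readonly List<(long Id, string Payload)> _log = new();
    private long _nextId = 1;
    private long _lastDelivered = 0; // like the group's last-delivered ID

    // PEL: message ID -> (owning consumer, delivery count)
    public Dictionary<long, (string Consumer, int Deliveries)> Pending { get; } = new();

    // Like XADD: append an entry and return its ID
    public long Add(string payload) { _log.Add((_nextId, payload)); return _nextId++; }

    // Like XREADGROUP ... ">": hand out never-delivered entries and record them in the PEL
    public List<long> ReadNew(string consumer, int count)
    {
        var batch = _log.Where(e => e.Id > _lastDelivered).Take(count).Select(e => e.Id).ToList();
        foreach (var id in batch) Pending[id] = (consumer, 1);
        if (batch.Count > 0) _lastDelivered = batch[^1];
        return batch;
    }

    // Like XACK: remove the entry from the PEL (the entry itself stays in the stream)
    public bool Ack(long id) => Pending.Remove(id);

    // Like XCLAIM: transfer ownership of a pending entry and bump its delivery count
    public bool Claim(long id, string newConsumer)
    {
        if (!Pending.TryGetValue(id, out var p)) return false;
        Pending[id] = (newConsumer, p.Deliveries + 1);
        return true;
    }
}
```

The behavior to notice: if worker-A reads two entries and "crashes" after acknowledging only the first, worker-B's next read of new messages does not return the unacknowledged one. It stays in the PEL, owned by worker-A, until someone claims it, which is why a reliable Worker needs an explicit reclaim step.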
```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;
using System;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace AppWorkerRedis
{
    // 1. Task model
    public class MyTask
    {
        public string Id { get; set; } = Guid.NewGuid().ToString("N");
        public string Name { get; set; } = string.Empty;
        public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    }

    // 2. Reliable consumer Worker
    public class ReliableWorker : BackgroundService
    {
        private readonly IDatabase _db;
        private const string StreamKey = "worker:reliable:stream";
        private const string GroupName = "order-processors";
        // Messages unacknowledged for longer than this are treated as abandoned and reclaimed.
        // It must exceed the longest expected processing time.
        private const long ReclaimIdleMs = 30_000;
        private static readonly TimeSpan ReclaimInterval = TimeSpan.FromSeconds(30);
        private readonly string _consumerId;

        public ReliableWorker(IConnectionMultiplexer redis)
        {
            _db = redis.GetDatabase();
            // Unique ID per Worker instance, for tracing. Only the GUID part is shortened, so a
            // long machine name can no longer cut off the random suffix and cause ID collisions.
            _consumerId = $"worker-{Environment.MachineName}-{Guid.NewGuid().ToString("N")[..8]}";
            EnsureConsumerGroup();
        }

        private void EnsureConsumerGroup()
        {
            try
            {
                // Create the group, reading from the start of the stream ("0");
                // createStream: true creates the stream if it does not exist yet (MKSTREAM)
                _db.StreamCreateConsumerGroup(StreamKey, GroupName, "0", createStream: true);
                Console.WriteLine($"[ReliableWorker] Consumer group '{GroupName}' created.");
            }
            catch (RedisException ex) when (ex.Message.Contains("BUSYGROUP"))
            {
                // The group already exists (another instance created it): safe to ignore
                Console.WriteLine($"[ReliableWorker] Consumer group '{GroupName}' already exists.");
            }
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            Console.WriteLine($"[ReliableWorker] Starting consumer: {_consumerId}");
            var nextReclaim = DateTime.UtcNow; // reclaim right away on startup, then periodically

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Pick up messages left unacknowledged by crashed or failed consumers.
                    // Doing this periodically (not only at startup) also retries runtime failures.
                    if (DateTime.UtcNow >= nextReclaim)
                    {
                        await ReclaimPendingMessagesAsync(stoppingToken);
                        nextReclaim = DateTime.UtcNow + ReclaimInterval;
                    }

                    // XREADGROUP with the default position ">": only messages never delivered to any consumer
                    var messages = await _db.StreamReadGroupAsync(
                        StreamKey, GroupName, _consumerId,
                        count: 10,     // read up to 10 messages per batch
                        noAck: false); // require an explicit XACK

                    if (messages == null || messages.Length == 0)
                    {
                        await Task.Delay(500, stoppingToken);
                        continue;
                    }

                    foreach (var message in messages)
                    {
                        await ProcessWithAckAsync(message, stoppingToken);
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"[ReliableWorker] Unexpected error: {ex.Message}");
                    try { await Task.Delay(1000, stoppingToken); }
                    catch (OperationCanceledException) { break; }
                }
            }

            Console.WriteLine("[ReliableWorker] Stopped.");
        }

        private async Task ProcessWithAckAsync(StreamEntry message, CancellationToken ct)
        {
            try
            {
                var payload = message["payload"];
                var task = payload.IsNull ? null : JsonSerializer.Deserialize<MyTask>(payload.ToString());
                if (task == null)
                {
                    // Unusable message: ACK it so it does not sit in the PEL forever
                    Console.WriteLine($"[ReliableWorker] Null task for message {message.Id}, skipping.");
                    await _db.StreamAcknowledgeAsync(StreamKey, GroupName, message.Id);
                    return;
                }

                await ProcessAsync(task, ct);

                // Success: an explicit XACK removes the message from the PEL
                await _db.StreamAcknowledgeAsync(StreamKey, GroupName, message.Id);
                Console.WriteLine($"[ReliableWorker] ACK message {message.Id}");
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Failure: no ACK, so the message stays in the PEL and is reclaimed later
                Console.WriteLine($"[ReliableWorker] Failed {message.Id}: {ex.Message}");
            }
        }

        private async Task ReclaimPendingMessagesAsync(CancellationToken ct)
        {
            // XPENDING for the whole group (RedisValue.Null = any consumer). A restarted Worker
            // gets a new consumer ID, so filtering by our own ID would never find the messages
            // a crashed instance left behind.
            var pending = await _db.StreamPendingMessagesAsync(
                StreamKey, GroupName, count: 100, consumerName: RedisValue.Null);

            // Keep only messages idle for at least ReclaimIdleMs
            var staleIds = pending
                .Where(p => p.IdleTimeInMilliseconds >= ReclaimIdleMs)
                .Select(p => p.MessageId)
                .ToArray();
            if (staleIds.Length == 0) return;

            Console.WriteLine($"[ReliableWorker] Reclaiming {staleIds.Length} stale pending message(s)...");

            // XCLAIM transfers ownership to this consumer. minIdleTimeInMs is in milliseconds, and
            // Redis re-checks it, so two Workers cannot both claim the same message.
            var claimed = await _db.StreamClaimAsync(
                StreamKey, GroupName, _consumerId, ReclaimIdleMs, staleIds);

            foreach (var msg in claimed)
            {
                await ProcessWithAckAsync(msg, ct);
            }
        }

        private async Task ProcessAsync(MyTask task, CancellationToken ct)
        {
            // Simulate business processing time
            await Task.Delay(100, ct);
            Console.WriteLine($"[ReliableWorker] Processed task => Id: {task.Id}, Name: {task.Name}, CreatedAt: {task.CreatedAt:O}");
        }
    }

    // 3. Task producer (Stream version)
    public class ReliableProducer
    {
        private readonly IDatabase _db;
        private const string StreamKey = "worker:reliable:stream";

        public ReliableProducer(IConnectionMultiplexer redis)
        {
            _db = redis.GetDatabase();
        }

        public async Task<RedisValue> EnqueueAsync<T>(T task) where T : class
        {
            var payload = JsonSerializer.Serialize(task);
            // XADD appends the message to the stream; Redis generates the message ID
            var messageId = await _db.StreamAddAsync(StreamKey, new NameValueEntry[]
            {
                new("payload", payload),
                new("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString())
            });
            Console.WriteLine($"[ReliableProducer] Enqueued message: {messageId}");
            return messageId;
        }
    }

    // 4. Entry point: register services, enqueue test messages, start the host
    internal class Program
    {
        static async Task Main(string[] args)
        {
            var host = Host.CreateDefaultBuilder(args)
                .ConfigureServices((context, services) =>
                {
                    // Redis connection (singleton)
                    services.AddSingleton<IConnectionMultiplexer>(sp =>
                    {
                        var config = new ConfigurationOptions
                        {
                            EndPoints = { "localhost:6379" },
                            Password = "123456", // demo only; load from configuration in production
                            AbortOnConnectFail = false,
                            ConnectRetry = 3,
                            ConnectTimeout = 5000
                        };
                        return ConnectionMultiplexer.Connect(config);
                    });

                    // Producer: stateless and thread-safe, so a singleton
                    services.AddSingleton<ReliableProducer>();
                    // Background consumer Worker
                    services.AddHostedService<ReliableWorker>();
                })
                .Build();

            // ── Enqueue a few test messages before the host starts ──
            // A newly created group starts from ID "0", so these are still delivered to the Worker.
            var producer = host.Services.GetRequiredService<ReliableProducer>();
            Console.WriteLine("=== Enqueuing test tasks ===");
            for (int i = 1; i <= 5; i++)
            {
                await producer.EnqueueAsync(new MyTask
                {
                    Id = Guid.NewGuid().ToString("N"),
                    Name = $"Order-{i:D3}",
                    CreatedAt = DateTime.UtcNow
                });
            }

            Console.WriteLine("=== Starting host (Worker will consume messages) ===");
            // Start the host; the Worker begins consuming. Ctrl+C shuts down gracefully.
            await host.RunAsync();
        }
    }
}
```

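Pitfall 1: exceptions around the consumer group
Reading from a group that does not exist fails with a NOGROUP error, so every Worker must make sure the group exists at startup. When several instances start at once in production, they all call StreamCreateConsumerGroup, and every call after the first fails with BUSYGROUP. Catch and ignore that error (the example already does this).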
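Pitfall 2: unbounded PEL growth. If a message always fails, it stays in the PEL and is reclaimed and retried over and over, wasting resources. Set a maximum retry count (the delivery counter that Redis keeps for every pending message can serve as the retry count), and once it is exceeded, move the message to a dead-letter queue (DLQ) for separate handling.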
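The retry cap can be driven by the delivery counter that XPENDING reports for every pending entry, and the decision itself is small enough to sketch on its own. `PendingInfo`, `PendingTriage` and the thresholds in the usage note (30 s idle, 3 deliveries) are hypothetical. With StackExchange.Redis, the inputs would come from `StreamPendingMessageInfo.IdleTimeInMilliseconds` and `DeliveryCount`, and a dead-lettered message would typically be copied to a separate stream with XADD and then acknowledged with XACK:

```csharp
// Hypothetical triage for entries found via XPENDING. The fields mirror what XPENDING
// reports (ID, owner, idle time, delivery count); the thresholds are assumptions to tune.
public enum PendingAction { Leave, Reclaim, DeadLetter }

public readonly record struct PendingInfo(string MessageId, string Consumer, long IdleMs, int DeliveryCount);

public static class PendingTriage
{
    public static PendingAction Decide(PendingInfo p, long minIdleMs, int maxDeliveries)
    {
        // Owner may still be working on it: leave it alone
        if (p.IdleMs < minIdleMs) return PendingAction.Leave;

        // Delivered too many times without an ACK: give up (copy to a DLQ, then XACK)
        if (p.DeliveryCount >= maxDeliveries) return PendingAction.DeadLetter;

        // Otherwise take it over with XCLAIM and retry
        return PendingAction.Reclaim;
    }
}
```

With `minIdleMs = 30_000` and `maxDeliveries = 3`, an entry idle for 5 s is left alone, one idle for 45 s after a single delivery is reclaimed, and one idle for 45 s after three deliveries goes to the dead-letter queue.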
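Pitfall 3: serialization version compatibility
During a rolling upgrade of a multi-node deployment, old and new Worker versions run side by side. If the task object's fields change, an old Worker may fail to deserialize new tasks, or a new Worker may misread old ones. With System.Text.Json's default settings, unknown JSON properties are ignored and missing ones keep their default values, so adding fields is safe, while renaming or removing a field or changing its type is not. On new optional fields, [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] additionally keeps null values out of the payload.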
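The default behavior described above is easy to verify. A self-contained sketch with two hypothetical versions of one task contract (`TaskV1`, `TaskV2` and `CompatDemo` are illustrative names), using only the default `JsonSerializerOptions`:

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Two hypothetical versions of the same task contract, as seen during a rolling upgrade.
public class TaskV1
{
    public string Id { get; set; } = "";
    public string Name { get; set; } = "";
}

public class TaskV2
{
    public string Id { get; set; } = "";
    public string Name { get; set; } = "";

    // New optional field: omitted from the JSON when null
    [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)]
    public string? Priority { get; set; }

    // New field with an initializer: old payloads that lack it get this default
    public int RetryLimit { get; set; } = 3;
}

public static class CompatDemo
{
    // New producer -> old consumer: unknown properties are skipped by default
    public static TaskV1? OldReadsNew(TaskV2 t) =>
        JsonSerializer.Deserialize<TaskV1>(JsonSerializer.Serialize(t));

    // Old producer -> new consumer: missing properties keep their initializer values
    public static TaskV2? NewReadsOld(TaskV1 t) =>
        JsonSerializer.Deserialize<TaskV2>(JsonSerializer.Serialize(t));

    public static string Serialize(TaskV2 t) => JsonSerializer.Serialize(t);
}
```

These defaults cover additive changes only. A renamed property silently loses its value on the other side, and a changed type throws a JsonException, so one common approach is to add the new field first and remove the old one only after every Worker has been upgraded.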
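Pitfall 4: Redis connection configuration
Register ConnectionMultiplexer as a singleton and avoid creating connections repeatedly: one multiplexer is designed to be shared by the whole application rather than pooled. Put connectRetry and connectTimeout in the Redis connection string in appsettings.json so that a brief Redis outage does not set off a cascading failure across all Workers.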
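If you want to go deeper in this direction, I suggest the following path:
The semantics of XREADGROUP, XACK, and XCLAIM
StreamReadGroupAsync in StackExchange.Redis
Batch consumption and backpressure control
Have you run into Worker scaling problems in your own projects? Did you solve them with Redis, with a message queue, or with something else? The trade-offs behind the choice often differ a lot from one business scenario to another, so feel free to share your experience.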
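Also, if your business needs task priorities (high-priority tasks jumping the queue), a Redis Sorted Set can implement a priority queue. That topic deserves its own article; let me know in the comments if you are interested.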
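As a preview of the idea only: a Sorted Set orders members by score, and ZPOPMIN (Redis 5.0+) atomically removes the lowest-scored member, so "lower score = more urgent" yields a priority queue. The in-memory stand-in below is not a Redis client. `ToyPriorityQueue` is hypothetical, and the scoring rule (priority × 10^13 + enqueue time in ms, so equal priorities stay first-in, first-out) is an assumption rather than the author's design:

```csharp
using System.Collections.Generic;

// In-memory stand-in for a Redis Sorted Set used as a priority queue (not a Redis client):
// Add ~ ZADD (lower score = more urgent), PopMin ~ ZPOPMIN.
// Assumption: score = priority * 1e13 + enqueue time in ms, so equal priorities stay FIFO.
public sealed class ToyPriorityQueue
{
    private readonly SortedSet<(double Score, string Member)> _set = new();

    public static double Score(int priority, long enqueuedAtMs) => priority * 1e13 + enqueuedAtMs;

    public void Add(string member, int priority, long enqueuedAtMs) =>
        _set.Add((Score(priority, enqueuedAtMs), member));

    // Remove and return the most urgent member, or null when empty
    public string? PopMin()
    {
        if (_set.Count == 0) return null;
        var first = _set.Min;
        _set.Remove(first);
        return first.Member;
    }
}
```

An "urgent" task with priority 0 enqueued at 2,000 ms pops before two priority-1 tasks enqueued earlier, and those two come out in their enqueue order.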
#C# #PerformanceOptimization #Redis #DistributedArchitecture #WorkerPipeline #Microservices
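Author: 技术老小子
Article link:
Copyright notice: unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please credit the source when republishing!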