编辑
2025-09-18
C#
00

目录

系统架构
配置管理 (SocketServerOptions)
性能优化策略
线程池优化
并发连接管理
RabbitMQ 集成
连接处理
优雅关闭与资源管理
完整代码
Nuget 安装相关包
服务端
测试客户端
应用场景
总结

在现代网络应用中,构建高性能、可扩展的网络服务器是一个常见的挑战。本文将深入探讨一个基于 C# 的高性能 Socket 服务器实现,程序实现了一个基于 TCP 的高性能异步 Socket 服务器,该服务器接收客户端消息后,通过 RabbitMQ 队列进行消息的异步发送和处理,适合用于需要高并发连接和消息队列集成的实时通信场景。

系统架构

该 Socket 服务器的整体架构包括以下核心组件:

  • SocketServerOptions:配置管理
  • HighPerformanceSocketServer:核心服务器逻辑
  • Program:服务器启动与生命周期管理

配置管理 (SocketServerOptions)

C#
public class SocketServerOptions { public int Port { get; set; } = 8888; // 监听端口 public int BacklogSize { get; set; } = 10000; // 连接积压队列大小 public int BufferSize { get; set; } = 1024; // 数据缓冲区大小 public string MqHost { get; set; } = "localhost"; // RabbitMQ 主机 public string QueueName { get; set; } = "socket_messages"; // 消息队列名称 }

性能优化策略

线程池优化

C#
private void OptimizeThreadPool() { int minThreads = Math.Max(100, Environment.ProcessorCount * 32); ThreadPool.SetMinThreads(minThreads, minThreads); ThreadPool.SetMaxThreads(minThreads * 2, minThreads * 2); }

通过动态调整线程池参数,服务器可以根据 CPU 核心数调整线程数量,提高并发处理能力。

并发连接管理

使用 ConcurrentDictionary 实现线程安全的连接管理:

C#
private readonly ConcurrentDictionary<Guid, SocketConnection> _connections = new ConcurrentDictionary<Guid, SocketConnection>();

RabbitMQ 集成

服务器通过 RabbitMQ 实现消息解耦和异步处理:

C#
// RabbitMQ 连接初始化 var factory = new ConnectionFactory() { HostName = _options.MqHost, AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(10) }; // 队列声明,增加可靠性配置 _mqChannel.QueueDeclareAsync( queue: _options.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-max-length", 100000 }, // 队列最大长度 { "x-message-ttl", 86400000 } // 消息过期时间 } );

连接处理

使用异步处理每个客户端连接,避免阻塞主线程:

C#
private async Task HandleConnectionAsync(SocketConnection connection) { try { // 读取客户端消息 while (true) { int bytesRead = await networkStream.ReadAsync(buffer, 0, buffer.Length); if (bytesRead == 0) break; string message = Encoding.UTF8.GetString(buffer, 0, bytesRead); await SendToMQAsync(message); } } catch (Exception ex) { _logger.LogWarning($"连接发生错误: {ex.Message}"); } }

优雅关闭与资源管理

实现 IAsyncDisposable 接口,确保资源得到正确释放:

C#
public async ValueTask DisposeAsync() { try { // 关闭所有活跃连接 foreach (var conn in _connections.Values) { conn.Socket.Close(); } _connections.Clear(); _listener?.Close(); await _mqChannel.CloseAsync(); await _mqConnection.CloseAsync(); } catch (Exception ex) { _logger.LogError($"资源释放失败: {ex.Message}"); } }

完整代码

Nuget 安装相关包

C#
Microsoft.Extensions.Logging Microsoft.Extensions.Logging.Console RabbitMQ.Client

服务端

C#
using System; using System.Collections.Concurrent; using System.Net; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using RabbitMQ.Client; namespace AppHSocketServer { public class SocketServerOptions { public int Port { get; set; } = 8888; public int BacklogSize { get; set; } = 10000; public int BufferSize { get; set; } = 1024; public string MqHost { get; set; } = "localhost"; public string QueueName { get; set; } = "socket_messages"; } public class HighPerformanceSocketServer : IAsyncDisposable { private readonly ILogger<HighPerformanceSocketServer> _logger; private readonly SocketServerOptions _options; private Socket _listener; // 线程安全的连接管理 private readonly ConcurrentDictionary<Guid, SocketConnection> _connections = new ConcurrentDictionary<Guid, SocketConnection>(); // RabbitMQ相关 private readonly IConnection _mqConnection; private readonly IChannel _mqChannel; // 连接状态追踪 private class SocketConnection { public Guid Id { get; set; } public Socket Socket { get; set; } public DateTime ConnectedAt { get; set; } } public HighPerformanceSocketServer( ILogger<HighPerformanceSocketServer> logger, SocketServerOptions options = null) { _logger = logger; _options = options ?? new SocketServerOptions(); // 初始化RabbitMQ连接 try { var factory = new ConnectionFactory() { HostName = _options.MqHost, // 添加更多连接配置 AutomaticRecoveryEnabled = true, NetworkRecoveryInterval = TimeSpan.FromSeconds(10) }; _mqConnection = factory.CreateConnectionAsync().Result; _mqChannel = _mqConnection.CreateChannelAsync().Result; // 声明队列,增加更多配置 _mqChannel.QueueDeclareAsync( queue: _options.QueueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-max-length", 100000 }, // 队列最大长度 { "x-message-ttl", 86400000 } // 消息过期时间24小时 } ); } catch (Exception ex) { _logger.LogError($"RabbitMQ初始化失败: {ex.Message}"); throw; } // 优化线程池 OptimizeThreadPool(); } private void OptimizeThreadPool() { int minThreads = Math.Max(100, Environment.ProcessorCount * 32); ThreadPool.SetMinThreads(minThreads, minThreads); ThreadPool.SetMaxThreads(minThreads * 2, minThreads * 2); } public async Task StartAsync() { try { _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); _listener.Bind(new IPEndPoint(IPAddress.Any, _options.Port)); _listener.Listen(_options.BacklogSize); _logger.LogInformation($"服务器启动,监听端口 {_options.Port}"); while (true) { var socket = await Task.Factory.FromAsync( _listener.BeginAccept, _listener.EndAccept, null ); var clientId = Guid.NewGuid(); var connection = new SocketConnection { Id = clientId, Socket = socket, ConnectedAt = DateTime.UtcNow }; _connections[clientId] = connection; // 不等待处理,提高并发性 _ = HandleConnectionAsync(connection); } } catch (Exception ex) { _logger.LogCritical($"服务器启动失败: {ex.Message}"); throw; } } private async Task HandleConnectionAsync(SocketConnection connection) { try { using var networkStream = new NetworkStream(connection.Socket); var buffer = new byte[_options.BufferSize]; while (true) { int bytesRead = await networkStream.ReadAsync(buffer, 0, buffer.Length); if (bytesRead == 0) break; string message = Encoding.UTF8.GetString(buffer, 0, bytesRead); await SendToMQAsync(message); _logger.LogInformation($"收到客户端 {connection.Id} 消息: {message}"); } } catch (Exception ex) { _logger.LogWarning($"连接 {connection.Id} 发生错误: {ex.Message}"); } finally { _connections.TryRemove(connection.Id, out _); connection.Socket.Close(); } } private async Task SendToMQAsync(string message) { try { var body = Encoding.UTF8.GetBytes(message); await _mqChannel.BasicPublishAsync( exchange: "", routingKey: _options.QueueName, body: body ); } catch (Exception ex) { _logger.LogError($"发送消息到MQ失败: {ex.Message}"); } } public async ValueTask DisposeAsync() { try { // 关闭所有活跃连接 foreach (var conn in _connections.Values) { conn.Socket.Close(); } _connections.Clear(); _listener?.Close(); await _mqChannel.CloseAsync(); await _mqConnection.CloseAsync(); } catch (Exception ex) { _logger.LogError($"资源释放失败: {ex.Message}"); } } } }
C#
using System; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.DependencyInjection; namespace AppHSocketServer { class Program { static async Task Main(string[] args) { // 创建依赖注入容器 var services = new ServiceCollection(); // 配置日志 services.AddLogging(configure => { configure.AddConsole(); // 控制台日志 configure.SetMinimumLevel(LogLevel.Information); }); // 配置Socket服务器 services.AddSingleton(sp => { var logger = sp.GetRequiredService<ILogger<HighPerformanceSocketServer>>(); // 自定义配置(可选) var options = new SocketServerOptions { Port = 8888, // 监听端口 BacklogSize = 10000, // 连接积压队列大小 BufferSize = 1024, // 缓冲区大小 MqHost = "localhost", // RabbitMQ主机 QueueName = "socket_messages" // 队列名称 }; return new HighPerformanceSocketServer(logger, options); }); // 构建服务提供者 var serviceProvider = services.BuildServiceProvider(); // 获取Socket服务器实例 var socketServer = serviceProvider.GetRequiredService<HighPerformanceSocketServer>(); try { // 启动服务器 Console.WriteLine("Socket服务器正在启动..."); // 捕获取消操作 using var cancellationTokenSource = new CancellationTokenSource(); // 处理控制台退出信号 Console.CancelKeyPress += (sender, e) => { e.Cancel = true; // 阻止默认的退出行为 cancellationTokenSource.Cancel(); }; // 启动服务器 await StartServerWithGracefulShutdown(socketServer, cancellationTokenSource.Token); } catch (Exception ex) { Console.WriteLine($"服务器启动失败: {ex.Message}"); } } static async Task StartServerWithGracefulShutdown( HighPerformanceSocketServer server, CancellationToken cancellationToken) { try { // 创建一个任务来运行服务器 var serverTask = server.StartAsync(); // 等待取消信号 await Task.Delay(Timeout.Infinite, cancellationToken); } catch (OperationCanceledException) { Console.WriteLine("服务器正在关闭..."); } finally { // 异步释放资源 await server.DisposeAsync(); Console.WriteLine("服务器已安全关闭"); } } } }

测试客户端

C#
using System.Net.Sockets; using System.Text; namespace AppHSocketClient { internal class Program { public static async Task Main() { // 创建多个客户端模拟并发 var tasks = new List<Task>(); for (int i = 0; i < 10; i++) { tasks.Add(SimulateClient(i)); } await Task.WhenAll(tasks); Console.ReadKey(); } static async Task SimulateClient(int clientId) { using (var client = new TcpClient()) { await client.ConnectAsync("localhost", 8888); using (var stream = client.GetStream()) { for (int i = 0; i < 100; i++) { string message = $"Client {clientId} - Message {i}"; byte[] data = Encoding.UTF8.GetBytes(message); await stream.WriteAsync(data, 0, data.Length); Console.WriteLine($"发送消息: {message}"); // 模拟发送间隔 await Task.Delay(10); } } } } } }

image.png

image.png

image.png

应用场景

这种高性能 Socket 服务器适用于:

  • 实时通信系统
  • 日志收集
  • 物联网设备通信
  • 分布式系统消息传输

总结

本文展示了一个 C# 实现的高性能 Socket 服务器,通过线程池优化、并发连接管理和 RabbitMQ 集成,提供了一个可扩展、高效的网络服务解决方案。

相关信息

通过网盘分享的文件:AppHSocketClient.zip 链接: https://pan.baidu.com/s/1UvxTKGtqQDXvCL-Ip5cm9A?pwd=xbac 提取码: xbac --来自百度网盘超级会员v9的分享

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!