在现代网络应用中,构建高性能、可扩展的网络服务器是一个常见的挑战。本文将深入探讨一个基于 C# 的高性能 Socket 服务器实现,程序实现了一个基于 TCP 的高性能异步 Socket 服务器,该服务器接收客户端消息后,通过 RabbitMQ 队列进行消息的异步发送和处理,适合用于需要高并发连接和消息队列集成的实时通信场景。
该 Socket 服务器的整体架构包括以下核心组件:
SocketServerOptions
:配置管理HighPerformanceSocketServer
:核心服务器逻辑Program
:服务器启动与生命周期管理C#public class SocketServerOptions
{
public int Port { get; set; } = 8888; // 监听端口
public int BacklogSize { get; set; } = 10000; // 连接积压队列大小
public int BufferSize { get; set; } = 1024; // 数据缓冲区大小
public string MqHost { get; set; } = "localhost"; // RabbitMQ 主机
public string QueueName { get; set; } = "socket_messages"; // 消息队列名称
}
C#private void OptimizeThreadPool()
{
int minThreads = Math.Max(100, Environment.ProcessorCount * 32);
ThreadPool.SetMinThreads(minThreads, minThreads);
ThreadPool.SetMaxThreads(minThreads * 2, minThreads * 2);
}
通过动态调整线程池参数,服务器可以根据 CPU 核心数调整线程数量,提高并发处理能力。
使用 ConcurrentDictionary
实现线程安全的连接管理:
C#private readonly ConcurrentDictionary<Guid, SocketConnection> _connections
= new ConcurrentDictionary<Guid, SocketConnection>();
服务器通过 RabbitMQ 实现消息解耦和异步处理:
C#// RabbitMQ 连接初始化
var factory = new ConnectionFactory()
{
HostName = _options.MqHost,
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
// 队列声明,增加可靠性配置
_mqChannel.QueueDeclareAsync(
queue: _options.QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
{ "x-max-length", 100000 }, // 队列最大长度
{ "x-message-ttl", 86400000 } // 消息过期时间
}
);
使用异步处理每个客户端连接,避免阻塞主线程:
C#private async Task HandleConnectionAsync(SocketConnection connection)
{
try
{
// 读取客户端消息
while (true)
{
int bytesRead = await networkStream.ReadAsync(buffer, 0, buffer.Length);
if (bytesRead == 0) break;
string message = Encoding.UTF8.GetString(buffer, 0, bytesRead);
await SendToMQAsync(message);
}
}
catch (Exception ex)
{
_logger.LogWarning($"连接发生错误: {ex.Message}");
}
}
实现 IAsyncDisposable
接口,确保资源得到正确释放:
C#public async ValueTask DisposeAsync()
{
try
{
// 关闭所有活跃连接
foreach (var conn in _connections.Values)
{
conn.Socket.Close();
}
_connections.Clear();
_listener?.Close();
await _mqChannel.CloseAsync();
await _mqConnection.CloseAsync();
}
catch (Exception ex)
{
_logger.LogError($"资源释放失败: {ex.Message}");
}
}
C#Microsoft.Extensions.Logging Microsoft.Extensions.Logging.Console RabbitMQ.Client
C#using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
namespace AppHSocketServer
{
public class SocketServerOptions
{
public int Port { get; set; } = 8888;
public int BacklogSize { get; set; } = 10000;
public int BufferSize { get; set; } = 1024;
public string MqHost { get; set; } = "localhost";
public string QueueName { get; set; } = "socket_messages";
}
public class HighPerformanceSocketServer : IAsyncDisposable
{
private readonly ILogger<HighPerformanceSocketServer> _logger;
private readonly SocketServerOptions _options;
private Socket _listener;
// 线程安全的连接管理
private readonly ConcurrentDictionary<Guid, SocketConnection> _connections
= new ConcurrentDictionary<Guid, SocketConnection>();
// RabbitMQ相关
private readonly IConnection _mqConnection;
private readonly IChannel _mqChannel;
// 连接状态追踪
private class SocketConnection
{
public Guid Id { get; set; }
public Socket Socket { get; set; }
public DateTime ConnectedAt { get; set; }
}
public HighPerformanceSocketServer(
ILogger<HighPerformanceSocketServer> logger,
SocketServerOptions options = null)
{
_logger = logger;
_options = options ?? new SocketServerOptions();
// 初始化RabbitMQ连接
try
{
var factory = new ConnectionFactory()
{
HostName = _options.MqHost,
// 添加更多连接配置
AutomaticRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
_mqConnection = factory.CreateConnectionAsync().Result;
_mqChannel = _mqConnection.CreateChannelAsync().Result;
// 声明队列,增加更多配置
_mqChannel.QueueDeclareAsync(
queue: _options.QueueName,
durable: true,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object>
{
{ "x-max-length", 100000 }, // 队列最大长度
{ "x-message-ttl", 86400000 } // 消息过期时间24小时
}
);
}
catch (Exception ex)
{
_logger.LogError($"RabbitMQ初始化失败: {ex.Message}");
throw;
}
// 优化线程池
OptimizeThreadPool();
}
private void OptimizeThreadPool()
{
int minThreads = Math.Max(100, Environment.ProcessorCount * 32);
ThreadPool.SetMinThreads(minThreads, minThreads);
ThreadPool.SetMaxThreads(minThreads * 2, minThreads * 2);
}
public async Task StartAsync()
{
try
{
_listener = new Socket(AddressFamily.InterNetwork,
SocketType.Stream,
ProtocolType.Tcp);
_listener.Bind(new IPEndPoint(IPAddress.Any, _options.Port));
_listener.Listen(_options.BacklogSize);
_logger.LogInformation($"服务器启动,监听端口 {_options.Port}");
while (true)
{
var socket = await Task.Factory.FromAsync(
_listener.BeginAccept,
_listener.EndAccept,
null
);
var clientId = Guid.NewGuid();
var connection = new SocketConnection
{
Id = clientId,
Socket = socket,
ConnectedAt = DateTime.UtcNow
};
_connections[clientId] = connection;
// 不等待处理,提高并发性
_ = HandleConnectionAsync(connection);
}
}
catch (Exception ex)
{
_logger.LogCritical($"服务器启动失败: {ex.Message}");
throw;
}
}
private async Task HandleConnectionAsync(SocketConnection connection)
{
try
{
using var networkStream = new NetworkStream(connection.Socket);
var buffer = new byte[_options.BufferSize];
while (true)
{
int bytesRead = await networkStream.ReadAsync(buffer, 0, buffer.Length);
if (bytesRead == 0) break;
string message = Encoding.UTF8.GetString(buffer, 0, bytesRead);
await SendToMQAsync(message);
_logger.LogInformation($"收到客户端 {connection.Id} 消息: {message}");
}
}
catch (Exception ex)
{
_logger.LogWarning($"连接 {connection.Id} 发生错误: {ex.Message}");
}
finally
{
_connections.TryRemove(connection.Id, out _);
connection.Socket.Close();
}
}
private async Task SendToMQAsync(string message)
{
try
{
var body = Encoding.UTF8.GetBytes(message);
await _mqChannel.BasicPublishAsync(
exchange: "",
routingKey: _options.QueueName,
body: body
);
}
catch (Exception ex)
{
_logger.LogError($"发送消息到MQ失败: {ex.Message}");
}
}
public async ValueTask DisposeAsync()
{
try
{
// 关闭所有活跃连接
foreach (var conn in _connections.Values)
{
conn.Socket.Close();
}
_connections.Clear();
_listener?.Close();
await _mqChannel.CloseAsync();
await _mqConnection.CloseAsync();
}
catch (Exception ex)
{
_logger.LogError($"资源释放失败: {ex.Message}");
}
}
}
}
C#using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
namespace AppHSocketServer
{
class Program
{
static async Task Main(string[] args)
{
// 创建依赖注入容器
var services = new ServiceCollection();
// 配置日志
services.AddLogging(configure =>
{
configure.AddConsole(); // 控制台日志
configure.SetMinimumLevel(LogLevel.Information);
});
// 配置Socket服务器
services.AddSingleton(sp =>
{
var logger = sp.GetRequiredService<ILogger<HighPerformanceSocketServer>>();
// 自定义配置(可选)
var options = new SocketServerOptions
{
Port = 8888, // 监听端口
BacklogSize = 10000, // 连接积压队列大小
BufferSize = 1024, // 缓冲区大小
MqHost = "localhost", // RabbitMQ主机
QueueName = "socket_messages" // 队列名称
};
return new HighPerformanceSocketServer(logger, options);
});
// 构建服务提供者
var serviceProvider = services.BuildServiceProvider();
// 获取Socket服务器实例
var socketServer = serviceProvider.GetRequiredService<HighPerformanceSocketServer>();
try
{
// 启动服务器
Console.WriteLine("Socket服务器正在启动...");
// 捕获取消操作
using var cancellationTokenSource = new CancellationTokenSource();
// 处理控制台退出信号
Console.CancelKeyPress += (sender, e) =>
{
e.Cancel = true; // 阻止默认的退出行为
cancellationTokenSource.Cancel();
};
// 启动服务器
await StartServerWithGracefulShutdown(socketServer, cancellationTokenSource.Token);
}
catch (Exception ex)
{
Console.WriteLine($"服务器启动失败: {ex.Message}");
}
}
static async Task StartServerWithGracefulShutdown(
HighPerformanceSocketServer server,
CancellationToken cancellationToken)
{
try
{
// 创建一个任务来运行服务器
var serverTask = server.StartAsync();
// 等待取消信号
await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
Console.WriteLine("服务器正在关闭...");
}
finally
{
// 异步释放资源
await server.DisposeAsync();
Console.WriteLine("服务器已安全关闭");
}
}
}
}
C#using System.Net.Sockets;
using System.Text;
namespace AppHSocketClient
{
internal class Program
{
public static async Task Main()
{
// 创建多个客户端模拟并发
var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
tasks.Add(SimulateClient(i));
}
await Task.WhenAll(tasks);
Console.ReadKey();
}
static async Task SimulateClient(int clientId)
{
using (var client = new TcpClient())
{
await client.ConnectAsync("localhost", 8888);
using (var stream = client.GetStream())
{
for (int i = 0; i < 100; i++)
{
string message = $"Client {clientId} - Message {i}";
byte[] data = Encoding.UTF8.GetBytes(message);
await stream.WriteAsync(data, 0, data.Length);
Console.WriteLine($"发送消息: {message}");
// 模拟发送间隔
await Task.Delay(10);
}
}
}
}
}
}
这种高性能 Socket 服务器适用于:
本文展示了一个 C# 实现的高性能 Socket 服务器,通过线程池优化、并发连接管理和 RabbitMQ 集成,提供了一个可扩展、高效的网络服务解决方案。
相关信息
通过网盘分享的文件:AppHSocketClient.zip 链接: https://pan.baidu.com/s/1UvxTKGtqQDXvCL-Ip5cm9A?pwd=xbac 提取码: xbac --来自百度网盘超级会员v9的分享
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!