在现代分布式系统中,你是否遇到过这样的场景:用户在A系统修改了数据,但B系统的缓存却迟迟不更新?订单状态变了,库存系统却还在"睡大觉"?
传统的定时轮询方式不仅效率低下,还可能漏掉关键数据变更。今天,我将向你揭示一个C#开发者的"秘密武器"——MySQL Binlog实时监听技术,让你的应用拥有"顺风耳"般的敏锐感知力,实现毫秒级的数据变更响应!
1. 定时轮询:效率低下的"笨办法"
C#// 传统的定时查询方式
while (true)
{
var changes = await CheckDataChanges(); // 大部分时候返回空
await Task.Delay(5000); // 白白浪费5秒
}
2. 触发器方案:维护成本高
3. 消息队列:需要修改业务代码
MySQL Binlog(Binary Log)是MySQL的二进制日志,记录了所有数据变更操作。通过监听Binlog,我们可以:
BashInstall-Package MySqlCdc Install-Package Microsoft.Extensions.Hosting Install-Package Newtonsoft.Json
appsettings.json:让配置更灵活
JSON{
"MySqlCdc": {
"Host": "localhost",
"Port": 3306,
"User": "root",
"Password": "your_password",
"Database": "your_database",
"MonitoredTables": ["users", "orders", "products"],
"MaxRetryAttempts": 5,
"RetryDelayMs": 5000
}
}
C#public class MySqlCdcConfig
{
public string Host { get; set; } = "localhost";
public int Port { get; set; } = 3306;
public string User { get; set; } = "root";
public string Password { get; set; } = "";
public string Database { get; set; } = "";
public List<string> MonitoredTables { get; set; } = new();
public int MaxRetryAttempts { get; set; } = 5;
public int RetryDelayMs { get; set; } = 5000;
}
C#using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MySqlCdc;
using MySqlCdc.Constants;
using MySqlCdc.Events;
using MySqlCdc.Providers.MariaDb;
using MySqlCdc.Providers.MySql;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
namespace AppMySqlBinlogListener;
class BinlogClientExample
{
private readonly MySqlCdcConfig _config;
private readonly ILogger<BinlogClientExample> _logger;
private readonly Dictionary<long, string> _tableDatabases = new Dictionary<long, string>();
private readonly Dictionary<long, string> _tableNames = new Dictionary<long, string>();
public BinlogClientExample(MySqlCdcConfig config, ILogger<BinlogClientExample> logger)
{
_config = config;
_logger = logger;
}
public async Task Start()
{
var client = new BinlogClient(options =>
{
options.Hostname = _config.Host;
options.Database = _config.Database;
options.Port = _config.Port;
options.Username = _config.User;
options.Password = _config.Password;
options.SslMode = SslMode.Disabled;
options.HeartbeatInterval = TimeSpan.FromSeconds(30);
options.Blocking = true;
// 根据配置决定从哪里开始监听
if (!string.IsNullOrEmpty(_config.BinlogFilename) && _config.BinlogPosition > 0)
{
options.Binlog = BinlogOptions.FromPosition(_config.BinlogFilename, _config.BinlogPosition);
_logger.LogInformation($"从指定位置开始监听: {_config.BinlogFilename}:{_config.BinlogPosition}");
}
else
{
options.Binlog = BinlogOptions.FromEnd();
_logger.LogInformation("从最新位置开始监听");
}
});
_logger.LogInformation($"开始监听数据库: {_config.Database}");
_logger.LogInformation($"监听的表: {string.Join(", ", _config.MonitoredTables)}");
var retryCount = 0;
while (retryCount < _config.MaxRetryAttempts)
{
try
{
await foreach (var (header, binlogEvent) in client.Replicate())
{
await HandleBinlogEvent(binlogEvent);
}
break; // 如果正常结束,跳出重试循环
}
catch (Exception ex)
{
retryCount++;
_logger.LogError(ex, $"监听失败,重试次数: {retryCount}/{_config.MaxRetryAttempts}");
if (retryCount < _config.MaxRetryAttempts)
{
_logger.LogInformation($"等待 {_config.RetryDelayMs}ms 后重试...");
await Task.Delay(_config.RetryDelayMs);
}
else
{
_logger.LogError("已达到最大重试次数,程序退出");
throw;
}
}
}
}
private async Task HandleBinlogEvent(IBinlogEvent binlogEvent)
{
switch (binlogEvent)
{
case TableMapEvent tableMap:
// 记录表ID与数据库、表名的映射关系
_tableDatabases[tableMap.TableId] = tableMap.DatabaseName;
_tableNames[tableMap.TableId] = tableMap.TableName;
// 只处理目标数据库的事件
if (tableMap.DatabaseName == _config.Database)
{
await HandleTableMapEvent(tableMap);
}
break;
case WriteRowsEvent writeRows:
if (IsMonitoredTable(writeRows.TableId))
{
await HandleWriteRowsEvent(writeRows);
}
break;
case UpdateRowsEvent updateRows:
if (IsMonitoredTable(updateRows.TableId))
{
await HandleUpdateRowsEvent(updateRows);
}
break;
case DeleteRowsEvent deleteRows:
if (IsMonitoredTable(deleteRows.TableId))
{
await HandleDeleteRowsEvent(deleteRows);
}
break;
}
}
private bool IsMonitoredTable(long tableId)
{
if (!_tableDatabases.TryGetValue(tableId, out var databaseName) ||
databaseName != _config.Database)
{
return false;
}
// 如果没有配置监听的表,则监听所有表
if (_config.MonitoredTables == null || !_config.MonitoredTables.Any())
{
return true;
}
// 检查是否在监听的表列表中
if (_tableNames.TryGetValue(tableId, out var tableName))
{
return _config.MonitoredTables.Contains(tableName);
}
return false;
}
private async Task PrintEventAsync(IBinlogEvent binlogEvent)
{
var json = JsonConvert.SerializeObject(binlogEvent, Formatting.Indented,
new JsonSerializerSettings()
{
Converters = new List<JsonConverter> { new StringEnumConverter() }
});
await Console.Out.WriteLineAsync(json);
}
private async Task HandleTableMapEvent(TableMapEvent tableMap)
{
_logger.LogInformation($"[TableMap] 表: {tableMap.DatabaseName}.{tableMap.TableName} (ID: {tableMap.TableId})");
// 如果是监听的表,则输出详细信息
if (_config.MonitoredTables == null || !_config.MonitoredTables.Any() ||
_config.MonitoredTables.Contains(tableMap.TableName))
{
await PrintEventAsync(tableMap);
}
}
private async Task HandleWriteRowsEvent(WriteRowsEvent writeRows)
{
var tableName = _tableNames.GetValueOrDefault(writeRows.TableId, "Unknown");
_logger.LogInformation($"[INSERT] {writeRows.Rows.Count} 行被插入到表 {tableName} (TableId: {writeRows.TableId})");
await PrintEventAsync(writeRows);
foreach (var row in writeRows.Rows)
{
Console.WriteLine($" 插入行数据: {JsonConvert.SerializeObject(row)}");
}
Console.WriteLine("----------------------------------------");
}
private async Task HandleUpdateRowsEvent(UpdateRowsEvent updateRows)
{
var tableName = _tableNames.GetValueOrDefault(updateRows.TableId, "Unknown");
_logger.LogInformation($"[UPDATE] {updateRows.Rows.Count} 行在表 {tableName} 中被更新 (TableId: {updateRows.TableId})");
await PrintEventAsync(updateRows);
foreach (var row in updateRows.Rows)
{
var rowBeforeUpdate = row.BeforeUpdate;
var rowAfterUpdate = row.AfterUpdate;
Console.WriteLine($" 更新前: {JsonConvert.SerializeObject(rowBeforeUpdate)}");
Console.WriteLine($" 更新后: {JsonConvert.SerializeObject(rowAfterUpdate)}");
}
Console.WriteLine("----------------------------------------");
}
private async Task HandleDeleteRowsEvent(DeleteRowsEvent deleteRows)
{
var tableName = _tableNames.GetValueOrDefault(deleteRows.TableId, "Unknown");
_logger.LogInformation($"[DELETE] {deleteRows.Rows.Count} 行从表 {tableName} 中被删除 (TableId: {deleteRows.TableId})");
await PrintEventAsync(deleteRows);
foreach (var row in deleteRows.Rows)
{
Console.WriteLine($" 删除行数据: {JsonConvert.SerializeObject(row)}");
}
Console.WriteLine("----------------------------------------");
}
}
class Program
{
static async Task Main(string[] args)
{
// 创建配置
var configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddEnvironmentVariables()
.AddCommandLine(args)
.Build();
// 创建主机
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
{
// 注册配置
services.Configure<MySqlCdcConfig>(configuration.GetSection("MySqlCdc"));
// 注册服务
services.AddSingleton<MySqlCdcConfig>(provider =>
{
var config = new MySqlCdcConfig();
configuration.GetSection("MySqlCdc").Bind(config);
return config;
});
services.AddTransient<BinlogClientExample>();
})
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Information);
})
.Build();
try
{
using var scope = host.Services.CreateScope();
var binlogClient = scope.ServiceProvider.GetRequiredService<BinlogClientExample>();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<Program>>();
logger.LogInformation("MySQL CDC 监听程序启动...");
await binlogClient.Start();
}
catch (Exception ex)
{
Console.WriteLine($"程序发生错误: {ex.Message}");
Console.WriteLine($"详细信息: {ex}");
}
finally
{
Console.WriteLine("按任意键退出...");
Console.ReadKey();
}
}
}
C#class Program
{
static async Task Main(string[] args)
{
var configuration = new ConfigurationBuilder()
.SetBasePath(Directory.GetCurrentDirectory())
.AddJsonFile("appsettings.json", optional: false)
.Build();
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
{
services.AddSingleton<MySqlCdcConfig>(provider =>
{
var config = new MySqlCdcConfig();
configuration.GetSection("MySqlCdc").Bind(config);
return config;
});
services.AddTransient<BinlogClientExample>();
})
.ConfigureLogging(logging =>
{
logging.ClearProviders();
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Information);
})
.Build();
try
{
using var scope = host.Services.CreateScope();
var binlogClient = scope.ServiceProvider.GetRequiredService<BinlogClientExample>();
Console.WriteLine("🎯 MySQL CDC 监听程序启动中...");
await binlogClient.Start();
}
catch (Exception ex)
{
Console.WriteLine($"❌ 程序异常: {ex.Message}");
}
}
}
C#private async Task ProcessOrderUpdate(object beforeData, object afterData)
{
// 订单状态变更时,实时更新库存
if (IsOrderStatusChanged(beforeData, afterData))
{
await _inventoryService.SyncInventory(afterData);
await _cacheService.InvalidateCache($"product_{productId}");
}
}
C#private async Task ProcessDataForETL(string tableName, object rowData)
{
// 实时将业务数据同步到数据仓库
await _dataWarehouseService.SyncData(tableName, rowData);
await _analyticsService.TriggerRealTimeReport();
}
SQL-- 为监听用户分配必要权限
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';
FLUSH PRIVILEGES;
C#// 批量处理提升性能
private readonly List<DataChange> _batchBuffer = new();
private readonly Timer _flushTimer = new(FlushBatch, null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
private async void FlushBatch(object state)
{
if (_batchBuffer.Any())
{
await ProcessBatchData(_batchBuffer.ToList());
_batchBuffer.Clear();
}
}
技术问题探讨:
分享你的经验:
如果你已经在使用类似技术,欢迎在评论区分享你的实战经验和踩坑记录!
经过今天的深入探讨,我们掌握了MySQL Binlog监听的三个关键要素:
掌握了这项技术,你就拥有了构建高性能实时数据系统的"超能力"。无论是电商库存同步、用户行为分析,还是数据仓库ETL,都能游刃有余地处理。
🔥 金句收藏:
觉得这篇文章对你有帮助吗?请点赞收藏并转发给更多C#开发同行!让我们一起在技术的道路上精进不止! 🚀
相关信息
通过网盘分享的文件:AppMySqlBinlogListener.zip 链接: https://pan.baidu.com/s/1r1znD4KMyqXqgypDR6fs8w?pwd=gjmk 提取码: gjmk --来自百度网盘超级会员v9的分享
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!