CanalSharp-mysql数据库binlog的增量订阅&消费组件Canal的.NET客户端

一.前言 CanalSharp是阿里巴巴开源项目mysql数据库binlog的增量订阅&消费组件 Canal 的.NET客户端,关于什么是 Canal?又能做什么?我会在后文为大家一一介绍。CanalSharp 这个项目,是由我和 WithLin(主要贡献) 完成,并将一直进行维护的Canal的.NET客户端项目。目前开源在github:https://github.com/CanalSharp/CanalSharp/ 希望大家多多支持,旨在为.NET开发者提供一个友好的对接Canal的选择,为.NET社区生态做贡献。 二.Canal介绍 1.背景 早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。 ps. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48) 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 2.工作原理 2.1 mysql主备复制实现 1537856325274 从上层来看,复制分成三步: master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看); slave将master的binary log events拷贝到它的中继日志(relay log); slave重做中继日志中的事件,将改变反映它自己的数据。 2.2 Canal的工作原理 1537856417016 原理相对比较简单: canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议 mysql master收到dump请求,开始推送binary log给slave(也就是canal) canal解析binary log对象(原始为byte流) 以上内容摘自Canal项目官方资料 https://github.com/alibaba/canal 3.Canal的安装以及使用 Canal的安装以及使用请查阅官方文档,本文不在赘述。 https://github.com/alibaba/canal/wiki 三.CanalSharp介绍 1.工作原理 CanalSharp 是 Canal 的 .NET 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。 2.工作流程 1.Canal连接到mysql数据库,模拟slave 2.CanalSharp与Canal建立连接 2.数据库发生变更写入到binlog 5.Canal向数据库发送dump请求,获取binlog并解析 4.CanalSharp向Canal请求数据库变更 4.Canal发送解析后的数据给CanalSharp 5.CanalSharp收到数据,消费成功,发送回执。(可选) 6.Canal记录消费位置。 以一张图来表示: 1537860226808 3.应用场景 CanalSharp作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。这里我举一些实际的使用例子: 1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。 2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等 3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis 4.数据库异地备份、数据同步 5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。 6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。 四.CanalSharp的使用 1.使用前的准备 使用 CanalSharp 之前,必然要先准备好mysql数据库以及Canal才行,这个步骤请直接查阅Canal官方文档 https://github.com/alibaba/canal/wiki 。但是为了让大家能快速跑通CanalSharp,CanalSharp 项目为大家提供了一个通过 docker-compose 同时运行 mysql和canal。 2.通过docker-compose运行mysql和canal: git clone https://github.com/CanalSharp/CanalSharp.git cd docker docker-compose up -d 出现下图表示运行成功: 1537866674285 3.使用navicat等数据库管理工具连接mysql ip:运行docker的服务器ip mysql用户:root mysql密码:000000 mysql端口:4406 默认提供了一个test数据库,然后有一张名为test的表。 1537866852816 4.创建一个 .NET Core 控制台项目 5.添加 Nuget 程序包 Install-Package CanalSharp.Client 1537867132107 6.编码 也可以直接下载源码运行 Sample 项目 https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient (1)建立连接 //canal 配置的 destination,默认为 example var destination = "example"; //创建一个简单CanalClient连接对象(此对象不支持集群)传入参数分别为 canal地址、端口、destination、用户名、密码 var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", ""); //连接 Canal connector.Connect(); //订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来 connector.Subscribe(".*\\\\..*"); (2)获取数据 while (true) { //获取数据 1024表示数据大小 单位为字节 var message = connector.Get(1024); //批次id 可用于回滚 var batchId = message.Id; if (batchId == -1 || message.Entries.Count <= 0) { Thread.Sleep(300); continue; } PrintEntry(message.Entries); } (3)输出数据 /// /// 输出数据 /// /// 一个entry表示一个数据库变更 private static void PrintEntry(List entrys) { foreach (var entry in entrys) { if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } RowChange rowChange = null; try { //获取行变更 rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception e) { } if (rowChange != null) { //变更类型 insert/update/delete 等等 EventType eventType = rowChange.EventType; //输出binlog信息 表名 数据库名 变更类型 Console.WriteLine( $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}"); //输出 insert/update/delete 变更类型列数据 foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList()); } else if (eventType == EventType.Insert) { PrintColumn(rowData.AfterColumns.ToList()); } else { Console.WriteLine("-------> before"); PrintColumn(rowData.BeforeColumns.ToList()); Console.WriteLine("-------> after"); PrintColumn(rowData.AfterColumns.ToList()); } } } } } /// /// 输出每个列的详细数据 /// /// private static void PrintColumn(List columns) { foreach (var column in columns) { //输出列明 列值 是否变更 Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); } } 7.测试运行 首次运行会输出一堆数据,那些都是初始化运行创建表的数据,忽略即可 运行项目,然后一次执行sql观察输出: insert into test values(1000,'111'); update test set name='222' where id=1000; delete from test where id=1000; 通过新标签页打开图片查看大图 可以看见我们分别执行 insert、update、delete 语句,我们的CanalSharp都获取到了数据库变更。 五.使用Canal的经验 1.mysql数据库版本有要求:5.7.13, 5.6.10,、5.5.18和5.1.40/48,不一定非要满足小版本号的要求,比如 5.7.x、5.6.x、5.5.x都应该可以,但是实际需要自己做测试。前面的具体版本号是Canal官方提供的资料,但是博主公司用的mysql 的版本是5.5.60,是可以正常使用Canal的。 2.mysql数据binlog的格式强烈建议设置为row 3.Canal并非必须连接到master数据库,它同样可以连接到slave数据库,只是从库出了需要开启写入binlog以外还需要设置 log-slave-updates 开启。 4.如果生产环境已经存在mysql集群,且集群主库的binlog格式为mixed,mysql数据库集群的主库binlog格式可以不用改依然为 mixed,设置某一个从库binlog格式配置为 row,让Canal连接从库,这样可以避免对生产环境的mysql集群产生影响。 5.mysql支持Statement,MiXED,以及ROW三种格式的binlog为什么推荐使用row格式binlog,经过博主实际测试,使用row格式兼容性是最好的,实际可以自己测试。 六.结束语 CanalSharp的介绍到这里就结束了,如果觉得这个项目有用的欢迎大家来个 star 。后续将会写几篇文章介绍更详细的使用方法以及实战。 七.资料 CanalSharp 开源地址:https://github.com/CanalSharp/CanalSharp Canal 开源地址:https://github.com/alibaba/canal .NET Core 交流群:4656606 欢迎加群交流 如果您认为这篇文章还不错或者有所收获,您可以点击右下角的【推荐】按钮精神支持,因为这种支持是我继续写作,分享的最大动力! 作者:晓晨Master(李志强) 声明:原创博客请在转载时保留原文链接或者在文章开头加上本人博客地址,如发现错误,欢迎批评指正。凡是转载于本人的文章,不能设置打赏功能,如有特殊需求请与本人联系!https://www.cnblogs.com/stulzq/p/9702385.html
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信