网站图片设置软件,快速一体化网站建设,昆山做网站怎么做,小小影视大全在线观看免费观看思维导图文章已收录Github精选#xff0c;欢迎Star#xff1a;https://github.com/yehongzhi/learningSummary前言我们都知道一个系统最重要的是数据#xff0c;数据是保存在数据库里。但是很多时候不单止要保存在数据库中#xff0c;还要同步保存到Elastic Search、HBase、…思维导图文章已收录Github精选欢迎Starhttps://github.com/yehongzhi/learningSummary前言我们都知道一个系统最重要的是数据数据是保存在数据库里。但是很多时候不单止要保存在数据库中还要同步保存到Elastic Search、HBase、Redis等等。这时我注意到阿里开源的框架Canal他可以很方便地同步数据库的增量数据到其他的存储应用。所以在这里总结一下分享给各位读者参考~一、什么是canal我们先看官网的介绍canal译意为水道/管道/沟渠主要用途是基于 MySQL 数据库增量日志解析提供增量数据订阅和消费。这句介绍有几个关键字增量日志增量数据订阅和消费。这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具。接下来我们看一张官网提供的示意图anal的工作原理就是把自己伪装成MySQL slave模拟MySQL slave的交互协议向MySQL Mater发送 dump协议MySQL mater收到canal发送过来的dump请求开始推送binary log给canal然后canal解析binary log再发送到存储目的地比如MySQLKafkaElastic Search等等。二、canal能做什么与其问canal能做什么不如说数据同步有什么作用。但是canal的数据同步不是全量的而是增量。基于binary log增量订阅和消费canal可以做数据库镜像数据库实时备份索引构建和实时维护业务cache(缓存)刷新带业务逻辑的增量数据处理三、如何搭建canal3.1 首先有一个MySQL服务器当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x我的Linux服务器安装的MySQL服务器是5.7版本。MySQL的安装这里就不演示了比较简单网上也有很多教程。然后在MySQL中需要创建一个用户并授权--使用命令登录mysql -u root -p--创建用户 用户名canal 密码Canal123456create user canal% identified by Canal123456;--授权 *.*表示所有库grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to canal% identified by Canal123456;下一步在MySQL配置文件my.cnf设置如下信息[mysqld]#打开binloglog-binmysql-bin#选择ROW(行)模式binlog-formatROW#配置MySQL replaction需要定义不要和canal的slaveId重复server_id1改了配置文件之后重启MySQL使用命令查看是否打开binlog模式查看binlog日志文件列表查看当前正在写入的binlog文件MySQL服务器这边就搞定了很简单。3.2 安装canal去官网下载页面进行下载https://github.com/alibaba/canal/releases我这里下载的是1.1.4的版本解压canal.deployer-1.1.4.tar.gz我们可以看到里面有四个文件夹接着打开配置文件conf/example/instance.properties配置信息如下## mysql serverId , v1.0.26 will autoGen## v1.0.26版本后会自动生成slaveId所以可以不用配置#canal.instance.mysql.slaveId0#数据库地址canal.instance.master.address127.0.0.1:3306#binlog日志名称canal.instance.master.journal.namemysql-bin.000001#mysql主库链接时起始的binlog偏移量canal.instance.master.position154#mysql主库链接时起始的binlog的时间戳canal.instance.master.timestampcanal.instance.master.gtid#username/password#在MySQL服务器授权的账号密码canal.instance.dbUsernamecanalcanal.instance.dbPasswordCanal123456#字符集canal.instance.connectionCharset UTF-8#enable druid Decrypt database passwordcanal.instance.enableDruidfalse#table regex .*\\..*表示监听所有表 也可以写具体的表名用隔开canal.instance.filter.regex.*\\..*#mysql 数据解析表的黑名单多个表用隔开canal.instance.filter.black.regex我这里用的是win10系统所以在bin目录下找到startup.bat启动启动就报错坑呀要修改一下启动的脚本startup.bat然后再启动脚本这就启动成功了。Java客户端操作首先引入maven依赖com.alibaba.ottercanal.client1.1.4然后创建一个canal项目使用SpringBoot构建如图所示在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet()Componentpublic class CannalClient implementsInitializingBean {private final static int BATCH_SIZE 1000;Overridepublic void afterPropertiesSet() throwsException {//创建链接CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(127.0.0.1, 11111), example, , );try{//打开连接connector.connect();//订阅数据库表,全部表connector.subscribe(.*\\..*);//回滚到未进行ack的地方下次fetch的时候可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {//获取指定数量的数据Message message connector.getWithoutAck(BATCH_SIZE);//获取批量IDlong batchId message.getId();//获取批量的数量int size message.getEntries().size();//如果没有数据if (batchId -1 || size 0) {try{//线程休眠2秒Thread.sleep(2000);}catch(InterruptedException e) {e.printStackTrace();}}else{//如果有数据,处理数据printEntry(message.getEntries());}//进行 batch id 的确认。确认之后小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}}catch(Exception e) {e.printStackTrace();}finally{connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(Listentrys) {for(Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型跳过continue;}//RowChange对象包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等RowChange rowChage;try{rowChageRowChange.parseFrom(entry.getStoreValue());}catch(Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(), e);}//获取操作类型insert/update/delete类型EventType eventType rowChage.getEventType();//打印Header信息System.out.println(String.format(》; binlog[%s:%s] , name[%s,%s] , eventType : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if(rowChage.getIsDdl()) {System.out.println(》;isDdl: true,sql: rowChage.getSql());}//获取RowChange对象里的每一行数据打印出来for(RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else{//变更前的数据System.out.println(-------; before);printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println(-------; after);printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(Listcolumns) {for(Column column : columns) {System.out.println(column.getName() : column.getValue() update column.getUpdated());}}}以上就完成了Java客户端的代码。这里不做具体的处理仅仅是打印先有个直观的感受。最后我们开始测试首先启动MySQL、Canal Server还有刚刚写的Spring Boot项目。然后创建表CREATE TABLEtb_commodity_info (idvarchar(32) NOT NULL,commodity_namevarchar(512) DEFAULT NULL COMMENT 商品名称,commodity_pricevarchar(36) DEFAULT 0 COMMENT 商品价格,number int(10) DEFAULT 0 COMMENT 商品数量,descriptionvarchar(2048) DEFAULT COMMENT 商品描述,PRIMARY KEY(id)) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT商品信息表;然后我们在控制台就可以看到如下信息如果新增一条数据到表中INSERT INTO tb_commodity_info VALUES(3e71a81fd80711eaaed600163e046cc3,叉烧包,3.99,3,又大又香的叉烧包老人小孩都喜欢);控制台可以看到如下信息总结canal的好处在于对业务代码没有侵入因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时其实是很多企业一种比较常见的数据同步的方案。通过上面的学习之后我们应该都明白canal是什么它的原理还有用法。实际上这仅仅只是入门因为实际项目中我们不是这样玩的…实际项目我们是配置MQ模式配合RocketMQ或者Kafkacanal会把数据发送到MQ的topic中然后通过消息队列的消费者进行处理。Canal的部署也是支持集群的需要配合ZooKeeper进行集群管理。Canal还有一个简单的Web管理界面。下一篇就讲一下集群部署Canal配合使用Kafka同步数据到Redis。絮叨上面所有例子的代码都上传Github了https://github.com/yehongzhi/mall如果你觉得这篇文章对你有用就点赞关注评论吧~你的三连是我创作的最大动力~拒绝做一条咸鱼我是一个努力让大家记住的程序员。我们下期再见转载于https://blog.csdn.net/yehongzhi1994/article/details/107880162