pom.xml 添加 canal.client 依赖(1.1.5 改动很大,这儿客户端用 1.1.4)
<?xml version="1.0" encoding="utf-8"?><project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelversion>4.0.0</modelversion> <parent> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-parent</artifactid> <version>2.2.2.release</version> <relativepath/> <!-- lookup parent from repository --> </parent> <groupid>top.yueshushu</groupid> <artifactid>learn</artifactid> <version>1.0-snapshot</version> <name>canal</name> <description>学习 canal</description> <properties> <java.version>1.8</java.version> <project.build.sourceencoding>utf-8</project.build.sourceencoding> <project.reporting.outputencoding>utf-8</project.reporting.outputencoding> </properties> <dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-test</artifactid> <scope>test</scope> </dependency> <dependency> <groupid>org.projectlombok</groupid> <artifactid>lombok</artifactid> </dependency> <!-- 导入配置文件处理器,配置文件进行绑定就会有提示,需要重启 --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-configuration-processor</artifactid> <optional>true</optional> </dependency> <!--导入自动热步署的依赖--> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-devtools</artifactid> <optional>true</optional> </dependency> <!--引入mysql的驱动--> <dependency> <groupid>mysql</groupid> <artifactid>mysql-connector-java</artifactid> </dependency> <!--引入springboot与mybatis整合的依赖--> <dependency> <groupid>org.mybatis.spring.boot</groupid> <artifactid>mybatis-spring-boot-starter</artifactid> <version>2.1.4</version> </dependency> <!-- 引入pagehelper分页插件 --> <dependency> <groupid>com.github.pagehelper</groupid> <artifactid>pagehelper-spring-boot-starter</artifactid> <version>1.2.5</version> </dependency> <!--添加 druid-spring-boot-starter的依赖的依赖--> <dependency> <groupid>com.alibaba</groupid> <artifactid>druid-spring-boot-starter</artifactid> <version>1.1.14</version> </dependency> <!--springboot 的aop 模块--> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-aop</artifactid> </dependency> <!--添加canal的依赖. 重要. 使用 1.1.4--> <dependency> <groupid>com.alibaba.otter</groupid> <artifactid>canal.client</artifactid> <version>1.1.4</version> </dependency> <dependency> <groupid>joda-time</groupid> <artifactid>joda-time</artifactid> <version>2.9.4</version> </dependency> </dependencies> <build> <!--将该目录下的文件全部打包成类的路径--> <resources> <resource> <directory>src/main/resources</directory> </resource> </resources> <plugins> <plugin> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-maven-plugin</artifactid> </plugin> </plugins> </build></project>
业务功能处理简单连接程序/** * 一个简单的canal 的连接测试程序 */ @test public void connectiontest() { //1. 创建连接 填充对应的地址信息 ,要监控的实例和相应的用户名和密码 canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); //2. 进行连接 canalconnector.connect(); log.info(">>>连接成功:{}", canalconnector); }
17:26:32.179 [main] info top.yueshushu.learn.canaldemotest - >>>连接成功:com.alibaba.otter.canal.client.impl.simplecanalconnector@31ef45e3
单次获取数据/** * 获取数据信息. 可以发现,未获取到数据 . 这个应该是实时的. */ @test public void getdatatest() { //1. 创建连接 canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 进行连接 canalconnector.connect(); //3. 注册,看使用哪个数据库表 canalconnector.subscribe("springboot.user"); //4. 获取 1条数据 message message = canalconnector.get(1); log.info("获取的数据:id:{},数据:{}", message.getid(), message); if (message.getid() == -1) { log.info(">>>未获取到数据"); return; } //5. 获取相应的数据集合 list<canalentry.entry> entries = message.getentries(); for (canalentry.entry entry : entries) { log.info(">>>获取数据 {}", entry); //获取表名 canalentry.header header = entry.getheader(); log.info(">>>获取表名:{}", header.gettablename()); canalentry.entrytype entrytype = entry.getentrytype(); log.info(">>获取类型 {}:,对应的信息:{}", entrytype.getnumber(), entrytype.name()); //获取数据 bytestring storevalue = entry.getstorevalue(); log.info(">>>输出存储的值:{}", storevalue); } }
在主库里面插入一条数据
insert into springboot.user(id,name,age,sex,description) values(1,'canal添加用户',24,'男','学习canal');
再次执行:
循环获取数据/** * 获取数据信息. 获取现在的数据. 再次执行时,就没有这个数据了. */ @test public void getnowdatatest() { //1. 创建连接 canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress("127.0.0.1", 11111), "example", "canal", "canal" ); // 进行连接 canalconnector.connect(); //3. 注册,看使用哪个数据库表 canalconnector.subscribe("springboot.user"); for (;;) { //4. 获取 1条数据 message message = canalconnector.get(1); log.info("获取的数据:id:{},数据:{}", message.getid(), message); if (message.getid() == -1) { log.info(">>>未获取到数据"); try { timeunit.milliseconds.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } continue; } //5. 获取相应的数据集合 list<canalentry.entry> entries = message.getentries(); for (canalentry.entry entry : entries) { log.info(">>>获取数据 {}", entry); //获取表名 canalentry.header header = entry.getheader(); log.info(">>>获取表名:{}", header.gettablename()); canalentry.entrytype entrytype = entry.getentrytype(); log.info(">>获取类型 {}:,对应的信息:{}", entrytype.getnumber(), entrytype.name()); //获取数据 bytestring storevalue = entry.getstorevalue(); log.info(">>>输出存储的值:{}", storevalue); } } }
可以随时获取相应的数据变更信息。
会发现, storevalue 的值是很难解读的。 需要将这个数据解析出来。
解析 storevalue 值/** * 将 storevalue 进行解析,解析成我们能看懂的语句. * 对数据库 cud 进行处理操作观看一下. * 发现,点是不好的,也有多余的记录信息. * * @throws exception 异常 */ @test public void convertdatatest() throws exception { //1. 创建连接 canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress("127.0.0.1", 11111), "example", "canal", "canal" ); //2. 进行连接 canalconnector.connect(); canalconnector.subscribe("springboot.user"); for (;;) { //获取信息 message message = canalconnector.get(1); if (message.getid() == -1l) { // log.info("未获取到数据"); try { timeunit.milliseconds.sleep(100); } catch (interruptedexception e) { e.printstacktrace(); } continue; } list<canalentry.entry> entrylist = message.getentries(); //对获取到的数据进行处理 log.info(">>获取到{}条数据", entrylist.size()); for (canalentry.entry entry : entrylist) { canalentry.header header = entry.getheader(); log.info(">>>获取表名:{}", header.gettablename()); //获取类型. canalentry.entrytype entrytype = entry.getentrytype(); log.info(">>类型编号 {},类型名称:{}", entrytype.getnumber(), entrytype.name()); //获取存入日志的值 bytestring storevalue = entry.getstorevalue(); //将这个值进行解析 canalentry.rowchange rowchange = rowchange.parsefrom(storevalue); string sql = rowchange.getsql(); log.info(">>>获取对应的sql:{}", sql); // 这个sql 可能是 批量的sql语句 list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { log.info(">>>获取信息:{}", rowdata); //对数据进行处理 list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); beforecolumnslist.foreach( n -> log.info("哪个列{},原先是{},是否被更新{}", n.getname(), n.getvalue(), n.getupdated()) ); aftercolumnslist.foreach( n -> log.info("哪个列{},后来是{},是否被更新{}", n.getname(), n.getvalue(), n.getupdated()) ); } } } }
再次执行sql
insert into springboot.user(id,name,age,sex,description) values(2,'canal添加用户2',25,'男','学习canal2');
不同的类型进行不同的处理发现 其他类型的 如: transactionbegin 也进行了处理
/** * 类型转换数据 * * @throws exception 异常 */ @test public void datatypetest() throws exception { canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalconnector.connect(); canalconnector.subscribe("springboot.user"); for(;;){ message message = canalconnector.get(1); if (message.getid() == -1) { timeunit.seconds.sleep(1); continue; } list<canalentry.entry> entries = message.getentries(); for (canalentry.entry entry : entries) { canalentry.entrytype entrytype = entry.getentrytype(); //只要 rowdata 数据类型的 if (!canalentry.entrytype.rowdata.equals(entrytype)) { continue; } string tablename = entry.getheader().gettablename(); log.info(">>>对表 {} 进行操作", tablename); bytestring storevalue = entry.getstorevalue(); rowchange rowchange = rowchange.parsefrom(storevalue); //行改变 canalentry.eventtype eventtype = rowchange.geteventtype(); switch (eventtype) { case insert: { inserthandler(rowchange); break; } case update: { updatehandler(rowchange); break; } case delete: { deletehandler(rowchange); break; } default: { break; } } } } } private void deletehandler(rowchange rowchange) { log.info(">>>>执行删除的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); for (canalentry.column column : beforecolumnslist) { log.info(">>>>>字段 {} 删除数据 {}", column.getname(), column.getvalue()); } } } private void updatehandler(rowchange rowchange) { log.info(">>>执行更新的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); map<string, string> beforevaluemap = beforecolumnslist.stream().collect( collectors.tomap( canalentry.column::getname, canalentry.column::getvalue ) ); map<string, string> aftervaluemap = aftercolumnslist.stream().collect( collectors.tomap( canalentry.column::getname, canalentry.column::getvalue ) ); beforevaluemap.foreach((column, beforevalue) -> { string aftervalue = aftervaluemap.get(column); boolean update = beforevalue.equals(aftervalue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforevalue, aftervalue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowchange 行改变 */ private void inserthandler(rowchange rowchange) { log.info(">>>执行添加 的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); for (canalentry.column column : aftercolumnslist) { if (!stringutils.hastext(column.getvalue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getname(), column.getvalue()); } } }
插入,更新,删除,分别进行了处理.
先启动测试程序:
不打印任何信息。
主表执行添加语句:
insert into springboot.user(id,name,age,sex,description) values(4,'canal添加用户4',25,'男','学习canal4');
会打印信息:
这个可读性就非常高了.
主表执行修改的操作.
update springboot.user set name='开开心心',age=26,description='岳泽霖' where id =4;
更新时,若每一个字段都跟原先一样,不会产生日志消费。
主表执行删除的操作:
delete from springboot.user where id =4;
上面的获取,都是一条数据一条数据获取的。效率比较低
一次性获取多条数据/** * 一次性获取多条数据。 * sql 执行多条。 */ @test public void datamoretest() throws exception { //1. 创建 canal连接对象 canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalconnector.connect(); // 订阅哪个对象 canalconnector.subscribe("springboot.user"); for (; ; ) { // message message = canalconnector.get(3, 5l, timeunit.seconds); message message = canalconnector.get(3); if (message.getid() == -1) { // 未获取到数据 continue; } list<canalentry.entry> entries = message.getentries(); for (canalentry.entry entry : entries) { canalentry.entrytype entrytype = entry.getentrytype(); if (!canalentry.entrytype.rowdata.equals(entrytype)) { continue; } string tablename = entry.getheader().gettablename(); log.info(">>>>对表{} 执行操作", tablename); canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(entry.getstorevalue()); //对类型进行处理 canalentry.eventtype eventtype = rowchange.geteventtype(); switch (eventtype) { case insert: { inserthandler(rowchange); break; } case update: { updatehandler(rowchange); break; } case delete: { deletehandler(rowchange); break; } default: { break; } } } } } private void deletehandler(canalentry.rowchange rowchange) { log.info(">>>>执行删除的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); for (canalentry.column column : beforecolumnslist) { log.info(">>>>>字段 {} 删除数据 {}", column.getname(), column.getvalue()); } } } private void updatehandler(canalentry.rowchange rowchange) { log.info(">>>执行更新的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); map<string, string> beforevaluemap = beforecolumnslist.stream().collect( collectors.tomap( canalentry.column::getname, canalentry.column::getvalue ) ); map<string, string> aftervaluemap = aftercolumnslist.stream().collect( collectors.tomap( canalentry.column::getname, canalentry.column::getvalue ) ); beforevaluemap.foreach((column, beforevalue) -> { string aftervalue = aftervaluemap.get(column); boolean update = beforevalue.equals(aftervalue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforevalue, aftervalue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowchange 行改变 */ private void inserthandler(canalentry.rowchange rowchange) { log.info(">>>执行添加 的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); for (canalentry.column column : aftercolumnslist) { if (!stringutils.hastext(column.getvalue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getname(), column.getvalue()); } } }
修改点:
// message message = canalconnector.get(3, 5l, timeunit.seconds); message message = canalconnector.get(3);
.get(3) 表示 一次性获取3条记录.
canalconnector.get(3, 5l, timeunit.seconds); 表示5秒之内获取3条记录,
有两个触发条件,一个是获取了3条,一个是到了5秒。
效果展示信息与之前是一致的,就不重新演示了。
ack 配置信息/** * 一次性获取多条数据。 * sql 执行多条。 */ @test public void datamoretest() throws exception { //1. 创建 canal连接对象 canalconnector canalconnector = canalconnectors.newsingleconnector( new inetsocketaddress( "127.0.0.1", 11111 ), "example", "canal", "canal" ); canalconnector.connect(); // 订阅哪个对象 canalconnector.subscribe("springboot.user"); for (; ; ) { message message = canalconnector.getwithoutack(3, 2l, timeunit.seconds); if (message.getid() == -1) { // 未获取到数据 timeunit.milliseconds.sleep(500); continue; } log.info(">>>>获取对应的 id: {}",message.getid()); list<canalentry.entry> entries = message.getentries(); for (canalentry.entry entry : entries) { canalentry.entrytype entrytype = entry.getentrytype(); if (!canalentry.entrytype.rowdata.equals(entrytype)) { continue; } string tablename = entry.getheader().gettablename(); log.info(">>>>对表{} 执行操作", tablename); canalentry.rowchange rowchange = canalentry.rowchange.parsefrom(entry.getstorevalue()); //对类型进行处理 canalentry.eventtype eventtype = rowchange.geteventtype(); switch (eventtype) { case insert: { inserthandler(rowchange); break; } case update: { updatehandler(rowchange); break; } case delete: { deletehandler(rowchange); break; } default: { break; } } } //进行回滚 // canalconnector.rollback(); //确认ack 配置 canalconnector.ack(message.getid()); } } private void deletehandler(canalentry.rowchange rowchange) { log.info(">>>>执行删除的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); for (canalentry.column column : beforecolumnslist) { log.info(">>>>>字段 {} 删除数据 {}", column.getname(), column.getvalue()); } } } private void updatehandler(canalentry.rowchange rowchange) { log.info(">>>执行更新的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> beforecolumnslist = rowdata.getbeforecolumnslist(); list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); map<string, string> beforevaluemap = beforecolumnslist.stream().collect( collectors.tomap( canalentry.column::getname, canalentry.column::getvalue ) ); map<string, string> aftervaluemap = aftercolumnslist.stream().collect( collectors.tomap( canalentry.column::getname, canalentry.column::getvalue ) ); beforevaluemap.foreach((column, beforevalue) -> { string aftervalue = aftervaluemap.get(column); boolean update = beforevalue.equals(aftervalue); log.info("修改列:{},修改前的值:{},修改后的值:{},是否更新:{}", column, beforevalue, aftervalue, update); }); } } /** * 插入数据. 只有后的数据. * * @param rowchange 行改变 */ private void inserthandler(canalentry.rowchange rowchange) { log.info(">>>执行添加 的方法"); list<canalentry.rowdata> rowdataslist = rowchange.getrowdataslist(); for (canalentry.rowdata rowdata : rowdataslist) { list<canalentry.column> aftercolumnslist = rowdata.getaftercolumnslist(); for (canalentry.column column : aftercolumnslist) { if (!stringutils.hastext(column.getvalue())) { continue; } log.info("字段 {} 插入了数据 {}", column.getname(), column.getvalue()); } } }
主要信息:
message message = canalconnector.getwithoutack(3, 2l, timeunit.seconds);
//进行回滚 // canalconnector.rollback();
//确认ack 配置canalconnector.ack(message.getid());
手动确认消息消费了.
当消息 rollback() 回滚后,会再次消费这条消息.
canalconnector.rollback();
执行语句:
insert into springboot.user(id,name,age,sex,description) values(5,'canal添加用户5',25,'男','学习canal5');
如果变成 手动确认,
canalconnector.ack(message.getid());
则只消费一次.
以上就是springboot怎么整合canal方法的详细内容。