Canal

1. Canal介绍

Releases · alibaba/canal

  • Canal是一个组件,canal感知到MySQL数据变动,然后解析变动数据,将变动数据发送到MQ或者同步到其他数据库,等待进一步业务逻辑处理。

2. canal工作原理

2.1 MySQL的主从复制原理

img

2.2 MySQL Binlog日志

2.2.1 介绍

  • MySQL的Binlog是MySQL最重要的日志,记录所有的增删改(DML)建表/改结构(DDL)操作。
  • MySQL默认情况下是不开启Binlog,因为记录Binlog日志需要消耗时间,官方给出的数据是有1%的性能损耗。
  • 具体开不开启,开发中需要根据实际情况做取舍。 一般来说,在下面两场景下会开启Binlog日志:
    1️⃣MySQL主从集群部署时,需要在Master端开启Binlog,方便将数据同步到Slaves中
    2️⃣数据恢复了,通过使用MySQLBinlog工具来使恢复数据

2.2.2 Binlog的分类

  • MySQL Binlog的格式有三种,分别是STATEMENT, MIXEDROW
  • 在配置文件中可以选择配置binlog_format= statement | mixed | row。
分类 介绍 优点 缺点
STATEMENT 语句级别,记录每一次执行写操作的语句,相对于ROW模式节省了空间,但是可能产生数据不一致如update set create_date=now(),由于执行时间不同产生的数据就不同 节省空间 可能造成数据不一致
ROW 行级,记录每次操作后每行记录的变化。假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这个1万行的结果存这。 持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果 占用较大空间
MIXED 是对statement的升级,如当函数中包含UUID时,包含AUTO_INCREMENT字段的表被更新时,执行INSERTDELAYED语句时,用UDF时,会按照ROW的方式进行处理 节省空间,同时兼顾了一定的一致性 还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便
  • STATEMENT(语句级)格式:你告诉别人“去把饭炒一下”,但没告诉他加了哪些调料,不同人做出来味道不同。
  • ROW(行级)格式:你告诉别人“把这个菜炒成这个味道”,详细到每一克盐、油、火候,结果完全一样,但要讲很多很多细节。
  • MIXED(混合)格式:有时候说“去炒饭”(简单的语句),有时候说“把米、菜、调料都列出来”(复杂情况),比较灵活,但别人可能不好还原全过程。

综上对比,Canal想做监控分析,选择ROW格式比较合适

2.3 Canal工作原理

  • Canal模拟MysQL slave的交互协议,伪装自己为MySQL slave,向MySQL master发送dump协议
  • MySQL master收到dump请求,开始推送binary log给slave(即canal)
  • Canal解析Binlog对象(原始为byte流)

img

3. Canal运用场景

img

img

img点击并拖拽以移动

img

img

img

4. MySQL准备

4.1 创建数据库 创建表

1
2
3
4
5
6
7
8
CREATE DATABASE canal_demo;

CREATE TABLE `user` (
`id` bigint NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY(`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

img

4.2 修改配置文件开启

img

1
2
3
4
5
6
7
8
#给mysql服务指定一个唯一的ID
server-id=1
#开启bInlog
log-bin=C:/ProgramData/MySQL/MySQL Server 8.0/binlogs/mysql-bin.log
#以数据的方式写binlog日志 :statement 是记录SQLrow是记录数据
binlog-format=ROW
#同步的数据库名
binlog-do-db=canal_demo

img

img

img

5. Canal安装与配置

5.1 下载解压

  • canal.deployer:相当于 canal 的服务端,启动它才可以在客户端接收数据库变更信息。
  • canal.adapter:增加客户端数据落地的适配及启动功能(当 deployer 接收到消息后,会根据不同的目标源做适配,比如是 es 目标源适配和 hbase 适配等等)。

img

5.2 修改canal.properties的配置

  • canal.port:默认端口11111
  • canal.serverMode:服务模式,tcp表示输入客户端,xxMQ输出到各类消息中间件
  • canal.destinations:canal能可以收集多个MySQL数据库数据,每个MySQL数据库都有独立的配置文件控制。具体配置规则:conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。canal.destinations用于配置需要监控数据的数据库。如果是多个,使用,隔开

img

5.3 修改MySQL实例配置文件instance.properties

img

5.4 canal.deployer启动

双击D:\1Learn JAVA\canal.deployer-1.1.8\bin\startup.bat

img

系统属性里改成JDK17打开startup.bat立马闪退

img

6. ⭐️虚拟机

6.1 虚拟机中MySQL操作

6.1.1 修改/etc/my.cnf 开启binlog

然后再重启 MySQL。

img

BinLog 开启后,会看到 BinLog 日志。

img

img

6.1.2 创建MySQL账号 Cannal

账号和密码都是 Cannal,给后面 Canal 使用。

用root 123456也能成功但有弊端。所以换成了canal。

不用改cannal了,改了后面双击启动D:\1LearnJAVA\canal.adapter-1.1.8\bin\startup.bat有问题

用root,123456.一点儿问题没有

img

1
2
3
CREATE USER 'canal'@'localhost' IDENTIFIED BY 'Canal123!';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;

img

img

6.2 Canal安装解压

img

6.2.1 修改canal.properties配置端口

  • canal.port:默认端口11111
  • canal.serverMode:服务模式,tcp表示输入客户端,xxMQ输出到各类消息中间件
  • canal.destinations:canal能可以收集多个MySQL数据库数据,每个MySQL数据库都有独立的配置文件控制。具体配置规则:conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。canal.destinations用于配置需要监控数据的数据库。如果是多个,使用,隔开

img

6.2.2 canal.deployer配置

instance.properties配置账号密码

img

img

6.2.3 canal.deployer启动

双击D:\1Learn JAVA\canal.deployer-1.1.8\bin\startup.bat

img

6.2.4 canal.adapter 配置

Step1: 先把 adapter 下面的 bootstrap.yml,全部注释掉,否则会提示你 XX 表不存在。

img

img

Step2: 再修改 adapter 的 application.yml 配置文件。

img

img

这里的坑,一般就是 mysql 的账号密码不对,或者给的 es 链接,没有”http://“前缀,这些都是我过踩的坑.

Step3: 修改我们在 application.yml 中配置的目标数据源 es7 文件夹内容。

img

由于我们需要把技术派项目中的文章查询功能,改造成 ES 的查询方式,所以我们就把技术派的文章表 article,同步到 ES 中。
yml文件配置如下:
img

1
2
3
4
5
6
7
8
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
_index: article # es 的索引名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
sql: "SELECT t.id AS _id,t.id, t.user_id, t.article_type, t.title, t.short_title, t.picture, t.summary, t.category_id, t.source, t.source_url, t.offical_stat, t.topping_stat, t.cream_stat, t.`status`, t.deleted, t.create_time, t.update_time FROM article t" # sql映射
commitBatch: 1 # 提交批大小

img

Step4: 在 Kibana 中创建 ES 的 article 索引。

代码如下:

img

不要article_v2就是article。与pai_coding里的article表一致。

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
PUT /article
{
"mappings" : {
"properties" : {
"id" : {
"type" : "integer"
},
"user_id" : {
"type" : "integer"
},
"article_type" : {
"type" : "integer"
},
"title" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"short_title" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"picture" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"summary" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"category_id" : {
"type" : "integer"
},
"source" : {
"type" : "integer"
},
"source_url" : {
"type" : "text",
"analyzer": "ik_max_word"
},
"offical_stat" : {
"type" : "integer"
},
"topping_stat" : {
"type" : "integer"
},
"cream_stat" : {
"type" : "integer"
},
"status" : {
"type" : "integer"
},
"deleted" : {
"type" : "integer"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}

6.2.5 canal.adapter启动

双击D:\1Learn JAVA\canal.adapter-1.1.8\bin\startup.bat

img

img

6.3 数据同步实战

6.3.1 全量同步

img

win+R cmd输入:

1
curl http://127.0.0.1:8081/etl/es7/article.yml -X POST

img

上面就是执行同步成功后,提示已经导入8 条(我的数据库article表里就8条)。

然后运行kibana下列代码,查看article索引命令

1
2
3
4
5
6
7
 GET article/_search
{
"query": {
"match_all": {}
},
"size": 20
}

img

可以看到与上面执行同步命令提示10条对应上了

img

6.3.2 增量同步

img

即增量同步成功

增量数据就是当我在 MySQL 中 update、delete 和 insert 时,那么 ES 中数据也会对应发生变化,我下面演示下修改:

img

日志打印如下

img

ES查询结果如下:

img

上面结果中说明 ES 已经更改成功。

6.3.3 总结

回顾一下整体执行流程:

img

写到这里,就结束了,教你如何将 MySQL 同步到 ES,不仅是增量同步,还包括全量同步,如果你的项目也需要用到该场景,基本可以直接照搬。

7. Canal编程

7.1 Hello

1>创建IDEA项目:canal

2>导入canal相关依赖

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

3>编写测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.xirui.canalhello.canal_demo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CanalDemo {

public static void main(String[] args) throws InvalidProtocolBufferException {
// 1. 获取 canal 连接对象
CanalConnector canalConnector = CanalConnectors.newSingleConnector(
new InetSocketAddress("localhost", 11111), "example", "", "");

while (true) {
// 2. 建立连接
canalConnector.connect();
// 3. 订阅数据库
canalConnector.subscribe("canal_demo\\..*"); // 注意这里需要使用正则 \\. 表示匹配所有表
// 4. 获取数据
Message message = canalConnector.get(100);
List<CanalEntry.Entry> entries = message.getEntries();

if (entries.size() <= 0) {
System.out.println("没有数据,休息一会...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}

for (CanalEntry.Entry entry : entries) {
String tableName = entry.getHeader().getTableName();
CanalEntry.EntryType entryType = entry.getEntryType();

if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
ByteString storeValue = entry.getStoreValue();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
CanalEntry.EventType eventType = rowChange.getEventType();
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();

for (CanalEntry.RowData rowData : rowDatasList) {
Map<String, Object> beforeMap = new HashMap<>();
Map<String, Object> afterMap = new HashMap<>();

for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
beforeMap.put(column.getName(), column.getValue());
}

for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
afterMap.put(column.getName(), column.getValue());
}

System.out.println("表名:" + tableName + ",操作类型:" + eventType);
System.out.println("修改前:" + beforeMap);
System.out.println("修改后:" + afterMap);
}
}
}
}
}
}

img

我的程序报错 1️⃣要用JDK8,2️⃣使用的SpringBoot版本要和JDK8能用 上

img

img

img

本程序:

  • 一次性拿100个SQL语句所变动的数据放在message里。
  • 一条SQL对应一个Entry。整个message可以拿一堆SQL,所以有一堆entry
  • entry.size()<=0,没有变动,休息一秒钟再循环一次
  • 拿到entry对象遍历。

img

img

7.2 同类型技术

img

7.3 常见面试题

img


Canal
https://blog.xirui.work/posts/2040d45d.html
作者
xirui
发布于
2025年1月13日
许可协议