MySQL 异构数据同步方案 - canal 高可用部署
canal 的官方文档并不是很详细,在学习过程中踩了很多坑。一边改配置一边调试代码找问题,最终是把 canal 的高可用搭建搞清楚了。整理一下给同样遇到一些问题的同学做个参考。
# canal 是什么
canal 是阿里开源的 MySQL binlog 订阅中间件。纯 java 开发,模拟 mysql slave 协议将自己伪装成一个 slave 向 master 发送 dump 请求,并将数据解析后投递给客户端,客户端可以根据需求对数据进行处理。
# 架构
# 组件
- canal.deployer: canal-server,是伪装成 MySQL slave 的服务
- canal.admin: canal 管理后台,在高可用环境中动态管理服务配置,启停服务
- canal.adapter: canal 官方实现的 canal-client,内置了将数据同步到 db、es、hbase、kudo 等组件,也可以自己定制开发
# 逻辑概念
- 集群:一组 canal-server 组成的高可用集群,主备模式,当主节点宕机后,其他节点会发起选主,选主成功后会启动服务,读取之前的 binlog 位点信息,继续向 master 发送 dump 请求。
- server: 实际执行 binlog 订阅并解析的服务
- instance: 配置需要订阅的 master 信息,并交给 server 执行,具体是哪个 server 执行,由 zookeeper 调度
- client: 订阅 canal-server 解析到的数据,对数据进行处理(etl)
# 部署
实验中的部署架构如上。
canal-server 本身也实现了一个消息订阅服务(tcp 模式),但是这里还是引入了 RocketMQ,主要考虑一下几点:
- 专业的事给专业的组件负责,消息投递到 RocketMQ,数据更可靠
- canal-server 支持不同的表投递到不同的 topic(顺序消息),client 可以订阅多个 topic,提高消息处理速度,只需要保证一个表的消息顺序执行就行。
环境搭建采用当前最新版本canal-1.1.7-alpha-1
(其实 release 版本问题也很多,直接用测试版本也行,有问题就 debug 调试下)。
下载地址:https://github.com/alibaba/canal/releases (opens new window)
# 环境准备
以下依赖环境自行安装
MySQL 源库(对应实验里 192.168.1.10)
- 主库开启 binlog,修改
my.cnf
配置
[mysqld] # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 server_id=1 # 开启 binlog log-bin=mysql-bin # 选择 ROW 模式 binlog-format=ROW
1
2
3
4
5
6
7- 创建具有 slave 权限的账户
CREATE USER sync IDENTIFIED BY '1234'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
1
2
3
4- 主库开启 binlog,修改
MySQL 目标库(对应实验里 192.168.1.200)
zookeeper
RocketMQ
# 部署 canal-admin
- 下载
canal.admin-1.1.7-SNAPSHOT.tar.gz
并解压,得到一下文件
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.admin-1.1.7-SNAPSHOT.tar.gz
mkdir admin
tar zxvf canal.admin-1.1.7-SNAPSHOT.tar.gz -C ./admin
2
3
├── bin
│ ├── admin.pid
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── canal-template.properties
│ ├── canal_manager.sql
│ ├── instance-template.properties
│ ├── logback.xml
│ └── public
├── lib
└── logs
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 准备一个MySQL数据库,用来保存配置数据(方便测试,直接用实验里面的任意一个库也行)
mysql -h${host} -u${user} -p
# 导入初始化 SQL
> source conf/canal_manager.sql
2
3
- 修改
conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
# 修改对应测试库的连接信息
address: 192.168.1.200:3106
database: canal_manager
username: root
password: 1234
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?allowPublicKeyRetrieval=true&useUnicode=true&useSSL=false&characterEncoding=UTF-8
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
- 启动服务
sh bin/startup.sh
# 也可以添加下面的参数,开启远程 debug,调试端口 5566
sh bin/startup.sh debug 5566
2
3
idea 中这样配置,就可以进行 debug 调试
- 查看日志是否启动成功
tail -f logs/admin.log
- 浏览器打开http://192.168.1.101:8089 (opens new window),账号:admin / 123456
- 添加集群配置(先添加配置,否则后面 canal-server 启动会报错)
输入集群名称和zk地址,点击确认。
点击“载入模板”,然后修改这几个地方
# 数据解析后投递到 RocketMQ
canal.serverMode = rocketMQ
# zk 连接(其实我觉得不需要配,已经和集群关联了,直接读集群的链接就可以了,但是目前版本还需要配)
canal.zkServers = 192.168.1.101:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.instance.tsdb.enable = true
#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.url=jdbc:mysql://192.168.1.200:3106/canal_tsdb?useUnicode=true&characterEncoding=UTF-8&useSSL=false
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 1234
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# RocketMQ 地址
rocketmq.namesrv.addr = 192.168.1.101:9876
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
生产环境需要开启 tsdb,关于 tsdb 的作用,可以查看官方文档:https://github.com/alibaba/canal/wiki/TableMetaTSDB (opens new window)
- 添加 server
后面 eployer 启动时,会根据 ip 读取配置,如果没有找到配置启动会报错,所以这里先配置了(之后可能会修复这个问题)。
java.lang.IllegalArgumentException: managerAddress:192.168.1.101:8089 can't not found config for [192.168.1.10:11110]
at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:80) ~[canal.deployer-1.1.7-SNAPSHOT.jar:na]
2
# 部署 canal-deployer
下载canal.deployer-1.1.7-SNAPSHOT.tar.gz
并解压,得到以下文件。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.deployer-1.1.7-SNAPSHOT.tar.gz
mkdir deployer
tar zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C ./deployer
2
3
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── canal_local.properties
│ ├── canal.properties
│ ├── example
│ ├── logback.xml
│ ├── metrics
│ └── spring
├── lib
├── logs
└── plugin
2
3
4
5
6
7
8
9
10
11
12
13
14
15
替换 druid jar 包版本(druid-1.2.12.jar 有 bug,这个 bug 是我调试出来的,你可以尝试一下,没有替换情况下会有什么报错)
https://github.com/alibaba/canal/issues/4491 (opens new window)
cd lib
rm druid-1.2.12.jar
wget https://repo1.maven.org/maven2/com/alibaba/druid/1.2.14/druid-1.2.14.jar
2
3
使用canal_local.properties
替换掉canal.properties
cd conf
mv canal.properties canal.properties.orig
mv canal_local.properties canal.properties
2
3
修改canal.properties
为如下配置
# register ip,如果是多网卡,这里可以手动指定一下 ip
# canal.register.ip = 192.168.1.200
canal.register.ip = 192.168.1.101
# canal admin 地址
canal.admin.manager = 192.168.1.101:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# 自动注册,这里设置成 false,手动在 admin 设置
canal.admin.register.auto = false
# 集群名称,这里不指定,手动在 admin 配置
#canal.admin.register.cluster =
#canal.admin.register.name =
2
3
4
5
6
7
8
9
10
11
12
13
14
启动服务
sh bin/startup.sh debug 5567
在 admin 查看是否启动成功
如果启动失败可以查看日志
tail -f logs/canal/canal.log
# 配置 Instance
在 admin 后台 -> Instance 管理 -> 新建 Instance
-> 载入模板
。然后修改以下配置。
# MySQL binlog 位点信息,即从哪个记录开始读取,可以用binlog + postion或者gtid
canal.instance.master.address=192.168.1.10:3306
# 如果想把历史数据一起读取,可以把位点配置为0
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 源库master用于数据同步的账号
canal.instance.dbUsername=sync
canal.instance.dbPassword=1234
# 需要同步的库
canal.instance.filter.regex=fjx.*
# 动态 tipic,这里的配置会把 t1, user 2个表的数据投递到 fjx_t1, fjx_user 2个 topic
canal.mq.dynamicTopic=fjx.t1,fjx.user
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 部署 canal-adapter
下载canal.adapter-1.1.7-SNAPSHOT.tar.gz
并解压,得到一下文件
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.adapter-1.1.7-SNAPSHOT.tar.gz
mkdir admin
tar zxvf canal.adapter-1.1.7-SNAPSHOT.tar.gz -C ./adapter
2
3
├── bin
│ ├── restart.sh
│ ├── startup.bat
│ ├── startup.sh
│ └── stop.sh
├── conf
│ ├── application.yml
│ ├── bootstrap.yml
│ ├── es6
│ ├── es7
│ ├── hbase
│ ├── kudu
│ ├── logback.xml
│ ├── META-INF
│ ├── rdb
│ └── tablestore
├── lib
├── logs
└── plugin
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
替换 druid jar 包版本
cd lib
rm druid-1.2.12.jar
wget https://repo1.maven.org/maven2/com/alibaba/druid/1.2.14/druid-1.2.14.jar
2
3
实验中同步的目标库是MySQL,所以配置文件只保留rdb
即可。
rm -rf conf/es6 conf/es7 conf/hbase conf/kudu conf/tablestore
修改以下配置
conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: -1
timeout:
accessKey:
secretKey:
consumerProperties:
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 192.168.1.101:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
canalAdapters:
- instance: fjx_user # canal instance Name or mq topic name
groups:
- groupId: group_user
outerAdapters:
- name: logger
- name: rdb
key: mysql_user
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.1.200:3106/fjx?useUnicode=true&characterEncoding=UTF-8&useSSL=false
jdbc.username: app
jdbc.password: 1234
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000
- instance: fjx_t1 # canal instance Name or mq topic name
groups:
- groupId: group_t1
outerAdapters:
- name: rdb
key: mysql_t1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://192.168.1.200:3106/fjx?useUnicode=true&characterEncoding=UTF-8&useSSL=false
jdbc.username: app
jdbc.password: 1234
druid.stat.enable: false
druid.stat.slowSqlMillis: 1000
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
删除 rdb 下的配置文件(也可以先备份下)
rm conf/rdb/mytest_user.yml
然后添加2个配置,分别处理不同表的数据
conf/rdb/fjx_t1.yml
# 和 application.yml 中的 instance 对应
destination: fjx_t1
# 和 application.yml 中 groups 的 groupId 对应
groupId: group_t1
# 和 application.yml 中 outerAdapters 的 key 对应
outerAdapterKey: mysql_t1
concurrent: true
dbMapping:
database: fjx
mirrorDb: true
commitBatch: 100 # 批量提交的大小
2
3
4
5
6
7
8
9
10
11
conf/rdb/fjx_user.yml
# 和 application.yml 中的 instance 对应
destination: fjx_user
groupId: group_user
outerAdapterKey: mysql_user
concurrent: true
dbMapping:
database: fjx
mirrorDb: true
commitBatch: 100 # 批量提交的大小
2
3
4
5
6
7
8
9
以上配置,多个实例完全一致
启动服务
sh bin/startup.sh
# 查看日志,是否启动成功
tail -f logs/adapter/adapter.log
2
3
4
# 测试
回顾一下,上面启动了2个server和2个adapter。
同步开启前,源库和目标库的数据情况,具体表创建语句这里就不给出了,你可以自由发挥。
进入 Instance管理
,点击启动
同步完成后的数据
如果启动失败,可以在这里查看日志。
# 监控
管理监控直接看官方文档就可以了,需要你对 prometheus 有一定了解,比较详细。
https://github.com/alibaba/canal/wiki/Prometheus-QuickStart (opens new window)
# 参考
- [Canal Admin 高可用集群使用教程](Canal Admin 高可用集群使用教程)
- 官方文档 (opens new window)