铁匠 铁匠
首页
收藏
java
架构之路
常用算法
  • Java
  • nginx
  • 系统运维
  • 系统安全
  • mysql
  • redis
参考文档
关于
链接
  • 分类
  • 标签
  • 归档

专注、不予评判地关注当下
首页
收藏
java
架构之路
常用算法
  • Java
  • nginx
  • 系统运维
  • 系统安全
  • mysql
  • redis
参考文档
关于
链接
  • 分类
  • 标签
  • 归档
  • 概览

  • 设计模式

  • 高可用

  • 性能优化

  • 分布式

  • 网关

  • 流量治理

  • 数据治理

    • MySQL 异构数据同步方案 - canal 高可用部署
      • canal 是什么
        • 架构
        • 组件
        • 逻辑概念
      • 部署
        • 环境准备
        • 部署 canal-admin
        • 部署 canal-deployer
        • 配置 Instance
        • 部署 canal-adapter
        • 测试
      • 监控
      • 参考
  • 云原生

  • 网络安全

  • 架构
  • 数据治理
fengjx
2022-12-22
目录

MySQL 异构数据同步方案 - canal 高可用部署

canal 的官方文档并不是很详细,在学习过程中踩了很多坑。一边改配置一边调试代码找问题,最终是把 canal 的高可用搭建搞清楚了。整理一下给同样遇到一些问题的同学做个参考。

# canal 是什么

canal 是阿里开源的 MySQL binlog 订阅中间件。纯 java 开发,模拟 mysql slave 协议将自己伪装成一个 slave 向 master 发送 dump 请求,并将数据解析后投递给客户端,客户端可以根据需求对数据进行处理。

# 架构

# 组件

  • canal.deployer: canal-server,是伪装成 MySQL slave 的服务
  • canal.admin: canal 管理后台,在高可用环境中动态管理服务配置,启停服务
  • canal.adapter: canal 官方实现的 canal-client,内置了将数据同步到 db、es、hbase、kudo 等组件,也可以自己定制开发

# 逻辑概念

  • 集群:一组 canal-server 组成的高可用集群,主备模式,当主节点宕机后,其他节点会发起选主,选主成功后会启动服务,读取之前的 binlog 位点信息,继续向 master 发送 dump 请求。
  • server: 实际执行 binlog 订阅并解析的服务
  • instance: 配置需要订阅的 master 信息,并交给 server 执行,具体是哪个 server 执行,由 zookeeper 调度
  • client: 订阅 canal-server 解析到的数据,对数据进行处理(etl)

# 部署

实验中的部署架构如上。

canal-server 本身也实现了一个消息订阅服务(tcp 模式),但是这里还是引入了 RocketMQ,主要考虑一下几点:

  1. 专业的事给专业的组件负责,消息投递到 RocketMQ,数据更可靠
  2. canal-server 支持不同的表投递到不同的 topic(顺序消息),client 可以订阅多个 topic,提高消息处理速度,只需要保证一个表的消息顺序执行就行。

环境搭建采用当前最新版本canal-1.1.7-alpha-1(其实 release 版本问题也很多,直接用测试版本也行,有问题就 debug 调试下)。

下载地址:https://github.com/alibaba/canal/releases (opens new window)

# 环境准备

以下依赖环境自行安装

  • MySQL 源库(对应实验里 192.168.1.10)

    1. 主库开启 binlog,修改my.cnf配置
    [mysqld]
    # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    server_id=1 
    # 开启 binlog
    log-bin=mysql-bin
    # 选择 ROW 模式
    binlog-format=ROW
    
    1
    2
    3
    4
    5
    6
    7
    1. 创建具有 slave 权限的账户
    CREATE USER sync IDENTIFIED BY '1234';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    
    1
    2
    3
    4
  • MySQL 目标库(对应实验里 192.168.1.200)

  • zookeeper

  • RocketMQ

# 部署 canal-admin

  1. 下载canal.admin-1.1.7-SNAPSHOT.tar.gz并解压,得到一下文件
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.admin-1.1.7-SNAPSHOT.tar.gz
mkdir admin
tar zxvf canal.admin-1.1.7-SNAPSHOT.tar.gz -C ./admin
1
2
3
├── bin
│   ├── admin.pid
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── canal-template.properties
│   ├── canal_manager.sql
│   ├── instance-template.properties
│   ├── logback.xml
│   └── public
├── lib
└── logs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  1. 准备一个MySQL数据库,用来保存配置数据(方便测试,直接用实验里面的任意一个库也行)
mysql -h${host} -u${user} -p
# 导入初始化 SQL
> source conf/canal_manager.sql
1
2
3
  1. 修改conf/application.yml
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  # 修改对应测试库的连接信息
  address: 192.168.1.200:3106
  database: canal_manager
  username: root
  password: 1234
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?allowPublicKeyRetrieval=true&useUnicode=true&useSSL=false&characterEncoding=UTF-8
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
  1. 启动服务
sh bin/startup.sh
# 也可以添加下面的参数,开启远程 debug,调试端口 5566
sh bin/startup.sh debug 5566
1
2
3

idea 中这样配置,就可以进行 debug 调试

  1. 查看日志是否启动成功
tail -f logs/admin.log
1
  1. 浏览器打开http://192.168.1.101:8089 (opens new window),账号:admin / 123456

  1. 添加集群配置(先添加配置,否则后面 canal-server 启动会报错)

输入集群名称和zk地址,点击确认。

点击“载入模板”,然后修改这几个地方

# 数据解析后投递到 RocketMQ
canal.serverMode = rocketMQ
# zk 连接(其实我觉得不需要配,已经和集群关联了,直接读集群的链接就可以了,但是目前版本还需要配)
canal.zkServers = 192.168.1.101:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

canal.instance.tsdb.enable = true
#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.url=jdbc:mysql://192.168.1.200:3106/canal_tsdb?useUnicode=true&characterEncoding=UTF-8&useSSL=false
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 1234

#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

# RocketMQ 地址
rocketmq.namesrv.addr = 192.168.1.101:9876
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

生产环境需要开启 tsdb,关于 tsdb 的作用,可以查看官方文档:https://github.com/alibaba/canal/wiki/TableMetaTSDB (opens new window)

  1. 添加 server

后面 eployer 启动时,会根据 ip 读取配置,如果没有找到配置启动会报错,所以这里先配置了(之后可能会修复这个问题)。

java.lang.IllegalArgumentException: managerAddress:192.168.1.101:8089 can't not found config for [192.168.1.10:11110]
	at com.alibaba.otter.canal.deployer.CanalLauncher.main(CanalLauncher.java:80) ~[canal.deployer-1.1.7-SNAPSHOT.jar:na]
1
2

# 部署 canal-deployer

下载canal.deployer-1.1.7-SNAPSHOT.tar.gz并解压,得到以下文件。

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.deployer-1.1.7-SNAPSHOT.tar.gz
mkdir deployer
tar zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C ./deployer
1
2
3
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── canal_local.properties
│   ├── canal.properties
│   ├── example
│   ├── logback.xml
│   ├── metrics
│   └── spring
├── lib
├── logs
└── plugin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

替换 druid jar 包版本(druid-1.2.12.jar 有 bug,这个 bug 是我调试出来的,你可以尝试一下,没有替换情况下会有什么报错)

https://github.com/alibaba/canal/issues/4491 (opens new window)

cd lib
rm druid-1.2.12.jar
wget https://repo1.maven.org/maven2/com/alibaba/druid/1.2.14/druid-1.2.14.jar
1
2
3

使用canal_local.properties替换掉canal.properties

cd conf
mv canal.properties canal.properties.orig
mv canal_local.properties canal.properties
1
2
3

修改canal.properties为如下配置

# register ip,如果是多网卡,这里可以手动指定一下 ip
# canal.register.ip = 192.168.1.200
canal.register.ip = 192.168.1.101

# canal admin 地址
canal.admin.manager = 192.168.1.101:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# 自动注册,这里设置成 false,手动在 admin 设置
canal.admin.register.auto = false
# 集群名称,这里不指定,手动在 admin 配置
#canal.admin.register.cluster =
#canal.admin.register.name =
1
2
3
4
5
6
7
8
9
10
11
12
13
14

启动服务

sh bin/startup.sh debug 5567
1

在 admin 查看是否启动成功

如果启动失败可以查看日志

tail -f logs/canal/canal.log
1

# 配置 Instance

在 admin 后台 -> Instance 管理 -> 新建 Instance -> 载入模板。然后修改以下配置。

# MySQL binlog 位点信息,即从哪个记录开始读取,可以用binlog + postion或者gtid
canal.instance.master.address=192.168.1.10:3306
# 如果想把历史数据一起读取,可以把位点配置为0
canal.instance.master.journal.name=mysql-bin.000001
canal.instance.master.position=0
canal.instance.master.timestamp=
canal.instance.master.gtid=

# 源库master用于数据同步的账号
canal.instance.dbUsername=sync
canal.instance.dbPassword=1234

# 需要同步的库
canal.instance.filter.regex=fjx.*
# 动态 tipic,这里的配置会把 t1, user 2个表的数据投递到 fjx_t1, fjx_user 2个 topic
canal.mq.dynamicTopic=fjx.t1,fjx.user
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 部署 canal-adapter

下载canal.adapter-1.1.7-SNAPSHOT.tar.gz并解压,得到一下文件

wget https://github.com/alibaba/canal/releases/download/canal-1.1.7-alpha-1/canal.adapter-1.1.7-SNAPSHOT.tar.gz
mkdir admin
tar zxvf canal.adapter-1.1.7-SNAPSHOT.tar.gz -C ./adapter
1
2
3
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── bootstrap.yml
│   ├── es6
│   ├── es7
│   ├── hbase
│   ├── kudu
│   ├── logback.xml
│   ├── META-INF
│   ├── rdb
│   └── tablestore
├── lib
├── logs
└── plugin
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

替换 druid jar 包版本

cd lib
rm druid-1.2.12.jar
wget https://repo1.maven.org/maven2/com/alibaba/druid/1.2.14/druid-1.2.14.jar
1
2
3

实验中同步的目标库是MySQL,所以配置文件只保留rdb即可。

rm -rf conf/es6 conf/es7 conf/hbase conf/kudu conf/tablestore
1

修改以下配置

conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 192.168.1.101:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:

  canalAdapters:
    - instance: fjx_user # canal instance Name or mq topic name
      groups:
        - groupId: group_user
          outerAdapters:
            - name: logger
            - name: rdb
              key: mysql_user
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver
                jdbc.url: jdbc:mysql://192.168.1.200:3106/fjx?useUnicode=true&characterEncoding=UTF-8&useSSL=false
                jdbc.username: app
                jdbc.password: 1234
                druid.stat.enable: false
                druid.stat.slowSqlMillis: 1000
    - instance: fjx_t1 # canal instance Name or mq topic name
      groups:
        - groupId: group_t1
          outerAdapters:
            - name: rdb
              key: mysql_t1
              properties:
                jdbc.driverClassName: com.mysql.jdbc.Driver
                jdbc.url: jdbc:mysql://192.168.1.200:3106/fjx?useUnicode=true&characterEncoding=UTF-8&useSSL=false
                jdbc.username: app
                jdbc.password: 1234
                druid.stat.enable: false
                druid.stat.slowSqlMillis: 1000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

删除 rdb 下的配置文件(也可以先备份下)

rm conf/rdb/mytest_user.yml
1

然后添加2个配置,分别处理不同表的数据

conf/rdb/fjx_t1.yml

# 和 application.yml 中的 instance 对应
destination: fjx_t1
# 和 application.yml 中 groups 的 groupId 对应
groupId: group_t1
# 和 application.yml 中 outerAdapters 的 key 对应
outerAdapterKey: mysql_t1
concurrent: true
dbMapping:
  database: fjx
  mirrorDb: true
  commitBatch: 100 # 批量提交的大小
1
2
3
4
5
6
7
8
9
10
11

conf/rdb/fjx_user.yml

# 和 application.yml 中的 instance 对应
destination: fjx_user
groupId: group_user
outerAdapterKey: mysql_user
concurrent: true
dbMapping:
  database: fjx
  mirrorDb: true
  commitBatch: 100 # 批量提交的大小
1
2
3
4
5
6
7
8
9

以上配置,多个实例完全一致

启动服务

sh bin/startup.sh

# 查看日志,是否启动成功
tail -f logs/adapter/adapter.log
1
2
3
4

# 测试

回顾一下,上面启动了2个server和2个adapter。

同步开启前,源库和目标库的数据情况,具体表创建语句这里就不给出了,你可以自由发挥。

进入 Instance管理,点击启动

同步完成后的数据

如果启动失败,可以在这里查看日志。

# 监控

管理监控直接看官方文档就可以了,需要你对 prometheus 有一定了解,比较详细。

https://github.com/alibaba/canal/wiki/Prometheus-QuickStart (opens new window)

# 参考

  • [Canal Admin 高可用集群使用教程](Canal Admin 高可用集群使用教程)
  • 官方文档 (opens new window)
#mysql#canal
sentinel-dashboard-apollo 使用
ubuntu 安装 Kubernetes

← sentinel-dashboard-apollo 使用 ubuntu 安装 Kubernetes→

最近更新
01
策略模式
01-09
02
模板方法
01-06
03
观察者模式
01-06
更多文章>
Theme by Vdoing | Copyright © 2016-2023 铁匠 | 粤ICP备15021633号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式