目录
- 前言
- canal架构
- 安装配置
- 高可用架构
- 控制台
- 总结(源码bug)
前言
之前3篇文章老顾都介绍了基于binlog的一些应用场景,今天我们来分享一下Canal组件的具体的使用方法,以及集群的部署。
Canal架构
单应用
上面中我们需要启动一个Canal Server,负责伪装mysql的slave订阅binlog的;我们还需要一个Client监听Server。Canal最近的版本提供了一个client的实现----Canal Adapter。然后由Client解析后把相关数据同步的DB/ES/Redis。
上面的架构可以在开发环境进行,但不适合生产环境;因为由几个问题存在。
1)Canal Server和Client 都存在单点问题
2)如果我们流量很大,Canal Server收到的binlog量太大,导致Canal Adapter来不及处理,很有可能会把Client搞崩溃掉。
下面我们会介绍高可用的架构,我们先了解一下Canal基本用法
安装配置
环境
Canal的github的地址https://github.com/alibaba/canal/releases,目前最新版本1.1.4。
注意:如果小伙伴们需要adapter client同步的es7,需要使用1.1.5版本才行哦
老顾正在使用的是1.1.5。老顾有特殊需求
java jdk1.8需要提前安装哦。
安装canal server
下载canal.deployer-1.1.4.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
解压文件
tar -zxvf canal.deployer-1.1.4.tar.gz
进入解压后的文件夹,有5个目录
bin //可执行的脚本文件
conf //配置文件
lib //需要用到的核心jar包
logs //启动运行的日志文件
plugin //支持的适配插件
我们需要修改一些配置文件,进入conf
canal.properties
canal.id = 1 # 每个canal server实例的唯一标识,暂无实际意义
canal.ip = 192.111.112.103 # canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.port = 11111 # canal server提供socket服务的端口
canal.metrics.pull.port = 11112
canal.zkServers = 192.168.1.111:2181 #canal server链接zookeeper集群的链接信息,集群模式需要用到,单机模式可以不配置
# flush data to zk
canal.zookeeper.flush.period = 1000 #canal持久化数据到zookeeper上的更新频率,单位毫秒
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp #这个非常重要,代表是binlog的变化信息是直接用tcp方式放送给下游的client,还是先放到MQ中,暂时支持kafka、RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false #这个也非常重要,true表示不监听queue查询命令;只监听insert、update、delete命令;false即全部监听
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = password
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
#################################################
######### destinations #############
#################################################
canal.destinations = example_01,example_02 # 当前server上部署的instance列表,instance很重要的概念,就是需要监听的表实例。
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring # 全局配置加载方式
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
######### MQ #############
##################################################
#如果监听模式canal.serverMode为 kafka、RocketMq,需要配置以下配置
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.
配置说明
上面核心的配置,需要注意的
canal.serverMode
canal.instance.filter.query.dml
canal.destinations
canal.mq.servers #如果需要同步到MQ
注意:MQ的配置,1.1.5版本会有点区别,分开了kafka和rocketmq;小伙伴们一看就知道
针对实例instance数据位点,存储配置canal.instance.global.spring.xml
- memory-instance.xml: 所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
- default-instance.xml: store选择了内存模式,其余的parser/sink依赖的位点管理选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.
- group-instance.xml: 主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问
多个destination配置
- 在canal.properties里边配置canal.destinations , 用英文逗号分隔
- 在conf路径下创建对应的路径并添加对应的instance.properties
配置多个destination, 需要在conf下创建对应的目录
mkdir conf/example_01
mkdir conf/example_02
在对应的目录下边编写配置文件instance.properties
#canal.instance.mysql.slaveId=
canal.instance.gtidon=false #这个在回环问题存在的时候开启true,可以看老顾前一篇文章
# position info
canal.instance.master.address= #数据库连接地址
canal.instance.master.journal.name= #数据库的binlog文件名
canal.instance.master.position= #数据库的binlog位点
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=false
# username/password
canal.instance.dbUsername=username
canal.instance.dbPassword=password
canal.instance.defaultDatabaseName=dbName
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
# table regex
canal.instance.filter.regex=.*\..*
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*
配置说明
- canal.instance.master.address 数据库连接地址
- canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
- canal.instance.master.timestamp : 指定一个时间戳。如果上面的位点不配置,时间戳生效;canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
- 如果时间戳也不配置,即不指定任何信息:默认从当前数据库的位点,进行启动
- canal.instance.dbUsername 数据库账户
- canal.instance.dbPassword 数据库密码
- canal.instance.connectionCharset 连接字符类型
- canal.instance.filter.regex
1. 所有表:.* or .*\..*
2. canal schema下所有表: canal\..*
3. canal下的以canal打头的表:canal\.canal.*
4. canal schema下的一张表:canal.test1
5. 多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
- canal.mq.topic 如果发送到MQ,那此实例的消息发送到哪个主题,也可以支持动态主题
根据canal.mq.dynamicTopic设置
- canal.mq.partition 指定分区队列号,默认为0;即所有消息发送到0号队列
- canal.mq.partitionsNum + canal.mq.partitionHash 如果需要分散到其他队列,可以提供消息消费的速度;即可用到这2个配置,partitionsNum队列数,partitionHash分配规则
启动
进入到路径bin下边,有几个脚本
canal.pid # 记录服务的进程ID
restart.sh # 重启服务
startup.sh # 启动脚本
stop.sh # 停止服务
运行./startup.sh就可以启动了
查看日志
- 服务启动日志(logs/canal/canal.log)
- 实例运行日志 (logs/example/example.log)
canal-adapter的安装
下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
解压
tar xzvf canal.adapter-1.1.4.tar.gz
修改配置文件
- 修改conf/application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #这个就对应server端配置的 同步mode
zookeeperHosts: 192.111.111.173:2181
# mqServers: 127.0.0.1:9092 #or rocketmq
# flatMessage: true
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
srcDataSources: #源数据库地址,可以多个
defaultDS:
url: jdbc:mysql://192.168.1.100:3306/test?useUnicode=true
username: username
password: password
defaultDS2:
url: jdbc:mysql://192.168.1.101:3306/test?useUnicode=true
username: username
password: password
canalAdapters: #可以配置多个实例配置
- instance: example_01
groups:
- groupId: g1
outerAdapters: #输出适配器,可以配置多个
- name: logger #日志打印适配器
- name: es #es同步适配器
hosts: 192.168.1.110:9300
properties:
cluster.name: okami-application
- instance: example_02
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es
hosts: 192.168.1.111:9300
properties:
cluster.name: okami-application
在conf/es/路径下添加配置文件example_01.yml
vi conf/es/example_01.yml
dataSourceKey: defaultDS
destination: example_01
groupId: g1
esMapping:
_index: indexName
_type: typeName
_id: _id
upsert: true
# pk: id
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time from user a
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
# etlCondition: "where a.c_time>='{0}'"
commitBatch: 3000
example_02.yml
dataSourceKey: defaultDS2
destination: example_02
groupId: g1
esMapping:
_index: indexName
_type: typeName
_id: _id
upsert: true
# pk: id
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time from user a
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
# etlCondition: "where a.c_time>='{0}'"
commitBatch: 300
配置说明
一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters
启动
进入到路径bin下边,有几个脚本
adapter.pid # 记录的进程ID
restart.sh # 重启服务
startup.sh # 启动脚本
stop.sh # 停止服务
查看日志
tail -f logs/adapter/adapter.log
上面介绍了canal的server和adapter的启动应用。下面我们看看搞可用架构的思路
高可用架构
canal server的高可用方案是结合了zookeeper,主要原理是:
需求:如果有3台Canal server,同步10个实例instance的配置。
1)canal server启动后,会把相关启动信息注册到zk上面。
即可以通过zk知道现在一共有几个canal server
2)并且会把10个instance 分配给这台canal server。
3)再启动第2台canal server,又会注册到zk上面。发现线上有1台canal server订阅了10个instance。第2台会分担第1台的工作,会分配5个instance给自己,留下另外5个给第一台
即2台canal server均匀分配了instance同步binlog工作
4)再启动第3台,流程和第2台一样,继续分担instance同步工作
最后形成1台同步4个instance,1台同步3个instance,1台同步3个instance
5)如果有一台canal server挂了;通过zk的协调机制,通知另外2台canal server;分担那个挂掉server的instance工作。
这样就整体实现了canal server高可用了哦
Canal Admin控制台
我们发现配置canal server的时候,是比较麻烦的;需要配置 application配置,以及各个instance目录的配置,1台canal server配置工作还行,如果集群模式下,多台canal server都需要配置,那就比较痛苦了。
还好canal最近的版本提供了admin控制台,可以进行远程配置
这个可以在官网上面找到如何安装,还是比较简单的。小伙伴可以自行去查看
注意:如果canal server采用 admin 远程配置,那conf目录下就用使用 canal_local.perproies这个配置文件,在启动canal server时需加上参数local;才会启动远程配置
./startup.sh local
总结
今天老顾整体介绍了canal的安装以及配置,其实这些东西都比较简单;老顾只是介绍了一些核心的知识点;方便小伙伴们在使用canal的时候,可以快速理解。
其实canal最核心的就是 adapter client的配置,如何结合业务?老顾在使用了时候,为了符合公司的业务,修改了几处adapter的源码【涉及到动态es索引,主键id前缀,以及只同步每一个命令的业务】;小伙伴可以研究一下adapter源码,值得一看。谢谢!!!
注意:canal-server同步信息到kafka时,会产生异常日志exception=java.io.IOException: Connection reset by peer 但不影响同步业务;已经同步给官方issue;希望官方尽快解决
---End---
老顾的微服务网关分享课程,请大家多多支持
推荐阅读
大厂如何基于binlog解决多机房同步mysql数据(一)?
大厂如何基于binlog解决多机房同步mysql数据(二)?
基于binlog的canal组件有哪些使用场景(三)?
可用于大型应用的微服务生态灰度发布如何实现?
一线大厂级别公共Redis集群监控,细化到每个项目实例
Sharding-jdbc的实战入门之水平分表(一)
Sharding-Jdbc之水平分库和读写分离(二)
a、dubbo如何处理业务异常,这个一定要知道哦!
b、企业级SpringBoot应用多个子项目配置文件规划、多环境支持(一)
c、企业级SpringBoot应用多个子项目配置文件规划、多环境支持(二)
d、企业级SpringBoot应用多个子项目配置文件之配置中心(三)
e、利用阿里开源工具进行排查线上CPU居高问题
f、阿里二面:如何快速排查死锁?如何避免死锁?
g、微服务分布式架构中,如何实现日志链路跟踪?
h、网关如何聚合各个微服务的接口文档?
i、Kubernetes之POD、容器之间的网络通信
j、K8S中的Service的存在理由
k、企业微服务项目如何进入K8S的全过程
l、阿里开源项目Sentinel限流、降级的统一处理
m、大厂二面:Redis的分布式布隆过滤器是什么原理?
1、基于RocketMq的SpringCloud Stream框架实战入门
2、如何搭建消息中间件应用框架之SpringCloud Stream
3、面试必备:网关异常了怎么办?如何做全局异常处理?
4、Gateway网关系列(二):SpringCloud Gateway入门实战,路由规则
5、Gateway网关系列开篇:SpringCloud的官方网关Gateway介绍
6、API网关在微服务架构中的应用,这一篇就够了
7、学习Lambda表达式看这篇就够了,不会让你失望的哦(续篇)
8、Lambda用在哪里?几种场景?
9、为什么会出现Lambda表达式,你知道吗?
10、不说“分布式事务”理论,直接上大厂阿里的解决方案,绝对实用
11、女程序员问到这个问题,让我思考了半天,Mysql的“三高”架构
12、大厂二面:CAP原则为什么只能满足其中两项?而不能同时满足
13、阿里P7二面:聊聊零拷贝的原理
14、秒杀系统的核心点都在这里,快来取
15、你了解如何利用token方式实现分布式Session吗?
16、Mysql索引结构演变,为什么最终会是那个结构呢?让你一看就懂
17、一场比赛涉及到的知识,用通俗易通的方式介绍并发协调
18、企业实战Redis全方面思考,你思考了吗?
19、面试题:Thread的start和run的区别
20、面试题:什么是CAS?CAS的作用以及缺点
21、如何访问redis中的海量数据?避免事故产生
22、如何解决Redis热点问题?以及如何发现热点?
23、如何设计API接口,实现统一格式返回?
24、你真的知道在生产环境下如何部署tomcat吗?
25、分享一线互联网大厂分布式唯一ID设计 之 snowflake方案
26、分享大厂分布式唯一ID设计方案,快来围观
27、你想了解一线大厂的分布式唯一ID生成方案吗?
28、你知道如何处理大数据量吗?(数据拆分篇)
29、如何永不迁移数据和避免热点? 根据服务器指标分配数据量(揭秘篇)
30、你知道怎么分库分表吗?如何做到永不迁移数据和避免热点吗?
31、你了解大型网站的页面静态化吗?
32、你知道如何更新缓存吗?如何保证缓存和数据库双写一致性?
33、你知道怎么解决DB读写分离,导致数据不一致问题吗?
34、DB读写分离情况下,如何解决缓存和数据库不一致性问题?
35、你真的知道怎么使用缓存吗?
36、如何利用锁,防止缓存击穿?重构思想的重要性
37、海量订单产生的业务高峰期,如何避免消息的重复消费?
38、你知道如何保障生产端100%消息投递成功吗?
39、微服务下的分布式session该如何管理?
40、阿里二面:filter、interceptor、aspect应如何选择?很多人中招
41、互联网架构重要组员CDN,很多高级开发都没有实操过,来看这里
42、阿里二面:CDN缓存控制原理,看看能不能难住你
43、SpringCloud Alibaba之Nacos多环境多项目管理
44、SpringCloud Alibaba系列之Nacos配置中心玩法
45、SpringCloud Alibaba之Nacos注册中心
46、SpringCloud Plus版本之SpringCloud Alibaba
47、SpringCloud Alibaba之Nacos集群、持久化
48、SpringCloud Alibaba之Nacos共享配置、灰度配置
49、SpringCloud Alibaba之Sentinel工作原理
50、SpringCloud Alibaba之Sentinel流控管理
51、SpringCloud Alibaba之Sentinel降级管理
52、SpringCloud Alibaba之Sentinel热点参数限流
53、SpringCloud Alibaba之Sentinel的API实战