农企新闻网

gti9300要多少钱(基于binlog日志之canal企业应用及高可用原理)

发布者:马悦东
导读目录前言canal架构安装配置高可用架构控制台总结前言之前3篇文章老顾都介绍了基于binlog的一些应用场景,今天我们来分享一下Canal组件的具体的使用方法,以及集群的部署。Canal

目录

  1. 前言
  2. canal架构
  3. 安装配置
  4. 高可用架构
  5. 控制台
  6. 总结(源码bug)

前言

之前3篇文章老顾都介绍了基于binlog的一些应用场景,今天我们来分享一下Canal组件的具体的使用方法,以及集群的部署。

Canal架构

单应用

上面中我们需要启动一个Canal Server,负责伪装mysql的slave订阅binlog的;我们还需要一个Client监听Server。Canal最近的版本提供了一个client的实现----Canal Adapter。然后由Client解析后把相关数据同步的DB/ES/Redis。

上面的架构可以在开发环境进行,但不适合生产环境;因为几个问题存在。

1)Canal Server和Client 都存在单点问题

2)如果我们流量很大,Canal Server收到的binlog量太大,导致Canal Adapter来不及处理,很有可能会把Client搞崩溃掉。

下面我们会介绍高可用的架构,我们先了解一下Canal基本用法

安装配置

环境

Canal的github的地址https://github.com/alibaba/canal/releases,目前最新版本1.1.4

注意:如果小伙伴们需要adapter client同步的es7,需要使用1.1.5版本才行哦

老顾正在使用的是1.1.5。老顾有特殊需求

java jdk1.8需要提前安装哦。

安装canal server

下载canal.deployer-1.1.4.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

解压文件

tar -zxvf canal.deployer-1.1.4.tar.gz

进入解压后的文件夹,有5个目录

 bin         //可执行的脚本文件
 conf        //配置文件
 lib         //需要用到的核心jar包
 logs        //启动运行的日志文件
 plugin      //支持的适配插件

我们需要修改一些配置文件,进入conf

canal.properties

canal.id = 1 # 每个canal server实例的唯一标识,暂无实际意义
canal.ip = 192.111.112.103 # canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.port = 11111 # canal server提供socket服务的端口
canal.metrics.pull.port = 11112
canal.zkServers = 192.168.1.111:2181 #canal server链接zookeeper集群的链接信息,集群模式需要用到,单机模式可以不配置


# flush data to zk
canal.zookeeper.flush.period = 1000 #canal持久化数据到zookeeper上的更新频率,单位毫秒
canal.withoutNetty = false 
# tcp, kafka, RocketMQ
canal.serverMode = tcp #这个非常重要,代表是binlog的变化信息是直接用tcp方式放送给下游的client,还是先放到MQ中,暂时支持kafka、RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true


## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false


# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60


# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30


# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false #这个也非常重要,true表示不监听queue查询命令;只监听insert、update、delete命令;false即全部监听
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false


# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB


# binlog ddl isolation
canal.instance.get.ddl.isolation = false


# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256


# table meta tsdb info
canal.instance.tsdb.enable = false
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = password
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360


# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =


#################################################
#########               destinations            ############# 
#################################################
canal.destinations = example_01,example_02  # 当前server上部署的instance列表,instance很重要的概念,就是需要监听的表实例。
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5


#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml


canal.instance.global.mode = spring # 全局配置加载方式
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml


##################################################
#########                    MQ                      #############
##################################################
#如果监听模式canal.serverMode为 kafka、RocketMq,需要配置以下配置
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.

配置说明

上面核心的配置,需要注意的

canal.serverMode
canal.instance.filter.query.dml
canal.destinations
canal.mq.servers #如果需要同步到MQ

注意:MQ的配置,1.1.5版本会有点区别,分开了kafka和rocketmq;小伙伴们一看就知道


针对实例instance数据位点,存储配置canal.instance.global.spring.xml

  • memory-instance.xml: 所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析
  • default-instance.xml: store选择了内存模式,其余的parser/sink依赖的位点管理选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.
  • group-instance.xml: 主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问


多个destination配置

  • 在canal.properties里边配置canal.destinations , 用英文逗号分隔
  • 在conf路径下创建对应的路径并添加对应的instance.properties

配置多个destination, 需要在conf下创建对应的目录

mkdir conf/example_01
mkdir conf/example_02

在对应的目录下边编写配置文件instance.properties

#canal.instance.mysql.slaveId=
canal.instance.gtidon=false  #这个在回环问题存在的时候开启true,可以看老顾前一篇文章


# position info
canal.instance.master.address=    #数据库连接地址
canal.instance.master.journal.name=  #数据库的binlog文件名
canal.instance.master.position=      #数据库的binlog位点
canal.instance.master.timestamp=
canal.instance.master.gtid=


# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=


# table meta tsdb info
canal.instance.tsdb.enable=false


# username/password
canal.instance.dbUsername=username
canal.instance.dbPassword=password
canal.instance.defaultDatabaseName=dbName
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false


# table regex
canal.instance.filter.regex=.*\..*


# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\..*,.*\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*

配置说明

  • canal.instance.master.address 数据库连接地址
  • canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
  • canal.instance.master.timestamp : 指定一个时间戳。如果上面的位点不配置,时间戳生效;canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动
  • 如果时间戳也不配置,即不指定任何信息:默认从当前数据库的位点,进行启动
  • canal.instance.dbUsername 数据库账户
  • canal.instance.dbPassword 数据库密码
  • canal.instance.connectionCharset 连接字符类型
  • canal.instance.filter.regex
1.  所有表:.*   or  .*\..*
2.  canal schema下所有表: canal\..*
3.  canal下的以canal打头的表:canal\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)
  • canal.mq.topic 如果发送到MQ,那此实例的消息发送到哪个主题,也可以支持动态主题

根据canal.mq.dynamicTopic设置

  • canal.mq.partition 指定分区队列号,默认为0;即所有消息发送到0号队列
  • canal.mq.partitionsNum + canal.mq.partitionHash 如果需要分散到其他队列,可以提供消息消费的速度;即可用到这2个配置,partitionsNum队列数,partitionHash分配规则

启动

进入到路径bin下边,有几个脚本

canal.pid     # 记录服务的进程ID
restart.sh    # 重启服务
startup.sh    # 启动脚本
stop.sh       # 停止服务

运行./startup.sh就可以启动了

查看日志

  • 服务启动日志(logs/canal/canal.log)
  • 实例运行日志 (logs/example/example.log)

canal-adapter的安装

下载安装包

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

解压

tar xzvf canal.adapter-1.1.4.tar.gz

修改配置文件

  • 修改conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null


canal.conf:
  mode: tcp #这个就对应server端配置的 同步mode
  zookeeperHosts: 192.111.111.173:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources: #源数据库地址,可以多个
    defaultDS:
      url: jdbc:mysql://192.168.1.100:3306/test?useUnicode=true
      username: username
      password: password
    defaultDS2:
      url: jdbc:mysql://192.168.1.101:3306/test?useUnicode=true
      username: username
      password: password
  canalAdapters: #可以配置多个实例配置
  - instance: example_01
    groups:
    - groupId: g1
      outerAdapters: #输出适配器,可以配置多个
      - name: logger  #日志打印适配器
      - name: es    #es同步适配器
        hosts: 192.168.1.110:9300
        properties:
          cluster.name: okami-application
  - instance: example_02
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 192.168.1.111:9300
        properties:
          cluster.name: okami-application

在conf/es/路径下添加配置文件example_01.yml

vi conf/es/example_01.yml


dataSourceKey: defaultDS
destination: example_01
groupId: g1
esMapping:
  _index: indexName
  _type: typeName
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time from user a
        left join role b on b.id=a.role_id"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 3000

example_02.yml

dataSourceKey: defaultDS2
destination: example_02
groupId: g1
esMapping:
  _index: indexName
  _type: typeName
  _id: _id
  upsert: true
#  pk: id
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time from user a
        left join role b on b.id=a.role_id"




#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>='{0}'"
  commitBatch: 300

配置说明

一份数据可以被多个group同时消费, 多个group之间会是一个并行执行, 一个group内部是一个串行执行多个outerAdapters

启动

进入到路径bin下边,有几个脚本

adapter.pid     # 记录的进程ID
restart.sh    # 重启服务
startup.sh    # 启动脚本
stop.sh       # 停止服务

查看日志

tail -f logs/adapter/adapter.log 

上面介绍了canal的server和adapter的启动应用。下面我们看看可用架构的思路

高可用架构

基于binlog日志之canal企业应用及高可用原理(四)

canal server的高可用方案是结合了zookeeper,主要原理是:

需求:如果有3台Canal server,同步10个实例instance的配置。

1)canal server启动后,会把相关启动信息注册到zk上面。

即可以通过zk知道现在一共有几个canal server

2)并且会把10个instance 分配给这台canal server

3)再启动第2台canal server,又会注册到zk上面。发现线上有1台canal server订阅了10个instance。第2台会分担第1台的工作,会分配5个instance给自己,留下另外5个给第一台

2台canal server均匀分配了instance同步binlog工作

4)再启动第3台,流程和第2台一样,继续分担instance同步工作

最后形成1台同步4个instance,1台同步3个instance,1台同步3个instance

5)如果有一台canal server挂了;通过zk的协调机制,通知另外2台canal server;分担那个挂掉server的instance工作。

这样就整体实现了canal server高可用了哦

Canal Admin控制台

我们发现配置canal server的时候,是比较麻烦的;需要配置 application配置,以及各个instance目录的配置,1台canal server配置工作还行,如果集群模式下,多台canal server都需要配置,那就比较痛苦了。

还好canal最近的版本提供了admin控制台,可以进行远程配置

基于binlog日志之canal企业应用及高可用原理(四)

这个可以在官网上面找到如何安装,还是比较简单的。小伙伴可以自行去查看

注意:如果canal server采用 admin 远程配置,那conf目录下就用使用 canal_local.perproies这个配置文件,在启动canal server时需加上参数local;才会启动远程配置

./startup.sh local

总结

今天老顾整体介绍了canal的安装以及配置,其实这些东西都比较简单;老顾只是介绍了一些核心的知识点;方便小伙伴们在使用canal的时候,可以快速理解。

其实canal最核心的就是 adapter client的配置,如何结合业务?老顾在使用了时候,为了符合公司的业务,修改了几处adapter的源码【涉及到动态es索引,主键id前缀,以及只同步每一个命令的业务】;小伙伴可以研究一下adapter源码,值得一看。谢谢!!!

注意:canal-server同步信息到kafka时,会产生异常日志exception=java.io.IOException: Connection reset by peer 但不影响同步业务;已经同步给官方issue;希望官方尽快解决

---End---


老顾的微服务网关分享课程,请大家多多支持

推荐阅读

大厂如何基于binlog解决多机房同步mysql数据(一)?

大厂如何基于binlog解决多机房同步mysql数据(二)?

基于binlog的canal组件有哪些使用场景(三)?

可用于大型应用的微服务生态灰度发布如何实现?

一线大厂级别公共Redis集群监控,细化到每个项目实例

Sharding-jdbc的实战入门之水平分表(一)

Sharding-Jdbc之水平分库和读写分离(二)

a、dubbo如何处理业务异常,这个一定要知道哦!

b、企业级SpringBoot应用多个子项目配置文件规划、多环境支持(一)

c、企业级SpringBoot应用多个子项目配置文件规划、多环境支持(二)

d、企业级SpringBoot应用多个子项目配置文件之配置中心(三)

e、利用阿里开源工具进行排查线上CPU居高问题

f、阿里二面:如何快速排查死锁?如何避免死锁?

g、微服务分布式架构中,如何实现日志链路跟踪?

h、网关如何聚合各个微服务的接口文档?

i、Kubernetes之POD、容器之间的网络通信

j、K8S中的Service的存在理由

k、企业微服务项目如何进入K8S的全过程

l、阿里开源项目Sentinel限流、降级的统一处理

m、大厂二面:Redis的分布式布隆过滤器是什么原理?

1基于RocketMq的SpringCloud Stream框架实战入门

2、如何搭建消息中间件应用框架之SpringCloud Stream

3面试必备:网关异常了怎么办?如何做全局异常处理?

4Gateway网关系列(二):SpringCloud Gateway入门实战,路由规则

5Gateway网关系列开篇:SpringCloud的官方网关Gateway介绍

6API网关在微服务架构中的应用,这一篇就够了

7学习Lambda表达式看这篇就够了,不会让你失望的哦(续篇)

8Lambda用在哪里?几种场景?

9、为什么会出现Lambda表达式,你知道吗?

10、不说“分布式事务”理论,直接上大厂阿里的解决方案,绝对实用

11、女程序员问到这个问题,让我思考了半天,Mysql的“三高”架构

12、大厂二面:CAP原则为什么只能满足其中两项?而不能同时满足

13、阿里P7二面:聊聊零拷贝的原理

14、秒杀系统的核心点都在这里,快来取

15、你了解如何利用token方式实现分布式Session吗?

16、Mysql索引结构演变,为什么最终会是那个结构呢?让你一看就懂

17、一场比赛涉及到的知识,用通俗易通的方式介绍并发协调

18、企业实战Redis全方面思考,你思考了吗?

19、面试题:Thread的start和run的区别

20、面试题:什么是CAS?CAS的作用以及缺点

21、如何访问redis中的海量数据?避免事故产生

22、如何解决Redis热点问题?以及如何发现热点?

23、如何设计API接口,实现统一格式返回?

24、你真的知道在生产环境下如何部署tomcat吗?

25、分享一线互联网大厂分布式唯一ID设计 之 snowflake方案

26、分享大厂分布式唯一ID设计方案,快来围观

27、你想了解一线大厂的分布式唯一ID生成方案吗?

28、你知道如何处理大数据量吗?(数据拆分篇)

29、如何永不迁移数据和避免热点? 根据服务器指标分配数据量(揭秘篇)

30、你知道怎么分库分表吗?如何做到永不迁移数据和避免热点吗?

31、你了解大型网站的页面静态化吗?

32、你知道如何更新缓存吗?如何保证缓存和数据库双写一致性?

33、你知道怎么解决DB读写分离,导致数据不一致问题吗?

34、DB读写分离情况下,如何解决缓存和数据库不一致性问题?

35、你真的知道怎么使用缓存吗?

36、如何利用锁,防止缓存击穿?重构思想的重要性

37、海量订单产生的业务高峰期,如何避免消息的重复消费?

38、你知道如何保障生产端100%消息投递成功吗?

39、微服务下的分布式session该如何管理?

40、阿里二面:filter、interceptor、aspect应如何选择?很多人中招

41、互联网架构重要组员CDN,很多高级开发都没有实操过,来看这里

42、阿里二面:CDN缓存控制原理,看看能不能难住你

43、SpringCloud Alibaba之Nacos多环境多项目管理

44、SpringCloud Alibaba系列之Nacos配置中心玩法

45、SpringCloud Alibaba之Nacos注册中心

46、SpringCloud Plus版本之SpringCloud Alibaba

47、SpringCloud Alibaba之Nacos集群、持久化

48、SpringCloud Alibaba之Nacos共享配置、灰度配置

49、SpringCloud Alibaba之Sentinel工作原理

50、SpringCloud Alibaba之Sentinel流控管理

51、SpringCloud Alibaba之Sentinel降级管理

52、SpringCloud Alibaba之Sentinel热点参数限流

53、SpringCloud Alibaba之Sentinel的API实战