Canal最新1.1.4版安装部署(1)
发布日期:2021-06-30 21:31:39 浏览次数:2 分类:技术文章

本文共 13142 字,大约阅读时间需要 43 分钟。

Mysql环境配置

  • 开启binlog

  • 配置canal权限

#创建用户CREATE USER canal IDENTIFIED BY 'canal';  #创建权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;#刷新生效FLUSH PRIVILEGES;#查看用户列表SELECT DISTINCT CONCAT('User: ''',user,'''@''',host,''';') AS query FROM mysql.user;

 

安装

  • 版本根据情况自行调整,最新版本参考:

 

#进入安装包cd /opt#下载安装包wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz#创建安装目录mkdir canal#解压到指定目录tar zxvf canal.deployer-1.1.4.tar.gz -C ./canal

配置

实例配置:instance.properties

vim conf/example/instance.properties
################################################### mysql serverId , v1.0.26+ will autoGen# 同mysql集群配置中的serverId,mysql的server_id参数canal.instance.mysql.slaveId=1000# 开启gtid,生成同步数据全局id,防止主从不一致canal.instance.gtidon=false# binlog的位置信息# mysql连接地址canal.instance.master.address=127.0.0.1:3306#mysql起始的binlog文件canal.instance.master.journal.name=#mysql起始的binlog偏移量canal.instance.master.position=#mysql起始的binlog时间戳canal.instance.master.timestamp=#ysql起始的binlog的gtidcanal.instance.master.gtid=# 阿里云rds的sso配置canal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# 开启tsdb功能,记录table mate变动canal.instance.tsdb.enable=true# tsdb数据存储在位置#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal# 备用数据库,当master数据库检查失败后,切换到该节点继续消费#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# mysql连接用户名和密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canalcanal.instance.connectionCharset = UTF-8# 开启druid数据库密码加密canal.instance.enableDruid=false# 加密公钥#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# 匹配table表达式,需要处理的表canal.instance.filter.regex=.*\\..*# 匹配过滤table表达式,不需要处理的表canal.instance.filter.black.regex=# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq消息配置# mq topiccanal.mq.topic=example# 动态topic配置,topic为表名#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*# mq分区canal.mq.partition=0# hash分区数量#canal.mq.partitionsNum=3# hash分区主键#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################

sever配置:canal.properties

vim conf/canal.properties
########################################################## 		common argument		############################################################### 绑定tcp连接的本地ip,需要配置本机局域网ipcanal.ip = 127.0.0.1# 项zookeeper注册的本地地址canal.register.ip = 192.168.11.11# 本地连接端口canal.port = 11111# 监控数据拉取端口canal.metrics.pull.port = 11112# canal instance user/passwd# canal.user = canal# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal 控制台配置 admin config#canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admincanal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441#持久化到zk配置canal.zkServers =# 数据刷新到zk的频率,mscanal.zookeeper.flush.period = 1000# 关闭nettycanal.withoutNetty = false# server模式 tcp, kafka, RocketMQcanal.serverMode = tcp# tsdb信息刷新flush meta cursor/parse position to file# 文件位置canal.file.data.dir = ${canal.conf.dir}# 刷新间隔canal.file.flush.period = 1000# 缓存的数据最大条数,必须为2的倍数canal.instance.memory.buffer.size = 16384## 缓存限制# 缓存快的大小,默认1024b,总内存等条数*大小canal.instance.memory.buffer.memunit = 1024# 缓存限制模式,MEMSIZE-显示缓存大小,ITEMSIZE限制记录数canal.instance.memory.batch.mode = MEMSIZE# 针对entry是否开启raw模式canal.instance.memory.rawEntry = true## 心跳检查# 是否开启心跳检查canal.instance.detecting.enable = false# 心跳sql#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()canal.instance.detecting.sql = select 1# 心跳频率canal.instance.detecting.interval.time = 3# 失败重试次数canal.instance.detecting.retry.threshold = 3# 是否开启失败切换mysql,需要配置standby数据库canal.instance.detecting.heartbeatHaEnable = false# 并发处理事务数canal.instance.transaction.size =  1024# mysql主备切换时,binlog监控需要回退的时间,防止切换导致的数据不同步canal.instance.fallbackIntervalInSeconds = 60## 网络配置# 数据发送缓冲区,字节canal.instance.network.receiveBufferSize = 16384# 数据接受缓冲区,字节canal.instance.network.sendBufferSize = 16384# 获取数据的超时时间,秒canal.instance.network.soTimeout = 30# binlog 过滤配置# 是否使用druid解析ddlcanal.instance.filter.druid.ddl = true# 是否忽略dcl语句canal.instance.filter.query.dcl = false# 是否忽略dml语句canal.instance.filter.query.dml = false# 是否忽略ddl语句canal.instance.filter.query.ddl = false# 是否忽略table异常,用于排查table异常情况canal.instance.filter.table.error = false# 是否忽略dml的数据变动,如update/insert/update操作canal.instance.filter.rows = false# 忽略数据库事务的相关事件,如在写入kafka时,忽略TransactionBegin/Transactionend事件,canal.instance.filter.transaction.entry = false# 支持的binlog文件格式canal.instance.binlog.format = ROW,STATEMENT,MIXED# 支持的binlog记录格式canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# ddl语句是否单独处理,防止语句内有无序并发处理导致数据不一致。canal.instance.get.ddl.isolation = false# 并行处理配置# 是否开启并行处理binlogcanal.instance.parser.parallel = true## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()# 并发线程数,理论上不要超过可用处理器数,默认Runtime.getRuntime().availableProcessors()。#canal.instance.parser.parallelThreadSize = 16## 并行处理的缓冲区大小, 必须2的幂次方canal.instance.parser.parallelBufferSize = 256# 是否开启table mate 的tsdb功能canal.instance.tsdb.enable = true# 存储修改table mate的记录文件,默认使用h2数据库canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}# h2数据相关配置canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;canal.instance.tsdb.dbUsername = canalcanal.instance.tsdb.dbPassword = canal# 快照存储间隔,小时canal.instance.tsdb.snapshot.interval = 24# 快照过期时间,小时canal.instance.tsdb.snapshot.expire = 360# 阿里云的访问秘钥, 支持阿里云的rds和mqcanal.aliyun.accessKey =canal.aliyun.secretKey =########################################################## 		destinations		############################################################### 当前server中的实例列表canal.destinations = example# canal配置文件目录canal.conf.dir = ../conf# 是否开启自动扫描,添加启动和删除停止实例canal.auto.scan = true# 自动扫描间隔时间,秒canal.auto.scan.interval = 5# tsdb配置路径,在canal.conf.dir路径下canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml# 全局配置加载方式canal.instance.global.mode = spring# 是否开启lazy懒加载canal.instance.global.lazy = false# 管理配置的加载地址canal.instance.global.manager.address = ${canal.admin.manager}# 全局配置文件 file单机模式,default集群模式#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml#canal.instance.global.spring.xml = classpath:spring/default-instance.xml########################################################### 		     MQ 		     ################################################################ MQ地址canal.mq.servers = 127.0.0.1:6667# 链接重试次数canal.mq.retries = 0# 每次发送消息的数据包大小,bytecanal.mq.batchSize = 16384# 最大请求大小,bytecanal.mq.maxRequestSize = 1048576# 每次发送消息的间隔时间,mscanal.mq.lingerMs = 100# 消息缓存大小canal.mq.bufferMemory = 33554432# canal消息体最大值canal.mq.canalBatchSize = 50# 获取消息超时时间,mscanal.mq.canalGetTimeout = 100# 是否为json格式消息canal.mq.flatMessage = true# 数据压缩canal.mq.compressionType = none# 消息状态canal.mq.acks = all# 自定义mq属性#canal.mq.properties. =# 消息组canal.mq.producerGroup = test# 消息跟踪, cloud-可在阿里云中查看canal.mq.accessChannel = local# 阿里云 namespace#canal.mq.namespace =###########################################################     Kafka Kerberos Info    ################################################################kafka的kerberos 认证,开启需要配置2个认证文件canal.mq.kafka.kerberos.enable = falsecanal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

Admin UI配置:canal_local.properties

vim conf/canal_local.properties
# canal server ipcanal.register.ip = 127.0.0.1# Admin UI 配置canal.admin.manager = 127.0.0.1:8089canal.admin.port = 11110canal.admin.user = admin# 暗文密码,可以通过select PASSWORD('admin')获取重置。canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# 主动注册canal.admin.register.auto = true# 注册集群的名称canal.admin.register.cluster =

 

启动

  • 启动canal
sh bin/startup.sh

  • 查看启动日志
tailf logs/canal/canal.log

  • 关闭
sh bin/stop.sh

测试

  • 链接测试,使用telnet链接端口,测试服务端口是否可以链接

  • 使用测试类进行测试
/** * @description: todo * @author: lizz * @date: 2021/1/9 13:36 */import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class CanalClientSampleTest {    public static void main(String args[]) {        // 创建链接,配置链接参数        CanalConnector connector = CanalConnectors.newSingleConnector(                new InetSocketAddress("172.x.x.x",11111),                "example", "", "");        int batchSize = 1000;        int emptyCount = 0;        try {            connector.connect();            connector.subscribe(".*\\..*");            connector.rollback();            int totalEmptyCount = 120;            while (emptyCount < totalEmptyCount) {                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                long batchId = message.getId();                int size = message.getEntries().size();                if (batchId == -1 || size == 0) {                    emptyCount++;                    System.out.println("empty count : " + emptyCount);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                    }                } else {                    emptyCount = 0;                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);                    printEntry(message.getEntries());                }                connector.ack(batchId); // 提交确认                // connector.rollback(batchId); // 处理失败, 回滚数据            }            System.out.println("empty too many times, exit");        } finally {            connector.disconnect();        }    }    private static void printEntry(List
entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List
columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }}

运行main启动后,修改mysql中数据,即可获取到修改内容。

empty count : 1empty count : 2empty count : 3empty count : 4empty count : 5#修改mysql中数据后触发输出================> binlog[mysql-bin.000001:3258] , name[test,t_user] , eventType : UPDATE-------> beforeid : 5    update=falseusername : 嘿嘿    update=falsesex : 1    update=falsephone : 138815828282    update=falsecreatetime : 2021-01-11 14:59:14    update=false-------> afterid : 5    update=falseusername : 嘿嘿1    update=truesex : 1    update=falsephone : 138815828282    update=falsecreatetime : 2021-01-11 15:06:11    update=trueempty count : 1empty count : 2empty count : 3

 

转载地址:https://lizz6.blog.csdn.net/article/details/112369812 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:Mac如何查看系统根目录
下一篇:maven中pom依赖相同jar包优先顺序加载版本

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2024年04月12日 08时41分22秒