Mac Docker环境,利用Canal实现MySQL同步ES
suiw9 2024-12-01 04:02 31 浏览 0 评论
Canal的使用
使用docker环境安装mysql、canal、elasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中,并在springboot进行整合。
- 安装mysql并配置binlog
- 安装elasticsearch
- 安装Canal server
- 在springboot实现整合
所用环境版本类型:
- MacOS Monterey
- mysql 5.7.36
- es 7.17.9
- cannal.server: 1.1.5
1、安装mysql
安装命令
shell复制代码// 拉取镜像
docker pull mysql:5.7.36
// 创建容器
docker run --name mysql5.7.36 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:5.7.36
// 进入容器
docker exec -it mysql5.7.36 /bin/bash
// 添加vim编辑
apt-get update
apt-get install vim
#注意 apt-get报错,可换yum命令使用。
// 配置mysql
cd /etc/mysql/mysql.conf.d
vim mysqld.cnf // 修改mysql配置
配置:与官网一致
bash复制代码[mysqld]
#binlog setting
log-bin=mysql-bin // 开启logbin
binlog-format=ROW // binlog日志格式
server-id=1 // mysql主从备份serverId,canal中不能与此相同
数据库创建一个canal账号,并且设置slave,dump权限
bash复制代码CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
因为mysql8.0.3后身份检验方式为caching_sha2_password,但canal使用的是mysql_native_password,因此需要设置检验方式(如果该版本之前的可跳过),否则会报错IOException: caching_sha2_password Auth failed
bash复制代码ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
select host,user,plugin from mysql.user ;
保存退出,然后重启mysql容器
bash复制代码docker restart mysql5.7.36
做完这一步可查看前面是否有误:
bash复制代码// 进入容器
docker exec -it mysql5.7.36 /bin/bash
mysql -uroot -p
show master status // binlog日志文件
reset master; // 重启日志
show variables like 'binlog_format'; //查看是否配置成功
查看日志文件:
bash复制代码cd /var/lib/mysql // 进入日志文件目录
mysqlbinlog -vv mysql-bin.000001 // row格式查看日志
mysql已经安装成功了。
2、安装es
略。
3、安装canal-server
shell复制代码docker pull canal/canal-server:v1.1.5
docker run --name canal1.1.5 -p 11111:11111 --link mysql5.7.36:mysql5.7.36 -id canal/canal-server:v1.1.5
修改对应的配置:
bash复制代码docker exec -it canal1.1.5 /bin/bash
cd canal-server/conf/example/
vi instance.properties // 修改配置
# 把0改成10,只要不和mysql的id相同就行
canal.instance.mysql.slaveId=10
# 修改成mysql对应的账号密码,mysql5.7.36就是mysql镜像的链接别名
canal.instance.master.address=mysql5.7.36:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
properties复制代码#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=10
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=mysql5.7.36:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
也可精确点对点同步(不是很复杂默认就好,无需配置下面内容)
properties复制代码# 开始同步的binlog日志文件,注意这里的binlog文件名以你自己查出来的为准
canal.instance.master.journal.name=mysql-bin.000001
# 开始同步的binlog文件位置
canal.instance.master.position=0
# 开始同步时间点 时间戳形式
canal.instance.master.timestamp=1546272000
# 配置不同步的mysql库
canal.instance.filter.black.regex=mysql\..*
mysql数据同步起点说明:
canal.instance.master.journal.name + canal.instance.master.position : 精确指定一个binlog位点,进行启动
canal.instance.master.timestamp : 指定一个时间戳,canal会自动遍历mysql binlog,找到对应时间戳的binlog位点后,进行启动不指定任何信息:默认从当前数据库的位点,进行启动。(show master status)
instance.properties文件的配置详情可访问github.com/alibaba/can…
查看配置是否成功:
bash复制代码#首先重启一下canal
docker restart canal1.1.5
docker exec -it canal1.1.5 /bin/bash
cd canal-server/logs/example/
tail -100f example.log // 查看日志
如以上状况,则说明mysql连接canal-server成功,此时mysql中的数据变化,都会在canal中有同步。
可以通过Java程序测试有没连接上mysql:
xml复制代码<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
java复制代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
在mysql的可视化工具进行数据更新后idea控制台就会有数据的显示。
4、整合springboot
自定义客户端client
1、新建一个springboot项目
引入依赖spring-boot-starter-data-elasticsearch、canal-spring-boot-starter
xml复制代码<!-- Canal客户端服务 -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<!-- elasticsearch-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
2、修改配置文件application.yml
yml复制代码# 应用名称
spring:
# es配置
elasticsearch:
uris: http://localhost:9200
username: root
password: 123456
# 数据库配置
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.244.17:3306/canal_test?useSSL=false&useUnicode=true&characterEncoding=utf-8
username: root
password: 123456
# canal服务端地址
canal:
destination: example # canal的集群名字,要与安装canal时设置的名称一致
server: 127.0.0.1:11111 # canal服务地址
# 设置canal消息日志打印级别
logging:
level:
top.javatool.canal.client: warn
3、新建实体类为下面数据同步做准备。
新建mysql表实体类
java复制代码import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
/**
* 帖子
*
*/
@TableName(value = "post")
@Data
public class Post implements Serializable {
/**
* id
*/
@TableId(type = IdType.ASSIGN_ID)
private Long id;
/**
* 标题
*/
private String title;
/**
* 内容
*/
private String content;
/**
* 标签列表 json
*/
private String tags;
/**
* 点赞数
*/
private Integer thumbNum;
/**
* 收藏数
*/
private Integer favourNum;
/**
* 创建用户 id
*/
private Long userId;
/**
* 创建时间
*/
private Date createTime;
/**
* 更新时间
*/
private Date updateTime;
/**
* 是否删除
*/
@TableLogic
private Integer isDelete;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}
新建es实体类
java复制代码import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.song.search.model.entity.Post;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
* 帖子 ES 包装类
*
*
**/
@Document(indexName = "post_v1")
@Data
public class PostEsDTO implements Serializable {
private static final String DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
/**
* id
*/
@Id
private Long id;
/**
* 标题
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String title;
/**
* 内容
*/
@Field(type = FieldType.Text, analyzer = "ik_max_word", searchAnalyzer = "ik_smart")
private String content;
/**
* 标签列表
*/
@Field(type = FieldType.Keyword)
private List<String> tags;
/**
* 创建用户 id
*/
@Field(type = FieldType.Keyword)
private Long userId;
/**
* 创建时间
*/
@Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
private Date createTime;
/**
* 更新时间
*/
@Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)
private Date updateTime;
/**
* 是否删除
*/
@Field(type = FieldType.Keyword)
private Integer isDelete;
private static final long serialVersionUID = 1L;
private static final Gson GSON = new Gson();
/**
* 对象转包装类
*
* @param post
* @return
*/
public static PostEsDTO objToDto(Post post) {
if (post == null) {
return null;
}
PostEsDTO postEsDTO = new PostEsDTO();
BeanUtils.copyProperties(post, postEsDTO);
String tagsStr = post.getTags();
if (StringUtils.isNotBlank(tagsStr)) {
postEsDTO.setTags(GSON.fromJson(tagsStr, new TypeToken<List<String>>() {
}.getType()));
}
return postEsDTO;
}
/**
* 包装类转对象
*
* @param postEsDTO
* @return
*/
public static Post dtoToObj(PostEsDTO postEsDTO) {
if (postEsDTO == null) {
return null;
}
Post post = new Post();
BeanUtils.copyProperties(postEsDTO, post);
List<String> tagList = postEsDTO.getTags();
if (CollectionUtils.isNotEmpty(tagList)) {
post.setTags(GSON.toJson(tagList));
}
return post;
}
}
4、接下来我们基于canal-client提供的EntryHandler类来实现对于数据表的监控,从而达到数据的增删改同步。
java复制代码import com.song.search.esdao.PostEsDao;
import com.song.search.model.dto.post.PostEsDTO;
import com.song.search.model.entity.Post;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;
@CanalTable("post") // mysql数据库表明
@Component
@Slf4j
public class CanalSyncPostToEs implements EntryHandler<Post> {
@Resource
private PostEsDao postEsDao;
/**
* mysql中数据有新增时自动执行
*
* @param hotel 新增的数据
*/
@Override
public void insert(Post hotel) {
PostEsDTO postEsDTO = PostEsDTO.objToDto(hotel);
//把新增数据hotel,添加到ES即可
postEsDao.save(postEsDTO);
}
/**
* mysql中数据有修改时自动执行
*
* @param before 修改前的数据
* @param after 修改后的数据
*/
@Override
public void update(Post before, Post after) {
//把修改数据,更新到ES即可
PostEsDTO postEsDTO = PostEsDTO.objToDto(after);
//把新增数据hotel,添加到ES即可
postEsDao.save(postEsDTO);
}
/**
* ysql中数据有删除时自动执行
*
* @param hotel 要删除的数据
*/
@Override
public void delete(Post hotel) {
//把要删除的数据hotel,从ES删除即可
postEsDao.deleteById(hotel.getId());
}
}
补充:es的dao操作/与mybatis操作数据库相同
java复制代码/**
* 帖子 ES 操作
*
*
*/
public interface PostEsDao extends ElasticsearchRepository<PostEsDTO, Long> {
}
最后测试一下数据库的增删改。
总结
自此利用canal实现mysql的数据同步到elasticsearch就演示完成了,如果有更加复杂的同步逻辑,也可以在代码中自定义实现,并且第三方组件canal-spring-boot-starter极大的简化了自定义canal客户端的难度。
遗憾的是canal-spring-boot-starter的作者目前已经停止了对其的维护,其最新版对应的canal实际是1.1.3版本的,不过实测还不影响我们对接canal1.1.5。对canal客户端又更高性能的需求,可以研究源码,高度二开。
- 上一篇:将VIM配置成简单的IDE
- 下一篇:如何成功运用扁平化设计与色彩趋势
相关推荐
- 10款超实用JavaScript音频库(js播放音频代码)
-
HTML5提供了一种新的音频标签实现和规范用一个简单的HTML对象而无需音频插件来控制音频。这只是一个简单的整合这些新的HTML5音频特征及使用JavaScript来创建各种播放控制。下面将介绍10款...
- PROFINET转Modbus网关——工业协议融合的智能枢纽
-
三格电子SG-PNh750-MOD-221,无缝连接Profinet与Modbus,赋能工业物联产品概述...
- 简单实用的Modbus类库,支持从站和DTU
-
一、简介...
- [西门子PLC] S7-200 SMART PROFINET :通过GSD组态PLC设备
-
从S7-200SMARTV2.5版本开始,S7-200SMART开始支持做PROFINETIO通信的智能设备。从而,两个S7-200SMART之间可以进行PROFINETI...
- Modbus(RTU / TCP)有什么异同(modbus tcp和tcp)
-
Modbus是一种广泛使用的工业自动化通信协议,它支持设备之间的数据交换。Modbus协议有两个主要的变体:ModbusRTU(二进制模式)和ModbusTCP(基于TCP/IP网络的模式)。尽管...
- Modbus通信调试步骤详解(modbus调试工具怎么用)
-
Modbus通信调试步骤详解 Modbus通信分为串口和以太网,无论是串口还是以太网,只要是标准Modbus,就可以用Modbus模拟器进行调试。按以下几步进行调试。...
- 理解Intel手册汇编指令(intel 汇编指令手册)
-
指令格式...
- 「西门子PLC」S7-200 SMART的Modbus RTU通讯
-
S7-200SMART集成的RS485端口(端口0)以及SBCM01RS485/232信号板(端口1)两个通信端口可以同时做MODBUSRTU主站,或者一个做MODBUSRTU主站一个做MO...
- InfiniBand网络运维全指南:从驱动安装到故障排查
-
一、InfiniBand网络概述InfiniBand(直译为“无限带宽”技术,缩写为IB)是一种用于高性能计算的计算机网络通信标准,具有极高的吞吐量和极低的延迟,用于计算机与计算机之间的数据互连。它...
- 一加回归 OPPO,背后的秘密不可告人
-
有这样一个手机品牌,它诞生于互联网品牌。在大众群体看来,它的身世似乎模糊不清,许多人以为它是国外品牌。它的产品定位是极客群体,深受国内发烧友,甚至国外极客玩家喜爱。...
- [西门子PLC] S7-200SMART快速高效的完成Modbus通信程序的设计
-
一、导读Modbus通信是一种被广泛应用的通信协议,在变频器、智能仪表还有其他一些智能设备上都能见到它的身影。本文呢,就把S7-200SMART系列PLC当作Modbus主站,把...
- 狂肝10个月手搓GPU,他们在我的世界中玩起我的世界,梦想成真
-
梦晨衡宇萧箫发自凹非寺量子位|公众号QbitAI自从有人在《我的世界》里用红石电路造出CPU,就流传着一个梗:...
- [西门子PLC] 博途TIA portal SCL编程基础入门:1-点动与自锁
-
一、S7-SCL编程语言简介...
- 工作原理系列之:Modbus(modbus工作过程)
-
MODBUS是一种在自动化工业中广泛应用的高速串行通信协议。该协议是由Modion公司(现在由施耐德电气公司获得)于1979年为自己的可编程逻辑控制器开发的。该协议充当了PLCS和智能自动化设备之间的...
你 发表评论:
欢迎- 一周热门
-
-
Linux:Ubuntu22.04上安装python3.11,简单易上手
-
宝马阿布达比分公司推出独特M4升级套件,整套升级约在20万
-
MATLAB中图片保存的五种方法(一)(matlab中保存图片命令)
-
别再傻傻搞不清楚Workstation Player和Workstation Pro的区别了
-
Linux上使用tinyproxy快速搭建HTTP/HTTPS代理器
-
如何提取、修改、强刷A卡bios a卡刷bios工具
-
Element Plus 的 Dialog 组件实现点击遮罩层不关闭对话框
-
MacOS + AList + 访达,让各种云盘挂载到本地(建议收藏)
-
日本组合“岚”将于2020年12月31日停止团体活动
-
SpringCloud OpenFeign 使用 okhttp 发送 HTTP 请求与 HTTP/2 探索
-
- 最近发表
-
- 10款超实用JavaScript音频库(js播放音频代码)
- Howler.js,一款神奇的 JavaScript 开源网络音频工具库
- PROFINET转Modbus网关——工业协议融合的智能枢纽
- 简单实用的Modbus类库,支持从站和DTU
- [西门子PLC] S7-200 SMART PROFINET :通过GSD组态PLC设备
- Modbus(RTU / TCP)有什么异同(modbus tcp和tcp)
- Modbus通信调试步骤详解(modbus调试工具怎么用)
- 理解Intel手册汇编指令(intel 汇编指令手册)
- 「西门子PLC」S7-200 SMART的Modbus RTU通讯
- InfiniBand网络运维全指南:从驱动安装到故障排查
- 标签列表
-
- dialog.js (57)
- importnew (44)
- windows93网页版 (44)
- yii2框架的优缺点 (45)
- tinyeditor (45)
- qt5.5 (60)
- windowsserver2016镜像下载 (52)
- okhttputils (51)
- android-gif-drawable (53)
- 时间轴插件 (56)
- docker systemd (65)
- slider.js (47)
- android webview缓存 (46)
- pagination.js (59)
- loadjs (62)
- openssl1.0.2 (48)
- velocity模板引擎 (48)
- pcre library (47)
- zabbix微信报警脚本 (63)
- jnetpcap (49)
- pdfrenderer (43)
- fastutil (48)
- uinavigationcontroller (53)
- bitbucket.org (44)
- python websocket-client (47)