博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka Kafka-Connect Debezium 目标:同步Mysql
阅读量:3497 次
发布时间:2019-05-19

本文共 2495 字,大约阅读时间需要 8 分钟。

Kafka Kafka-Connect Debezium 目标:同步Mysql



  • 现阶段实现到 通过以上插件可以实现检测到Mysql 把更改信息通过 Connect 写入到 Kafka的topic中一下是对现阶段的记录。由于需要多个包,采用docker来进行
  1. 首先安装docker
    1. yum install docker
    2. systemctl start docker
  2. 下载镜像 debezium/zookeeper:0.9
    1. docker pull debezium/zookeeper:0.9
    2. 运行zookeeper docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9
  3. 下载镜像debezium/kafka:0.9
    1. docker pull debezium/kafka:0.9
    2. docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9
  4. 安装mysql
    1. docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
  5. Mysql-client
    1. docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c ‘exec mysql -h" M Y S Q L P O R T 3 30 6 T C P A D D R " − P " MYSQL_PORT_3306_TCP_ADDR" -P" MYSQLPORT3306TCPADDR"P"MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"’
    2. use inventory;
    3. show tables;
    4. SELECT * FROM customers;
  6. Kafka connect
    1. docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9
    2. 检查
      1.$ curl -H “Accept:application/json” localhost:8083/
      2.$ curl -H “Accept:application/json” localhost:8083/connectors/
  7. 监视MySQL数据库
    1. curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” localhost:8083/connectors/ -d ‘{ “name”: “inventory-connector”, “config”: { “connector.class”: “io.debezium.connector.mysql.MySqlConnector”, “tasks.max”: “1”, “database.hostname”: “mysql”, “database.port”: “3306”, “database.user”: “debezium”, “database.password”: “dbz”, “database.server.id”: “184054”, “database.server.name”: “dbserver1”, “database.whitelist”: “inventory”, “database.history.kafka.bootstrap.servers”: “kafka:9092”, “database.history.kafka.topic”: “dbhistory.inventory” } }’
      2.查看创建的connectors curl -H “Accept:application/json” localhost:8083/connectors/
  8. 查看变更事件
    1.进入 kafka中的 查询topic 会发现多了一个dbhistory.inventory节点
    2.创建dbhistory.inventory topic的节点消费者
  9. 变更数据
    1. UPDATE customers SET first_name=‘Anne Marie’ WHERE id=1004;
  10. 观察消费者topic节点发现记录下mysql的更改


以上实现了检测数据库的变更并把变更信息写入到kafka中,为实现以及现有的思路

  • 现在已将mysql中的数据变更记录在kafka中现在只能检测到变更后然后记录到kafka中接下来大目标是实现mysql的增量同步思路是通过检测到的mysql信息借助kafka—connect实现数据库的增量同步
  • 以上的思路是增量同步思路但是通过思考感觉全量同步通过记录变更不太现实,感觉需要通过debezium 来进行读取分析binlog 日志 通过这种方式可以实现全量同步

转载地址:http://twklj.baihongyu.com/

你可能感兴趣的文章
并发与并行,线程与进程
查看>>
方法引用,通过对象名引用成员变量
查看>>
常用工具类 Math:数学计算 Random:生成伪随机数 SecureRandom:生成安全的随机数 2020-2-13
查看>>
Java的异常Exception 2020-2-13
查看>>
Java标准库定义的常用异常,自定义异常 2020-2-15
查看>>
Java问题百度/Google记录 2020-2-16
查看>>
【PADS9.5】9,对比ECO核心板,Router移动元件后布线消失,Router找不到自动布线策略文件丢失或损坏
查看>>
【STM32+w5500汇总】23,HTTP_Client 连接到ONENET上传了一段数据之后会断开,数据上传格式的设置
查看>>
【STM32+W5500+MQTT】24,所有功能都可以通过API函数的调用来实现;HTTP接入ONENET,API开发手册和打包函数,串口软件HTTP连接服务器上传数据,2018年12月28日
查看>>
【STM32+W5500+HTTPClient】25,路由器DHCP租赁IP时间为2h,NetBios可以很好的解决IP变化的问题,DNS,2018年12月25日
查看>>
【STM32+MQTT+ONENET】26,MQTT协议接入OneNET
查看>>
【STM32+W5500+MQTT+ONENET】27,MQTT协议接入OneNET实际编程操作 2018年12月27日
查看>>
【STM32Cube+FreeRTOS 】28,KEIL5的F12不起作用;***JLink Error: Can not read register x while CPU is running
查看>>
【STM32CubeMX+FreeRTOS 】29,prtinf卡死;4任务只运行了3个;W5500联网失败(堆栈不能太大或者太小)
查看>>
【STM32+FreeRTOS +W5500移植要点】30,RTOS中断;从TIM2,主TIM3;RTOS主要用在LCD中;RT-Thread;标志重定义问题 2019年01月22日
查看>>
【STM32+FPGA+FSMC】31,FSMC熟练掌握;KEIL5生成bin文件;SDRAM的使用;IAP检验码 2019年04月10日
查看>>
【IC1】【转 非常好】运算放大器使用的六个经验
查看>>
【IC-ADC 3】ADC的选型
查看>>
2019年03月18日 查看数据手册的注意点,极限参数、电气参数、推荐参数
查看>>
HiKey960/970用户手册;HiKey960 Development Board User Manual
查看>>