基于Docker结合Canal实现MySQL实时增量数据传输功能

yizhihongxing

背景介绍

Docker 是一种容器化技术,具有可移植性、扩展性和可靠性等优点,能够帮助开发者更方便地构建、发布和运行应用程序。而 Canal 是阿里巴巴开源的一套 MySQL 数据库增量订阅和消费组件,它通过解析 MySQL 的 binlog,实时将增量数据同步到 RabbitMQ、Kafka 和 Canal Server 等中间件中。在本文中,我们将分享如何基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输功能的完整攻略。

步骤说明

第一步:安装 Docker 和 Docker-Compose

在安装 Docker 和 Docker-Compose 前先确认是否已经安装了 Git。安装完 Git 后,则可以利用 Git 下载 Docker 和 Docker-Compose 两个安装包,其中 Docker 是构建和发布容器的工具,Docker-Compose 是 Docker 的服务编排工具。

第二步:创建 Dockerfile

在创建 Dockerfile 时,需指明该镜像是基于官方 MySQL 镜像构建的,安装 Cana 环境的基础镜像,以及从服务器端将私有仓库镜像拉取到本地所需的环境等。

FROM mysql:5.7
MAINTAINER Canal-Server <Canal@example.com>

ADD ./sources.list /etc/apt/

RUN apt-get update && apt-get install -y supervisor && apt-get clean && rm -rf /var/lib/apt/lists/*

RUN mkdir -p /usr/local/service

COPY ./mysql.ini /usr/local/service/
COPY ./supervisor.conf /etc/supervisor/conf.d/
ADD ./cana-bin.tar.gz /usr/local/service/

CMD ["/usr/bin/supervisord","-n"]

第三步:编写 Docker-Compose 文件

使用 Docker-Compose 实现在容器内部启动 Canal-Server、MySQL 和 Canal-Client。Docker-Compose 使用 YAML 文件编写配置信息,将 Canai-Server、MySQL 和 Canal-Client 打包成三个服务。其中,Canal-Server 和 MySQL 服务在后台运行,而 Canai-Client 则需要获取待同步的表信息,再进行增量数据同步。

version: '3.4'
services:
  canal:
    container_name: canal
    build: .
    restart: always
    ports:
      - "11111:11111"
      - "11112:11112"
      - "3306:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql
      - ./canal-server:/usr/local/service/
    depends_on:
      - mysql
    links:
      - mysql
    command: bash -c "cd /usr/local/service/bin && start.sh"

  mysql:
    image: mysql:5.7
    container_name: mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: pwd
      TZ: Asia/Shanghai
    ports:
      - "3306:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql
      - ./mysql-conf:/etc/mysql/conf.d
    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci

  canal_client:
    container_name: canal_client
    build: ./canal-client
    working_dir: /usr/src/app
    command: python app.py
    environment:
      - PYTHONUNBUFFERED=0
      - DATA_PORT=11112
      - CANAL_IP=canal
      - CANAL_PORT=11111
      - MYSQL_HOST=mysql
      - MYSQL_PORT=3306
      - MYSQL_USER=root
      - MYSQL_PASSWORD=pwd
    volumes:
      - ./canal-client:/usr/src/app
    depends_on:
      - mysql
      - canal

第四步:创建 Canal-Client 和 MySQL 实例

通过 Canal-Client 将 MySQL 数据库表中的数据实时同步到 Canal RabbitMQ 中,并在 RabbitMQ 中处理数据,以便进行任意消费。

#!/usr/bin/python
import os

from src.clientCanal.canalApi import CanalApi
from src.clientCanal.format.format import filter_uncode
from src.clientCanal.extra.connect_rabbitmq import enqueue_rabbitmq

if __name__ == '__main__':
    canal_api = CanalApi(os.getenv("CANAL_IP"), os.getenv("CANAL_PORT"), os.getenv("MYSQL_HOST"),
                         os.getenv("MYSQL_PORT"),
                         os.getenv("MYSQL_USER"), os.getenv("MYSQL_PASSWORD"), os.getenv("DATA_PORT"))
    canal_api.init_client()

    while True:
        message = canal_api.get_batch_message()
        records = filter_uncode(message['data'])
        for record in records:
            if 'before' in record:
                record.pop('before')
            enqueue_rabbitmq(record, os.getenv("ENQUEUE_QUEUE"))

第五步:从 Canal Server 拉取增量数据

在我们实现数据同步之前,需要对 Canal-Server 进行数据同步处理。可以通过调用 Canal 官方提供的 API 接口获取待同步表的名称,从而创建 Canal 基本信息,并调用 Canal 官方提供的命令,拉取增量数据。在 config.yml 文件中配置待同步的表名称、RabbitMQ 地址和 RabbitMQ 用户名密码等信息。

canalServer: "172.31.3.131:11111"
MySQLConf:
  - host: "172.31.3.131"
    port: 3306
    db: "test"
    username: "root"
    password: "password"
rabbitmq:
  username: "root"
  password: "password"
  host: "172.31.3.131"
  virtualhost: "/"
  queue: "my_queue"
  exchange: "my_exchange"
  routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]

第六步:查看同步数据

最后,我们在 RabbitMQ 上查看待同步的数据是否成功发送到该中间件中。

示例说明一

在本例中,我们将演示如何通过 Canai-Server 和 RabbitMQ 实现两个不同 MySQL 数据表之间的实时数据同步。首先,下载 Canai-Server 镜像和 RabbitMQ 镜像,并通过 Docker-Compose 启动容器。

$ mkdir mysql-data
$ sudo chmod 777 mysql-data
$ docker-compose up -d

接着,编辑 config.yml 文件,指定待同步的 MySQL 数据表名称、RabbitMQ 配置信息,并将 config.yml 文件和 Canal 数据库 jar 包挂载到 Canal-Server 容器内。

canalServer: "172.31.3.131:11111"
MySQLConf:
  - host: "172.31.3.131"
    port: 3306
    db: "test"
    username: "root"
    password: "password"
rabbitmq:
  username: "root"
  password: "password"
  host: "172.31.3.131"
  virtualhost: "/"
  queue: "my_queue"
  exchange: "my_exchange"
  routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]

最后,在 Canal-Client 目录下启动数据同步服务。

$ cd canal-client
$ python app.py

示例说明二

我们将演示如何通过 Canal 完成 MySQL 数据库和 HBase 之间的实时增量数据传输。首先,启动 MySQL、Canal 和 HBase 三个服务。

$mkdir mysql-data
$sudo chmod 777 mysql-data
$ docker-compose up -d

接着,通过 Canal-Client 拉取 MySQL 数据表的增量数据,并调用 HBase 数据接口,将这些增量数据保存到 HBase 中。由于 HBase 的写入速度很慢,因此在 Canal-Client 中使用线程池实现并发写入。

#!/usr/bin/env python
# coding: utf-8
import os
import threading
import Queue
from src.clientCanal.canalApi import CanalApi
from src.clientCanal.format.format import filter_utf8
from src.clientCanal.extra.connect_hbase import get_hbase_conn, set_table_value, bulk_set_table_values

TABLE = ''
HBASE_FAMILY = 'info'


class WriteToHbaseThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            row_key, data = self.queue.get()
            set_table_value(TABLE, row_key, data, HBASE_FAMILY)
            self.queue.task_done()


def process_data_queue(rows, queue):
    if rows:
        for row in rows:
            data = filter_utf8(row[1]['data'])  # 去除一些其他应用导致的乱码
            row_key = str(row[1]['table']) + "_" + str(row[1]['id'])
            queue.put((row_key, str(data)))


if __name__ == '__main__':
    conn = get_hbase_conn()
    TABLE = os.environ.get('TABLE', 'test')
    canal_api = CanalApi(os.getenv("CANAL_IP"), os.getenv("CANAL_PORT"), os.getenv("MYSQL_HOST"),
                         os.getenv("MYSQL_PORT"),
                         os.getenv("MYSQL_USER"), os.getenv("MYSQL_PASSWORD"),
                         os.getenv("CANAL_SYNC_TABLE_NAME"))
    canal_api.init_client()
    write_queue = Queue.Queue()
    for x in range(4):
        ith_thread = WriteToHbaseThread(write_queue)
        ith_thread.daemon = True
        ith_thread.start()
    while True:
        message = canal_api.get_batch_message()
        process_data_queue(message['rows'], write_queue)
        write_queue.join()

接着,在 config.yml 中配置待同步的信息。

canalServer: "172.31.3.131:11111"
MySQLConf:
  - host: "172.31.3.131"
    port: 3306
    db: "test"
    username: "root"
    password: "password"
rabbitmq:
  username: "root"
  password: "password"
  host: "172.31.3.131"
  virtualhost: "/"
  queue: "my_queue"
  exchange: "my_exchange"
  routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]
hbase:
  host: "172.31.3.131"
  port: 9090

最后,启动 Canal-Client,将数据同步到 HBase 中。

$ cd canal-client
$ python app.py

结论

基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输功能的完整攻略是一项非常实用的工作,我们希望本文介绍的所有示例都能够帮助读者更深入地了解 Canal 和 Docker 的应用,同时也希望读者能够通过本文的指导,掌握 Canal 的使用技能,提升数据处理效率,实现更好的业务价值。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:基于Docker结合Canal实现MySQL实时增量数据传输功能 - Python技术站

(0)
上一篇 2023年5月21日
下一篇 2023年5月21日

相关文章

  • linux下mysql自动备份脚本代码

    下面就为你讲解“Linux下MySQL自动备份脚本代码”的完整攻略。 1. 安装MySQL 在开始备份之前,需要先安装MySQL,这里以CentOS系统为例。 sudo yum install mysql-server sudo service mysqld start sudo chkconfig mysqld on 2. 安装MySQL备份脚本 备份脚本…

    database 2023年5月22日
    00
  • 使用SQL Server 获取插入记录后的ID(自动编号)

    为了获取插入记录后的ID,我们需要使用 SQL Server 中的自增长字段(也称为自动编号)。自增长字段是一个特殊的列,它会自动为每个新的记录分配一个唯一的值,通常用于记录的主键。 下面是获取插入记录后的ID的步骤: 步骤一:创建表 首先,我们需要在数据库中创建一个包含自增长字段的表。 CREATE TABLE [dbo].[customers]( [cu…

    database 2023年5月21日
    00
  • oracle12C安装步骤(图文详解)

    这里是”oracle12C安装步骤(图文详解)”的完整攻略。 1. 下载Oracle 12c安装包 首先,你需要在Oracle官网上下载Oracle 12c的安装包。下载完毕后,解压缩到指定目录。 2. 安装JDK Oracle 12c需要JDK的支持。安装JDK的方法在这里略过,安装前需要确保已经安装了JDK,并且设置了环境变量。 3. 安装Oracle …

    database 2023年5月22日
    00
  • mysql建表常用的sql语句汇总

    下面我将详细讲解“mysql建表常用的sql语句汇总”的完整攻略。 一、创建数据库 在使用mysql建表之前,我们需要根据需求创建一个数据库。创建数据库的sql语句如下: CREATE DATABASE mydb CHARACTER SET utf8 COLLATE utf8_general_ci; 上面的sql语句创建了一个名为mydb的数据库,并设置了数…

    database 2023年5月21日
    00
  • MySQL主从同步中的server-id示例详解

    在MySQL主从同步中,每一个实例都需要有一个独一无二的server-id。server-id是MySQL实例在进行主从同步时,使用的一个重要标识,用于识别不同的MySQL实例,避免数据在传输过程中混淆。 下面是关于MySQL主从同步中的server-id的详细攻略: 什么是server-id server-id是MySQL主从同步中扮演重要角色的标识。每一…

    database 2023年5月22日
    00
  • CentOS7使用rpm包安装mysql 5.7.18

    下面是CentOS7使用rpm包安装MySQL 5.7.18的完整攻略: 1. 下载MySQL rpm包 从官方下载站点(https://dev.mysql.com/downloads/mysql/)下载适用于CentOS7的MySQL rpm包。可以根据自己的需要选择不同的包,比如选择一个基于gcc编译的通用Linux RPM安装包。 下载完成后可将安装包…

    database 2023年5月22日
    00
  • CentOS 7安装MySQL的详细步骤

    下面是CentOS 7安装MySQL的详细步骤: 环境准备 在开始安装MySQL之前,需要对环境进行准备: 确保服务器可以连接到互联网,并具备sudo权限。 确认本地没有安装MySQL或MariaDB,如果有,请先卸载。 安装MySQL 以下是在CentOS 7上安装MySQL的完整步骤: 1. 更新软件包 在安装MySQL之前,应该使用以下命令更新软件包:…

    database 2023年5月22日
    00
  • MySQL执行状态的查看与分析

    下面是关于“MySQL执行状态的查看与分析”的完整攻略。 概述 在MySQL数据库中,为了统计查询中语句的执行效率,可以通过查看和分析SQL执行状态来获取相应的信息。MySQL执行状态是一个可视化的记录工具,可以进行针对SQL语句的实时监控和查看。 MySQL执行状态的查看 查看MySQL执行状态可以使用命令:SHOW STATUS,该命令会列出MySQL服…

    database 2023年5月22日
    00
合作推广
合作推广
分享本页
返回顶部