基于Docker结合Canal实现MySQL实时增量数据传输功能

背景介绍

Docker 是一种容器化技术,具有可移植性、扩展性和可靠性等优点,能够帮助开发者更方便地构建、发布和运行应用程序。而 Canal 是阿里巴巴开源的一套 MySQL 数据库增量订阅和消费组件,它通过解析 MySQL 的 binlog,实时将增量数据同步到 RabbitMQ、Kafka 和 Canal Server 等中间件中。在本文中,我们将分享如何基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输功能的完整攻略。

步骤说明

第一步:安装 Docker 和 Docker-Compose

在安装 Docker 和 Docker-Compose 前先确认是否已经安装了 Git。安装完 Git 后,则可以利用 Git 下载 Docker 和 Docker-Compose 两个安装包,其中 Docker 是构建和发布容器的工具,Docker-Compose 是 Docker 的服务编排工具。

第二步:创建 Dockerfile

在创建 Dockerfile 时,需指明该镜像是基于官方 MySQL 镜像构建的,安装 Cana 环境的基础镜像,以及从服务器端将私有仓库镜像拉取到本地所需的环境等。

FROM mysql:5.7
MAINTAINER Canal-Server <Canal@example.com>

ADD ./sources.list /etc/apt/

RUN apt-get update && apt-get install -y supervisor && apt-get clean && rm -rf /var/lib/apt/lists/*

RUN mkdir -p /usr/local/service

COPY ./mysql.ini /usr/local/service/
COPY ./supervisor.conf /etc/supervisor/conf.d/
ADD ./cana-bin.tar.gz /usr/local/service/

CMD ["/usr/bin/supervisord","-n"]

第三步:编写 Docker-Compose 文件

使用 Docker-Compose 实现在容器内部启动 Canal-Server、MySQL 和 Canal-Client。Docker-Compose 使用 YAML 文件编写配置信息,将 Canai-Server、MySQL 和 Canal-Client 打包成三个服务。其中,Canal-Server 和 MySQL 服务在后台运行,而 Canai-Client 则需要获取待同步的表信息,再进行增量数据同步。

version: '3.4'
services:
  canal:
    container_name: canal
    build: .
    restart: always
    ports:
      - "11111:11111"
      - "11112:11112"
      - "3306:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql
      - ./canal-server:/usr/local/service/
    depends_on:
      - mysql
    links:
      - mysql
    command: bash -c "cd /usr/local/service/bin && start.sh"

  mysql:
    image: mysql:5.7
    container_name: mysql
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: pwd
      TZ: Asia/Shanghai
    ports:
      - "3306:3306"
    volumes:
      - ./mysql-data:/var/lib/mysql
      - ./mysql-conf:/etc/mysql/conf.d
    command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci

  canal_client:
    container_name: canal_client
    build: ./canal-client
    working_dir: /usr/src/app
    command: python app.py
    environment:
      - PYTHONUNBUFFERED=0
      - DATA_PORT=11112
      - CANAL_IP=canal
      - CANAL_PORT=11111
      - MYSQL_HOST=mysql
      - MYSQL_PORT=3306
      - MYSQL_USER=root
      - MYSQL_PASSWORD=pwd
    volumes:
      - ./canal-client:/usr/src/app
    depends_on:
      - mysql
      - canal

第四步:创建 Canal-Client 和 MySQL 实例

通过 Canal-Client 将 MySQL 数据库表中的数据实时同步到 Canal RabbitMQ 中,并在 RabbitMQ 中处理数据,以便进行任意消费。

#!/usr/bin/python
import os

from src.clientCanal.canalApi import CanalApi
from src.clientCanal.format.format import filter_uncode
from src.clientCanal.extra.connect_rabbitmq import enqueue_rabbitmq

if __name__ == '__main__':
    canal_api = CanalApi(os.getenv("CANAL_IP"), os.getenv("CANAL_PORT"), os.getenv("MYSQL_HOST"),
                         os.getenv("MYSQL_PORT"),
                         os.getenv("MYSQL_USER"), os.getenv("MYSQL_PASSWORD"), os.getenv("DATA_PORT"))
    canal_api.init_client()

    while True:
        message = canal_api.get_batch_message()
        records = filter_uncode(message['data'])
        for record in records:
            if 'before' in record:
                record.pop('before')
            enqueue_rabbitmq(record, os.getenv("ENQUEUE_QUEUE"))

第五步:从 Canal Server 拉取增量数据

在我们实现数据同步之前,需要对 Canal-Server 进行数据同步处理。可以通过调用 Canal 官方提供的 API 接口获取待同步表的名称,从而创建 Canal 基本信息,并调用 Canal 官方提供的命令,拉取增量数据。在 config.yml 文件中配置待同步的表名称、RabbitMQ 地址和 RabbitMQ 用户名密码等信息。

canalServer: "172.31.3.131:11111"
MySQLConf:
  - host: "172.31.3.131"
    port: 3306
    db: "test"
    username: "root"
    password: "password"
rabbitmq:
  username: "root"
  password: "password"
  host: "172.31.3.131"
  virtualhost: "/"
  queue: "my_queue"
  exchange: "my_exchange"
  routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]

第六步:查看同步数据

最后,我们在 RabbitMQ 上查看待同步的数据是否成功发送到该中间件中。

示例说明一

在本例中,我们将演示如何通过 Canai-Server 和 RabbitMQ 实现两个不同 MySQL 数据表之间的实时数据同步。首先,下载 Canai-Server 镜像和 RabbitMQ 镜像,并通过 Docker-Compose 启动容器。

$ mkdir mysql-data
$ sudo chmod 777 mysql-data
$ docker-compose up -d

接着,编辑 config.yml 文件,指定待同步的 MySQL 数据表名称、RabbitMQ 配置信息,并将 config.yml 文件和 Canal 数据库 jar 包挂载到 Canal-Server 容器内。

canalServer: "172.31.3.131:11111"
MySQLConf:
  - host: "172.31.3.131"
    port: 3306
    db: "test"
    username: "root"
    password: "password"
rabbitmq:
  username: "root"
  password: "password"
  host: "172.31.3.131"
  virtualhost: "/"
  queue: "my_queue"
  exchange: "my_exchange"
  routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]

最后,在 Canal-Client 目录下启动数据同步服务。

$ cd canal-client
$ python app.py

示例说明二

我们将演示如何通过 Canal 完成 MySQL 数据库和 HBase 之间的实时增量数据传输。首先,启动 MySQL、Canal 和 HBase 三个服务。

$mkdir mysql-data
$sudo chmod 777 mysql-data
$ docker-compose up -d

接着,通过 Canal-Client 拉取 MySQL 数据表的增量数据,并调用 HBase 数据接口,将这些增量数据保存到 HBase 中。由于 HBase 的写入速度很慢,因此在 Canal-Client 中使用线程池实现并发写入。

#!/usr/bin/env python
# coding: utf-8
import os
import threading
import Queue
from src.clientCanal.canalApi import CanalApi
from src.clientCanal.format.format import filter_utf8
from src.clientCanal.extra.connect_hbase import get_hbase_conn, set_table_value, bulk_set_table_values

TABLE = ''
HBASE_FAMILY = 'info'


class WriteToHbaseThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            row_key, data = self.queue.get()
            set_table_value(TABLE, row_key, data, HBASE_FAMILY)
            self.queue.task_done()


def process_data_queue(rows, queue):
    if rows:
        for row in rows:
            data = filter_utf8(row[1]['data'])  # 去除一些其他应用导致的乱码
            row_key = str(row[1]['table']) + "_" + str(row[1]['id'])
            queue.put((row_key, str(data)))


if __name__ == '__main__':
    conn = get_hbase_conn()
    TABLE = os.environ.get('TABLE', 'test')
    canal_api = CanalApi(os.getenv("CANAL_IP"), os.getenv("CANAL_PORT"), os.getenv("MYSQL_HOST"),
                         os.getenv("MYSQL_PORT"),
                         os.getenv("MYSQL_USER"), os.getenv("MYSQL_PASSWORD"),
                         os.getenv("CANAL_SYNC_TABLE_NAME"))
    canal_api.init_client()
    write_queue = Queue.Queue()
    for x in range(4):
        ith_thread = WriteToHbaseThread(write_queue)
        ith_thread.daemon = True
        ith_thread.start()
    while True:
        message = canal_api.get_batch_message()
        process_data_queue(message['rows'], write_queue)
        write_queue.join()

接着,在 config.yml 中配置待同步的信息。

canalServer: "172.31.3.131:11111"
MySQLConf:
  - host: "172.31.3.131"
    port: 3306
    db: "test"
    username: "root"
    password: "password"
rabbitmq:
  username: "root"
  password: "password"
  host: "172.31.3.131"
  virtualhost: "/"
  queue: "my_queue"
  exchange: "my_exchange"
  routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]
hbase:
  host: "172.31.3.131"
  port: 9090

最后,启动 Canal-Client,将数据同步到 HBase 中。

$ cd canal-client
$ python app.py

结论

基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输功能的完整攻略是一项非常实用的工作,我们希望本文介绍的所有示例都能够帮助读者更深入地了解 Canal 和 Docker 的应用,同时也希望读者能够通过本文的指导,掌握 Canal 的使用技能,提升数据处理效率,实现更好的业务价值。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:基于Docker结合Canal实现MySQL实时增量数据传输功能 - Python技术站

(0)
上一篇 2023年5月21日
下一篇 2023年5月21日

相关文章

  • Mybatis注解方式操作Oracle数据库详解

    Mybatis注解方式操作Oracle数据库详解 1. 前言 Mybatis 是一种优秀的持久化框架,可以使数据访问层代码变得简洁而有扩展性。在Mybatis中,有两种配置方式(xml映射和注解映射),其中注解配置方式简单,直观,快速,特别适合针对简单的语句进行编程。 本文主要介绍在Oracle数据库中使用 Mybatis 注解方式的详细步骤。 2. 添加依…

    database 2023年5月21日
    00
  • MongoDB正则表达式使用方法全攻略

    MongoDB正则表达式概述 正则表达式是用来匹配字符串的一种方式。在 MongoDB 中,正则表达式可以用来做字符串的匹配查询。 在 MongoDB 中,正则表达式的语法跟 Javascript 中的正则表达式语法基本相同,它们都是采用斜杠(/)包围正则表达式模式,并用可选的标记来修饰模式。 下面是 MongoDB 正则表达式的语法: /pattern/m…

    MongoDB 2023年3月14日
    00
  • Mysql合并结果接横向拼接字段的实现步骤

    实现Mysql合并结果接横向拼接字段需要使用到Mysql的联接查询和GROUP_CONCAT函数,具体步骤如下: 1.使用联接查询将需要合并的表联接起来,联接的条件为两个表中的一列或多列数据相同。 SELECT A.id, A.name, B.age FROM tableA A JOIN tableB B ON A.id = B.id; 以上示例中,假设ta…

    database 2023年5月22日
    00
  • Mysql常用命令 详细整理版

    MySQL是一款常见的关系型数据库管理系统,非常适合用于构建应用程序和Web网站。了解MySQL的基本命令非常重要,可以方便地管理和维护数据库。 登录MySQL 在终端或命令行中输入以下命令以登录MySQL: mysql -u username -p 其中,username是你的MySQL用户名,输入密码后即可进入MySQL。 创建数据库 使用以下命令创建一…

    database 2023年5月21日
    00
  • MySQL性能优化是什么,如何定位效率低下的SQL?

    MySQL性能优化是通过调整数据库的配置参数、SQL语句的优化以及硬件部署的优化等多方面综合提高MySQL数据库的性能,从而更好地支持应用程序的工作。MySQL性能的优化包含了很多方面,下面将从定位效率低下的SQL入手,深入探讨如何实现MySQL性能优化。 定位效率低下的SQL 使用explain命令分析SQL语句的执行计划 explain命令是MySQL自…

    MySQL 2023年3月10日
    00
  • SpringMVC与Mybatis集合实现调用存储过程、事务控制实例

    下面详细讲解SpringMVC与Mybatis集合实现调用存储过程、事务控制实例的攻略。 前置知识 在进行本次攻略前,请确保您已经掌握了以下内容: SpringMVC框架的基础知识 Mybatis框架的基础知识 存储过程的基础知识 事务控制的基础知识 实现步骤 接下来,我们来详细讲解如何实现SpringMVC与Mybatis集合实现调用存储过程、事务控制。 …

    database 2023年5月21日
    00
  • Mysql锁机制之行锁、表锁、死锁的实现

    Mysql锁机制是保证数据库并发访问的重要手段,它包括行锁和表锁两种形式,同时也存在死锁的情况。下面我们来一一讲解。 行锁 行锁指的是针对数据库表中的行,对其进行锁定。行锁机制的粒度很细,能够互不影响的锁定多个行。MySQL行级锁使用的是innodb引擎。 在MySQL中,行锁存在一种“共享锁”和“排它锁”的两种形式。 共享锁 共享锁是针对行级别的读加锁,多…

    database 2023年5月22日
    00
  • 如何使用Python查询某个列中的唯一值?

    以下是如何使用Python查询某个列中的唯一值的完整使用攻略。 步骤1:导入模块 在Python中,我们需要导入相应的模块来连接数据库和查询操作。以下是导入mysql-connector-python模块的基本语法: import mysql.connector 以下是导入psycopg2模块的基本语法: import psycopg2 步骤2:连接数据库 …

    python 2023年5月12日
    00
合作推广
合作推广
分享本页
返回顶部