背景介绍
Docker 是一种容器化技术,具有可移植性、扩展性和可靠性等优点,能够帮助开发者更方便地构建、发布和运行应用程序。而 Canal 是阿里巴巴开源的一套 MySQL 数据库增量订阅和消费组件,它通过解析 MySQL 的 binlog,实时将增量数据同步到 RabbitMQ、Kafka 和 Canal Server 等中间件中。在本文中,我们将分享如何基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输功能的完整攻略。
步骤说明
第一步:安装 Docker 和 Docker-Compose
在安装 Docker 和 Docker-Compose 前先确认是否已经安装了 Git。安装完 Git 后,则可以利用 Git 下载 Docker 和 Docker-Compose 两个安装包,其中 Docker 是构建和发布容器的工具,Docker-Compose 是 Docker 的服务编排工具。
第二步:创建 Dockerfile
在创建 Dockerfile 时,需指明该镜像是基于官方 MySQL 镜像构建的,安装 Cana 环境的基础镜像,以及从服务器端将私有仓库镜像拉取到本地所需的环境等。
FROM mysql:5.7
MAINTAINER Canal-Server <Canal@example.com>
ADD ./sources.list /etc/apt/
RUN apt-get update && apt-get install -y supervisor && apt-get clean && rm -rf /var/lib/apt/lists/*
RUN mkdir -p /usr/local/service
COPY ./mysql.ini /usr/local/service/
COPY ./supervisor.conf /etc/supervisor/conf.d/
ADD ./cana-bin.tar.gz /usr/local/service/
CMD ["/usr/bin/supervisord","-n"]
第三步:编写 Docker-Compose 文件
使用 Docker-Compose 实现在容器内部启动 Canal-Server、MySQL 和 Canal-Client。Docker-Compose 使用 YAML 文件编写配置信息,将 Canai-Server、MySQL 和 Canal-Client 打包成三个服务。其中,Canal-Server 和 MySQL 服务在后台运行,而 Canai-Client 则需要获取待同步的表信息,再进行增量数据同步。
version: '3.4'
services:
canal:
container_name: canal
build: .
restart: always
ports:
- "11111:11111"
- "11112:11112"
- "3306:3306"
volumes:
- ./mysql-data:/var/lib/mysql
- ./canal-server:/usr/local/service/
depends_on:
- mysql
links:
- mysql
command: bash -c "cd /usr/local/service/bin && start.sh"
mysql:
image: mysql:5.7
container_name: mysql
restart: always
environment:
MYSQL_ROOT_PASSWORD: pwd
TZ: Asia/Shanghai
ports:
- "3306:3306"
volumes:
- ./mysql-data:/var/lib/mysql
- ./mysql-conf:/etc/mysql/conf.d
command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
canal_client:
container_name: canal_client
build: ./canal-client
working_dir: /usr/src/app
command: python app.py
environment:
- PYTHONUNBUFFERED=0
- DATA_PORT=11112
- CANAL_IP=canal
- CANAL_PORT=11111
- MYSQL_HOST=mysql
- MYSQL_PORT=3306
- MYSQL_USER=root
- MYSQL_PASSWORD=pwd
volumes:
- ./canal-client:/usr/src/app
depends_on:
- mysql
- canal
第四步:创建 Canal-Client 和 MySQL 实例
通过 Canal-Client 将 MySQL 数据库表中的数据实时同步到 Canal RabbitMQ 中,并在 RabbitMQ 中处理数据,以便进行任意消费。
#!/usr/bin/python
import os
from src.clientCanal.canalApi import CanalApi
from src.clientCanal.format.format import filter_uncode
from src.clientCanal.extra.connect_rabbitmq import enqueue_rabbitmq
if __name__ == '__main__':
canal_api = CanalApi(os.getenv("CANAL_IP"), os.getenv("CANAL_PORT"), os.getenv("MYSQL_HOST"),
os.getenv("MYSQL_PORT"),
os.getenv("MYSQL_USER"), os.getenv("MYSQL_PASSWORD"), os.getenv("DATA_PORT"))
canal_api.init_client()
while True:
message = canal_api.get_batch_message()
records = filter_uncode(message['data'])
for record in records:
if 'before' in record:
record.pop('before')
enqueue_rabbitmq(record, os.getenv("ENQUEUE_QUEUE"))
第五步:从 Canal Server 拉取增量数据
在我们实现数据同步之前,需要对 Canal-Server 进行数据同步处理。可以通过调用 Canal 官方提供的 API 接口获取待同步表的名称,从而创建 Canal 基本信息,并调用 Canal 官方提供的命令,拉取增量数据。在 config.yml 文件中配置待同步的表名称、RabbitMQ 地址和 RabbitMQ 用户名密码等信息。
canalServer: "172.31.3.131:11111"
MySQLConf:
- host: "172.31.3.131"
port: 3306
db: "test"
username: "root"
password: "password"
rabbitmq:
username: "root"
password: "password"
host: "172.31.3.131"
virtualhost: "/"
queue: "my_queue"
exchange: "my_exchange"
routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]
第六步:查看同步数据
最后,我们在 RabbitMQ 上查看待同步的数据是否成功发送到该中间件中。
示例说明一
在本例中,我们将演示如何通过 Canai-Server 和 RabbitMQ 实现两个不同 MySQL 数据表之间的实时数据同步。首先,下载 Canai-Server 镜像和 RabbitMQ 镜像,并通过 Docker-Compose 启动容器。
$ mkdir mysql-data
$ sudo chmod 777 mysql-data
$ docker-compose up -d
接着,编辑 config.yml 文件,指定待同步的 MySQL 数据表名称、RabbitMQ 配置信息,并将 config.yml 文件和 Canal 数据库 jar 包挂载到 Canal-Server 容器内。
canalServer: "172.31.3.131:11111"
MySQLConf:
- host: "172.31.3.131"
port: 3306
db: "test"
username: "root"
password: "password"
rabbitmq:
username: "root"
password: "password"
host: "172.31.3.131"
virtualhost: "/"
queue: "my_queue"
exchange: "my_exchange"
routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]
最后,在 Canal-Client 目录下启动数据同步服务。
$ cd canal-client
$ python app.py
示例说明二
我们将演示如何通过 Canal 完成 MySQL 数据库和 HBase 之间的实时增量数据传输。首先,启动 MySQL、Canal 和 HBase 三个服务。
$mkdir mysql-data
$sudo chmod 777 mysql-data
$ docker-compose up -d
接着,通过 Canal-Client 拉取 MySQL 数据表的增量数据,并调用 HBase 数据接口,将这些增量数据保存到 HBase 中。由于 HBase 的写入速度很慢,因此在 Canal-Client 中使用线程池实现并发写入。
#!/usr/bin/env python
# coding: utf-8
import os
import threading
import Queue
from src.clientCanal.canalApi import CanalApi
from src.clientCanal.format.format import filter_utf8
from src.clientCanal.extra.connect_hbase import get_hbase_conn, set_table_value, bulk_set_table_values
TABLE = ''
HBASE_FAMILY = 'info'
class WriteToHbaseThread(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
def run(self):
while True:
row_key, data = self.queue.get()
set_table_value(TABLE, row_key, data, HBASE_FAMILY)
self.queue.task_done()
def process_data_queue(rows, queue):
if rows:
for row in rows:
data = filter_utf8(row[1]['data']) # 去除一些其他应用导致的乱码
row_key = str(row[1]['table']) + "_" + str(row[1]['id'])
queue.put((row_key, str(data)))
if __name__ == '__main__':
conn = get_hbase_conn()
TABLE = os.environ.get('TABLE', 'test')
canal_api = CanalApi(os.getenv("CANAL_IP"), os.getenv("CANAL_PORT"), os.getenv("MYSQL_HOST"),
os.getenv("MYSQL_PORT"),
os.getenv("MYSQL_USER"), os.getenv("MYSQL_PASSWORD"),
os.getenv("CANAL_SYNC_TABLE_NAME"))
canal_api.init_client()
write_queue = Queue.Queue()
for x in range(4):
ith_thread = WriteToHbaseThread(write_queue)
ith_thread.daemon = True
ith_thread.start()
while True:
message = canal_api.get_batch_message()
process_data_queue(message['rows'], write_queue)
write_queue.join()
接着,在 config.yml 中配置待同步的信息。
canalServer: "172.31.3.131:11111"
MySQLConf:
- host: "172.31.3.131"
port: 3306
db: "test"
username: "root"
password: "password"
rabbitmq:
username: "root"
password: "password"
host: "172.31.3.131"
virtualhost: "/"
queue: "my_queue"
exchange: "my_exchange"
routingKey: "my_routingKey"
table: ["test_table1", "test_table2"]
hbase:
host: "172.31.3.131"
port: 9090
最后,启动 Canal-Client,将数据同步到 HBase 中。
$ cd canal-client
$ python app.py
结论
基于 Docker 结合 Canal 实现 MySQL 实时增量数据传输功能的完整攻略是一项非常实用的工作,我们希望本文介绍的所有示例都能够帮助读者更深入地了解 Canal 和 Docker 的应用,同时也希望读者能够通过本文的指导,掌握 Canal 的使用技能,提升数据处理效率,实现更好的业务价值。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:基于Docker结合Canal实现MySQL实时增量数据传输功能 - Python技术站