Canal是一种基于MySQL的数据库增量订阅&消费框架,可用于数据同步、数据监控等应用场景。本篇攻略将详细介绍如何搭建Canal,并使用idea设置及采集数据到kafka的操作方法。
环境准备
在进行Canal搭建之前,请确保以下环境已经准备好:
- Java环境:1.8及以上版本
- MySQL数据库:5.6及以上版本
- ZooKeeper:3.4.x版本
- Kafka:0.10.x版本
- Canal:1.1.x版本
Canal搭建
下载Canal
可以在Canal的官网下载最新版的Canal:https://github.com/alibaba/canal/releases
解压安装Canal
使用以下命令将Canal压缩包解压到指定目录:
tar -zxvf canal.deployer-x.x.x.tar.gz -C /home/canal/
其中,x.x.x表示Canal的版本号,/home/canal/表示解压的目录。
配置Canal
进入Canal的conf目录,修改instance.properties文件,配置MySQL和Kafka的相关信息:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*
canal.instance.kafka.topic=canal_test
canal.instance.kafka.bootstrap.servers=127.0.0.1:9092
- canal.instance.master.address:MySQL的地址和端口号
- canal.instance.dbUsername:MySQL数据库的用户名
- canal.instance.dbPassword:MySQL数据库的密码
- canal.instance.kafka.topic:Canal将会向该Kafka topic发送增量数据
- canal.instance.kafka.bootstrap.servers:Kafka的地址和端口号
启动Canal
进入Canal的bin目录,执行以下命令启动Canal:
sh startup.sh
使用以下命令查看Canal的启动情况:
jps | grep CanalLauncher
如果可以看到类似如下输出,则说明Canal启动成功:
CanalLauncher
验证Canal
使用以下命令可以连接到Canal的管理控制台:
telnet 127.0.0.1 11111
登录成功后,使用show命令可以显示Canal的连接情况,如下所示:
canal> show
Your destination(s) are:
| default |
Current active destination is: default
至此,Canal的搭建工作已经完成。
idea设置及采集数据到kafka
在使用Canal采集数据到kafka之前,需要添加相应的依赖包。
添加Canal依赖包
在idea中创建一个Maven项目,打开pom.xml文件,在dependencies标签中添加如下依赖:
<dependency>
<groupId>com.alibaba.otter.canal</groupId>
<artifactId>client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
使用Canal采集数据到kafka
Canal提供了一个Java客户端,可以实现将MySQL数据采集到kafka中。
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.utils.PropertiesManager;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.net.InetSocketAddress;
import java.util.Properties;
public class CanalKafkaClient {
// Canal服务器地址
private String canalServerAddress;
// Canal服务器端口号
private Integer canalServerPort;
// Canal实例名称
private String canalInstanceName;
// 目标kafka topic名称
private String kafkaTopicName;
// 目标kafka服务器地址
private String kafkaBootstrapServers;
// 目标kafka消息序列化方式
private String kafkaValueSerializer;
// 目标kafka消息字节数据前缀
private String kafkaValuePrefix;
// Canal连接类型(单节点或集群)
private String canalConnectType;
// Canal连接方式对应的组件
private CanalConnector canalConnector;
public CanalKafkaClient(String canalServerAddress, Integer canalServerPort, String canalInstanceName,
String kafkaTopicName, String kafkaBootstrapServers, String kafkaValueSerializer,
String kafkaValuePrefix, String canalConnectType) {
this.canalServerAddress = canalServerAddress;
this.canalServerPort = canalServerPort;
this.canalInstanceName = canalInstanceName;
this.kafkaTopicName = kafkaTopicName;
this.kafkaBootstrapServers = kafkaBootstrapServers;
this.kafkaValueSerializer = kafkaValueSerializer;
this.kafkaValuePrefix = kafkaValuePrefix;
this.canalConnectType = canalConnectType;
}
public void start() {
canalConnector = canalConnectType.equals("cluster") ? new ClusterCanalConnector(canalServerAddress,
canalServerPort, canalInstanceName, null, null) :
new SimpleCanalConnector(new InetSocketAddress(canalServerAddress, canalServerPort),
canalInstanceName, "", "");
while (true) {
try {
canalConnector.connect();
canalConnector.subscribe(".*\\..*");
canalConnector.rollback();
while (true) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
properties.setProperty("value.serializer", kafkaValueSerializer);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
JSONObject jsonObject = new JSONObject();
jsonObject.put("rows", canalConnector.getWithoutAck(1000).getEntries());
jsonObject.put("prefix", StringUtils.isNotBlank(kafkaValuePrefix) ? kafkaValuePrefix : "");
if (jsonObject.getJSONArray("rows").size() > 0) {
producer.send(new ProducerRecord<>(kafkaTopicName, null, jsonObject.toJSONString()));
}
producer.close();
}
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
}
} finally {
canalConnector.disconnect();
}
}
}
public static void main(String[] args) {
String canalServerAddress = AddressUtils.getHostIp();
Integer canalServerPort = 11111;
String canalInstanceName = "example";
String kafkaTopicName = "canal_test";
String kafkaBootstrapServers = "127.0.0.1:9092";
String kafkaValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
String kafkaValuePrefix = "c_";
String canalConnectType = "single";
CanalKafkaClient canalKafkaClient = new CanalKafkaClient(canalServerAddress, canalServerPort, canalInstanceName,
kafkaTopicName, kafkaBootstrapServers, kafkaValueSerializer, kafkaValuePrefix, canalConnectType);
canalKafkaClient.start();
}
}
该代码可以将采集到的MySQL增量数据转换成JSON格式,并发送到指定的kafka topic中。
示例说明
下面是一个实际示例,演示如何使用Canal采集数据到kafka。
-
首先启动ZooKeeper和Kafka。
-
下载并解压Canal,并按照上述步骤进行配置和启动。确认Canal已经启动成功。
-
在kafka中创建一个topic,例如test。
-
使用上面提到的CanalKafkaClient将采集到的数据发送到kafka中。
-
启动CanalKafkaClient,即可将MySQL中的数据采集到kafka的test topic中。
-
使用kafka的命令行客户端,可以查看到被采集到的数据。
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Canal搭建 idea设置及采集数据到kafka的操作方法 - Python技术站