Canal搭建 idea设置及采集数据到kafka的操作方法

Canal是一种基于MySQL的数据库增量订阅&消费框架,可用于数据同步、数据监控等应用场景。本篇攻略将详细介绍如何搭建Canal,并使用idea设置及采集数据到kafka的操作方法。

环境准备

在进行Canal搭建之前,请确保以下环境已经准备好:

  • Java环境:1.8及以上版本
  • MySQL数据库:5.6及以上版本
  • ZooKeeper:3.4.x版本
  • Kafka:0.10.x版本
  • Canal:1.1.x版本

Canal搭建

下载Canal

可以在Canal的官网下载最新版的Canal:https://github.com/alibaba/canal/releases

解压安装Canal

使用以下命令将Canal压缩包解压到指定目录:

tar -zxvf canal.deployer-x.x.x.tar.gz -C /home/canal/

其中,x.x.x表示Canal的版本号,/home/canal/表示解压的目录。

配置Canal

进入Canal的conf目录,修改instance.properties文件,配置MySQL和Kafka的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*
canal.instance.kafka.topic=canal_test
canal.instance.kafka.bootstrap.servers=127.0.0.1:9092
  • canal.instance.master.address:MySQL的地址和端口号
  • canal.instance.dbUsername:MySQL数据库的用户名
  • canal.instance.dbPassword:MySQL数据库的密码
  • canal.instance.kafka.topic:Canal将会向该Kafka topic发送增量数据
  • canal.instance.kafka.bootstrap.servers:Kafka的地址和端口号

启动Canal

进入Canal的bin目录,执行以下命令启动Canal:

sh startup.sh

使用以下命令查看Canal的启动情况:

jps | grep CanalLauncher

如果可以看到类似如下输出,则说明Canal启动成功:

CanalLauncher

验证Canal

使用以下命令可以连接到Canal的管理控制台:

telnet 127.0.0.1 11111

登录成功后,使用show命令可以显示Canal的连接情况,如下所示:

canal> show
Your destination(s) are:
| default |
Current active destination is: default

至此,Canal的搭建工作已经完成。

idea设置及采集数据到kafka

在使用Canal采集数据到kafka之前,需要添加相应的依赖包。

添加Canal依赖包

在idea中创建一个Maven项目,打开pom.xml文件,在dependencies标签中添加如下依赖:

<dependency>
    <groupId>com.alibaba.otter.canal</groupId>
    <artifactId>client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

使用Canal采集数据到kafka

Canal提供了一个Java客户端,可以实现将MySQL数据采集到kafka中。

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.utils.PropertiesManager;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.net.InetSocketAddress;
import java.util.Properties;

public class CanalKafkaClient {
    // Canal服务器地址
    private String canalServerAddress;
    // Canal服务器端口号
    private Integer canalServerPort;
    // Canal实例名称
    private String canalInstanceName;
    // 目标kafka topic名称
    private String kafkaTopicName;
    // 目标kafka服务器地址
    private String kafkaBootstrapServers;
    // 目标kafka消息序列化方式
    private String kafkaValueSerializer;
    // 目标kafka消息字节数据前缀
    private String kafkaValuePrefix;
    // Canal连接类型(单节点或集群)
    private String canalConnectType;
    // Canal连接方式对应的组件
    private CanalConnector canalConnector;

    public CanalKafkaClient(String canalServerAddress, Integer canalServerPort, String canalInstanceName,
                            String kafkaTopicName, String kafkaBootstrapServers, String kafkaValueSerializer,
                            String kafkaValuePrefix, String canalConnectType) {
        this.canalServerAddress = canalServerAddress;
        this.canalServerPort = canalServerPort;
        this.canalInstanceName = canalInstanceName;
        this.kafkaTopicName = kafkaTopicName;
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.kafkaValueSerializer = kafkaValueSerializer;
        this.kafkaValuePrefix = kafkaValuePrefix;
        this.canalConnectType = canalConnectType;
    }

    public void start() {
        canalConnector = canalConnectType.equals("cluster") ? new ClusterCanalConnector(canalServerAddress,
                canalServerPort, canalInstanceName, null, null) :
                new SimpleCanalConnector(new InetSocketAddress(canalServerAddress, canalServerPort),
                        canalInstanceName, "", "");
        while (true) {
            try {
                canalConnector.connect();
                canalConnector.subscribe(".*\\..*");
                canalConnector.rollback();
                while (true) {
                    Properties properties = new Properties();
                    properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
                    properties.setProperty("value.serializer", kafkaValueSerializer);
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("rows", canalConnector.getWithoutAck(1000).getEntries());
                    jsonObject.put("prefix", StringUtils.isNotBlank(kafkaValuePrefix) ? kafkaValuePrefix : "");
                    if (jsonObject.getJSONArray("rows").size() > 0) {
                        producer.send(new ProducerRecord<>(kafkaTopicName, null, jsonObject.toJSONString()));
                    }
                    producer.close();
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                }
            } finally {
                canalConnector.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        String canalServerAddress = AddressUtils.getHostIp();
        Integer canalServerPort = 11111;
        String canalInstanceName = "example";
        String kafkaTopicName = "canal_test";
        String kafkaBootstrapServers = "127.0.0.1:9092";
        String kafkaValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        String kafkaValuePrefix = "c_";
        String canalConnectType = "single";
        CanalKafkaClient canalKafkaClient = new CanalKafkaClient(canalServerAddress, canalServerPort, canalInstanceName,
                kafkaTopicName, kafkaBootstrapServers, kafkaValueSerializer, kafkaValuePrefix, canalConnectType);
        canalKafkaClient.start();
    }
}

该代码可以将采集到的MySQL增量数据转换成JSON格式,并发送到指定的kafka topic中。

示例说明

下面是一个实际示例,演示如何使用Canal采集数据到kafka。

  1. 首先启动ZooKeeper和Kafka。

  2. 下载并解压Canal,并按照上述步骤进行配置和启动。确认Canal已经启动成功。

  3. 在kafka中创建一个topic,例如test。

  4. 使用上面提到的CanalKafkaClient将采集到的数据发送到kafka中。

  5. 启动CanalKafkaClient,即可将MySQL中的数据采集到kafka的test topic中。

  6. 使用kafka的命令行客户端,可以查看到被采集到的数据。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Canal搭建 idea设置及采集数据到kafka的操作方法 - Python技术站

(0)
上一篇 2023年6月2日
下一篇 2023年6月2日

相关文章

  • springboot整合security和vue的实践

    下面我将详细讲解“springboot整合security和vue的实践”的完整攻略。 准备工作 首先我们需要准备好以下工具和软件: Java JDK 1.8 或以上版本 Maven 3.0 或以上版本 Vue CLI Node.js 创建Spring Boot项目 在intelliJ IDEA中创建一个新的Spring Boot项目 在pom.xml中添加…

    Java 2023年5月19日
    00
  • java开发就业信息管理系统

    Java开发就业信息管理系统攻略 1. 确认需求和功能 在开发Java开发就业信息管理系统之前,需要明确系统的需求和功能,例如: 用户管理:包括用户注册、用户登录、用户信息管理等; 招聘信息管理:包括发布招聘信息、浏览招聘信息、投递简历等; 简历管理:包括填写个人简历、上传附件等; 等等。 2. 构建数据库 根据系统的需求和功能,设计相应的数据库结构,包括多…

    Java 2023年5月30日
    00
  • Android异常 java.lang.IllegalStateException解决方法

    下面是详细讲解”Android异常java.lang.IllegalStateException解决方法”的攻略。 1. 异常介绍 IllegalStateException是Java中一个类型为RuntimeException的异常,这是一个运行时异常,它表示当前的状态或操作是非法或不与对象状态相一致。 在Android应用程序中,这个异常通常与生命周期方…

    Java 2023年5月27日
    00
  • tomcat自定义Web部署文件中docBase和workDir的区别介绍

    当我们将Web应用部署到Tomcat服务器上时,可以在Tomcat配置文件中自定义Web应用。在Tomcat配置文件中,有两个重要的属性:docBase和workDir。这两个属性在Tomcat上非常重要,因为它们决定了Web应用的部署位置和缓存位置。 docBase属性 docBase属性指定了Web应用的根目录。Tomcat会在docBase路径下查找W…

    Java 2023年6月15日
    00
  • java 二维数组矩阵乘法的实现方法

    Java二维数组矩阵的乘法实现 矩阵的乘法是一种重要的运算,它是许多计算机程序中的基本操作之一。在Java中,我们可以使用二维数组来表示矩阵,并通过循环来实现矩阵的乘法运算。 矩阵乘法的基本原理 假设我们有两个矩阵A和B: A = [a11 a12 a13] [a21 a22 a23] B = [b11 b12] [b21 b22] [b31 b32] 这里…

    Java 2023年5月26日
    00
  • 解决json字符串序列化后的顺序问题

    关于“解决json字符串序列化后的顺序问题”的问题,我们可以通过以下方法来解决: 方法一:使用有序字典(OrderedDict)进行序列化 在Python的json库中,有序字典(OrderedDict)可以帮助我们保持json字符串序列化后的顺序。在使用json.dumps()方法进行序列化时,我们可以传入参数sort_keys=False,并在json.…

    Java 2023年5月26日
    00
  • java从字符串中提取数字的简单实例

    关于“java从字符串中提取数字的简单实例”,以下是完整攻略: 1. 前言 在开发Java应用程序时,经常需要从字符串中提取数字,这种情况很常见。本文将介绍使用Java代码如何从字符串中提取数字。 2. 通过正则表达式实现 使用正则表达式可以很方便地从字符串中提取数字。代码示例1如下: import java.util.regex.Matcher; impo…

    Java 2023年5月26日
    00
  • Spring jcl及spring core源码深度解析

    首先,我们需要理解“Spring JCL”和“Spring Core”这两个概念。 “Spring JCL”是Spring框架中的一个日志抽象框架,它提供了简单的接口以及灵活的配置方式,使得开发者可以用不同的日志框架进行日志的操作与管理。Spring JCL本身并不提供具体的实现,而是通过SLF4J、Commons Logging、Log4j 2等其他日志框…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部