Canal搭建 idea设置及采集数据到kafka的操作方法

Canal是一种基于MySQL的数据库增量订阅&消费框架,可用于数据同步、数据监控等应用场景。本篇攻略将详细介绍如何搭建Canal,并使用idea设置及采集数据到kafka的操作方法。

环境准备

在进行Canal搭建之前,请确保以下环境已经准备好:

  • Java环境:1.8及以上版本
  • MySQL数据库:5.6及以上版本
  • ZooKeeper:3.4.x版本
  • Kafka:0.10.x版本
  • Canal:1.1.x版本

Canal搭建

下载Canal

可以在Canal的官网下载最新版的Canal:https://github.com/alibaba/canal/releases

解压安装Canal

使用以下命令将Canal压缩包解压到指定目录:

tar -zxvf canal.deployer-x.x.x.tar.gz -C /home/canal/

其中,x.x.x表示Canal的版本号,/home/canal/表示解压的目录。

配置Canal

进入Canal的conf目录,修改instance.properties文件,配置MySQL和Kafka的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*
canal.instance.kafka.topic=canal_test
canal.instance.kafka.bootstrap.servers=127.0.0.1:9092
  • canal.instance.master.address:MySQL的地址和端口号
  • canal.instance.dbUsername:MySQL数据库的用户名
  • canal.instance.dbPassword:MySQL数据库的密码
  • canal.instance.kafka.topic:Canal将会向该Kafka topic发送增量数据
  • canal.instance.kafka.bootstrap.servers:Kafka的地址和端口号

启动Canal

进入Canal的bin目录,执行以下命令启动Canal:

sh startup.sh

使用以下命令查看Canal的启动情况:

jps | grep CanalLauncher

如果可以看到类似如下输出,则说明Canal启动成功:

CanalLauncher

验证Canal

使用以下命令可以连接到Canal的管理控制台:

telnet 127.0.0.1 11111

登录成功后,使用show命令可以显示Canal的连接情况,如下所示:

canal> show
Your destination(s) are:
| default |
Current active destination is: default

至此,Canal的搭建工作已经完成。

idea设置及采集数据到kafka

在使用Canal采集数据到kafka之前,需要添加相应的依赖包。

添加Canal依赖包

在idea中创建一个Maven项目,打开pom.xml文件,在dependencies标签中添加如下依赖:

<dependency>
    <groupId>com.alibaba.otter.canal</groupId>
    <artifactId>client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

使用Canal采集数据到kafka

Canal提供了一个Java客户端,可以实现将MySQL数据采集到kafka中。

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.utils.PropertiesManager;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.net.InetSocketAddress;
import java.util.Properties;

public class CanalKafkaClient {
    // Canal服务器地址
    private String canalServerAddress;
    // Canal服务器端口号
    private Integer canalServerPort;
    // Canal实例名称
    private String canalInstanceName;
    // 目标kafka topic名称
    private String kafkaTopicName;
    // 目标kafka服务器地址
    private String kafkaBootstrapServers;
    // 目标kafka消息序列化方式
    private String kafkaValueSerializer;
    // 目标kafka消息字节数据前缀
    private String kafkaValuePrefix;
    // Canal连接类型(单节点或集群)
    private String canalConnectType;
    // Canal连接方式对应的组件
    private CanalConnector canalConnector;

    public CanalKafkaClient(String canalServerAddress, Integer canalServerPort, String canalInstanceName,
                            String kafkaTopicName, String kafkaBootstrapServers, String kafkaValueSerializer,
                            String kafkaValuePrefix, String canalConnectType) {
        this.canalServerAddress = canalServerAddress;
        this.canalServerPort = canalServerPort;
        this.canalInstanceName = canalInstanceName;
        this.kafkaTopicName = kafkaTopicName;
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.kafkaValueSerializer = kafkaValueSerializer;
        this.kafkaValuePrefix = kafkaValuePrefix;
        this.canalConnectType = canalConnectType;
    }

    public void start() {
        canalConnector = canalConnectType.equals("cluster") ? new ClusterCanalConnector(canalServerAddress,
                canalServerPort, canalInstanceName, null, null) :
                new SimpleCanalConnector(new InetSocketAddress(canalServerAddress, canalServerPort),
                        canalInstanceName, "", "");
        while (true) {
            try {
                canalConnector.connect();
                canalConnector.subscribe(".*\\..*");
                canalConnector.rollback();
                while (true) {
                    Properties properties = new Properties();
                    properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
                    properties.setProperty("value.serializer", kafkaValueSerializer);
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("rows", canalConnector.getWithoutAck(1000).getEntries());
                    jsonObject.put("prefix", StringUtils.isNotBlank(kafkaValuePrefix) ? kafkaValuePrefix : "");
                    if (jsonObject.getJSONArray("rows").size() > 0) {
                        producer.send(new ProducerRecord<>(kafkaTopicName, null, jsonObject.toJSONString()));
                    }
                    producer.close();
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                }
            } finally {
                canalConnector.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        String canalServerAddress = AddressUtils.getHostIp();
        Integer canalServerPort = 11111;
        String canalInstanceName = "example";
        String kafkaTopicName = "canal_test";
        String kafkaBootstrapServers = "127.0.0.1:9092";
        String kafkaValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        String kafkaValuePrefix = "c_";
        String canalConnectType = "single";
        CanalKafkaClient canalKafkaClient = new CanalKafkaClient(canalServerAddress, canalServerPort, canalInstanceName,
                kafkaTopicName, kafkaBootstrapServers, kafkaValueSerializer, kafkaValuePrefix, canalConnectType);
        canalKafkaClient.start();
    }
}

该代码可以将采集到的MySQL增量数据转换成JSON格式,并发送到指定的kafka topic中。

示例说明

下面是一个实际示例,演示如何使用Canal采集数据到kafka。

  1. 首先启动ZooKeeper和Kafka。

  2. 下载并解压Canal,并按照上述步骤进行配置和启动。确认Canal已经启动成功。

  3. 在kafka中创建一个topic,例如test。

  4. 使用上面提到的CanalKafkaClient将采集到的数据发送到kafka中。

  5. 启动CanalKafkaClient,即可将MySQL中的数据采集到kafka的test topic中。

  6. 使用kafka的命令行客户端,可以查看到被采集到的数据。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Canal搭建 idea设置及采集数据到kafka的操作方法 - Python技术站

(0)
上一篇 2023年6月2日
下一篇 2023年6月2日

相关文章

  • 使用JDBC在MySQL数据库中如何快速批量插入数据

    使用JDBC在MySQL数据库中进行批量插入数据可以大大提高数据插入的效率。以下是详细步骤: 1.导入MySQL JDBC驱动 首先需要在Java项目中导入MySQL JDBC驱动包,这里以MySQL 8为例,可以从以下链接中下载:https://dev.mysql.com/downloads/connector/j/ 2.创建JDBC连接 使用JDBC连接…

    Java 2023年6月16日
    00
  • Java 常见的几种内存溢出异常的原因及解决

    Java 常见的几种内存溢出异常的原因及解决 简介 Java 是一门内存管理的语言,它自带了垃圾回收器能够自动地清理无用对象以释放内存空间。但是,在一些特定情况下(如长时间运行、大量对象创建等),Java 应用程序可能会出现内存溢出的异常,导致程序崩溃。这篇文章将会讲解 Java 中常见的几种内存溢出异常的原因及解决方法。 原因及解决方法 堆溢出 堆是 Ja…

    Java 2023年5月28日
    00
  • 深入解析kafka 架构原理

    当我们使用 Apache Kafka 作为消息中心时,需要了解它的架构原理,以便更好地在应用程序中使用它。 Kafka 架构 Kafka 是一个分布式发布订阅消息系统。它的主要组件包括: Broker – 处理传入和传出的消息并维护消息的存储 Topic – 是发布和订阅消息的名称 Partition – 一个主题可能被分成多个分区。每个分区都是一个有序的,…

    Java 2023年6月2日
    00
  • Java 关于时间复杂度和空间复杂度的深度刨析

    Java 关于时间复杂度和空间复杂度的深度刨析 时间复杂度和空间复杂度是算法中非常重要的概念,它们可以帮助我们衡量算法的效率。本文将对它们进行深度探讨,并用实例进行说明。 时间复杂度 时间复杂度是指算法执行所需要的时间,通常使用O(n)表示,其中n是输入数据的规模。常见的时间复杂度有: 常数时间复杂度:O(1),无论输入数据量的大小,算法的执行时间都保持不变…

    Java 2023年5月26日
    00
  • layui+jquery支持IE8的表格分页方法

    下面为您详细讲解“layui+jquery支持IE8的表格分页方法”的完整攻略。 简介 Layui是一款轻量级的前端UI框架,兼容性良好,但官方仅支持IE10以上版本。有时候我们需要兼容低版本的IE浏览器,本文就是讲解使用layui+jquery实现支持IE8的表格分页方法。 准备工作 引入Layui和jquery库; 定义表格结构,设置表头等; 引入分页组…

    Java 2023年6月15日
    00
  • 最好的Java 反编译工具的使用对比分析

    最好的Java 反编译工具的使用对比分析 背景 Java 程序开发与运行过程中,难免会遇到需要对已有的 .class 文件进行反编译的情况。这时候,选择一款好用的反编译工具就显得至关重要。本文将介绍目前市面上较为知名的Java 反编译工具并进行对比分析,以帮助读者在实际工作中作出合理的选择。 Java 反编译工具 JD-GUI JD-GUI 是一款免费的Ja…

    Java 2023年5月26日
    00
  • JSP 开发之servlet中调用注入spring管理的dao

    下面是关于 JSP 开发中在 Servlet 中调用注入 Spring 管理的 DAO 的完整攻略: 1. Maven 依赖 首先,在 pom.xml 文件中添加以下依赖: <!– Spring Framework –> <dependency> <groupId>org.springframework</gro…

    Java 2023年6月16日
    00
  • Java(基于Struts2) 分页实现代码

    下面就为您详细讲解“Java(基于Struts2) 分页实现代码”的完整攻略。 一、实现原理 Struts2框架提供了一个简单易用的分页标签库(pagetags),通过这个标签库可以非常方便地实现分页功能。具体实现流程如下: 在JSP页面上引用struts2分页标签库的tld文件。 <%@ taglib uri=”/struts-tags” prefi…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部