Canal搭建 idea设置及采集数据到kafka的操作方法

Canal是一种基于MySQL的数据库增量订阅&消费框架,可用于数据同步、数据监控等应用场景。本篇攻略将详细介绍如何搭建Canal,并使用idea设置及采集数据到kafka的操作方法。

环境准备

在进行Canal搭建之前,请确保以下环境已经准备好:

  • Java环境:1.8及以上版本
  • MySQL数据库:5.6及以上版本
  • ZooKeeper:3.4.x版本
  • Kafka:0.10.x版本
  • Canal:1.1.x版本

Canal搭建

下载Canal

可以在Canal的官网下载最新版的Canal:https://github.com/alibaba/canal/releases

解压安装Canal

使用以下命令将Canal压缩包解压到指定目录:

tar -zxvf canal.deployer-x.x.x.tar.gz -C /home/canal/

其中,x.x.x表示Canal的版本号,/home/canal/表示解压的目录。

配置Canal

进入Canal的conf目录,修改instance.properties文件,配置MySQL和Kafka的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*
canal.instance.kafka.topic=canal_test
canal.instance.kafka.bootstrap.servers=127.0.0.1:9092
  • canal.instance.master.address:MySQL的地址和端口号
  • canal.instance.dbUsername:MySQL数据库的用户名
  • canal.instance.dbPassword:MySQL数据库的密码
  • canal.instance.kafka.topic:Canal将会向该Kafka topic发送增量数据
  • canal.instance.kafka.bootstrap.servers:Kafka的地址和端口号

启动Canal

进入Canal的bin目录,执行以下命令启动Canal:

sh startup.sh

使用以下命令查看Canal的启动情况:

jps | grep CanalLauncher

如果可以看到类似如下输出,则说明Canal启动成功:

CanalLauncher

验证Canal

使用以下命令可以连接到Canal的管理控制台:

telnet 127.0.0.1 11111

登录成功后,使用show命令可以显示Canal的连接情况,如下所示:

canal> show
Your destination(s) are:
| default |
Current active destination is: default

至此,Canal的搭建工作已经完成。

idea设置及采集数据到kafka

在使用Canal采集数据到kafka之前,需要添加相应的依赖包。

添加Canal依赖包

在idea中创建一个Maven项目,打开pom.xml文件,在dependencies标签中添加如下依赖:

<dependency>
    <groupId>com.alibaba.otter.canal</groupId>
    <artifactId>client</artifactId>
    <version>1.1.5</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

使用Canal采集数据到kafka

Canal提供了一个Java客户端,可以实现将MySQL数据采集到kafka中。

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.impl.ClusterCanalConnector;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.common.utils.PropertiesManager;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.net.InetSocketAddress;
import java.util.Properties;

public class CanalKafkaClient {
    // Canal服务器地址
    private String canalServerAddress;
    // Canal服务器端口号
    private Integer canalServerPort;
    // Canal实例名称
    private String canalInstanceName;
    // 目标kafka topic名称
    private String kafkaTopicName;
    // 目标kafka服务器地址
    private String kafkaBootstrapServers;
    // 目标kafka消息序列化方式
    private String kafkaValueSerializer;
    // 目标kafka消息字节数据前缀
    private String kafkaValuePrefix;
    // Canal连接类型(单节点或集群)
    private String canalConnectType;
    // Canal连接方式对应的组件
    private CanalConnector canalConnector;

    public CanalKafkaClient(String canalServerAddress, Integer canalServerPort, String canalInstanceName,
                            String kafkaTopicName, String kafkaBootstrapServers, String kafkaValueSerializer,
                            String kafkaValuePrefix, String canalConnectType) {
        this.canalServerAddress = canalServerAddress;
        this.canalServerPort = canalServerPort;
        this.canalInstanceName = canalInstanceName;
        this.kafkaTopicName = kafkaTopicName;
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.kafkaValueSerializer = kafkaValueSerializer;
        this.kafkaValuePrefix = kafkaValuePrefix;
        this.canalConnectType = canalConnectType;
    }

    public void start() {
        canalConnector = canalConnectType.equals("cluster") ? new ClusterCanalConnector(canalServerAddress,
                canalServerPort, canalInstanceName, null, null) :
                new SimpleCanalConnector(new InetSocketAddress(canalServerAddress, canalServerPort),
                        canalInstanceName, "", "");
        while (true) {
            try {
                canalConnector.connect();
                canalConnector.subscribe(".*\\..*");
                canalConnector.rollback();
                while (true) {
                    Properties properties = new Properties();
                    properties.setProperty("bootstrap.servers", kafkaBootstrapServers);
                    properties.setProperty("value.serializer", kafkaValueSerializer);
                    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
                    JSONObject jsonObject = new JSONObject();
                    jsonObject.put("rows", canalConnector.getWithoutAck(1000).getEntries());
                    jsonObject.put("prefix", StringUtils.isNotBlank(kafkaValuePrefix) ? kafkaValuePrefix : "");
                    if (jsonObject.getJSONArray("rows").size() > 0) {
                        producer.send(new ProducerRecord<>(kafkaTopicName, null, jsonObject.toJSONString()));
                    }
                    producer.close();
                }
            } catch (Exception e) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                }
            } finally {
                canalConnector.disconnect();
            }
        }
    }

    public static void main(String[] args) {
        String canalServerAddress = AddressUtils.getHostIp();
        Integer canalServerPort = 11111;
        String canalInstanceName = "example";
        String kafkaTopicName = "canal_test";
        String kafkaBootstrapServers = "127.0.0.1:9092";
        String kafkaValueSerializer = "org.apache.kafka.common.serialization.StringSerializer";
        String kafkaValuePrefix = "c_";
        String canalConnectType = "single";
        CanalKafkaClient canalKafkaClient = new CanalKafkaClient(canalServerAddress, canalServerPort, canalInstanceName,
                kafkaTopicName, kafkaBootstrapServers, kafkaValueSerializer, kafkaValuePrefix, canalConnectType);
        canalKafkaClient.start();
    }
}

该代码可以将采集到的MySQL增量数据转换成JSON格式,并发送到指定的kafka topic中。

示例说明

下面是一个实际示例,演示如何使用Canal采集数据到kafka。

  1. 首先启动ZooKeeper和Kafka。

  2. 下载并解压Canal,并按照上述步骤进行配置和启动。确认Canal已经启动成功。

  3. 在kafka中创建一个topic,例如test。

  4. 使用上面提到的CanalKafkaClient将采集到的数据发送到kafka中。

  5. 启动CanalKafkaClient,即可将MySQL中的数据采集到kafka的test topic中。

  6. 使用kafka的命令行客户端,可以查看到被采集到的数据。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Canal搭建 idea设置及采集数据到kafka的操作方法 - Python技术站

(0)
上一篇 2023年6月2日
下一篇 2023年6月2日

相关文章

  • jsp hibernate的分页代码第2/3页

    针对“jsp hibernate的分页代码第2/3页”,我将提供以下完整攻略: JSP Hibernate 分页代码攻略 什么是 JSP 分页? JSP 分页是指将大量数据分页展示在网页上,每页展示固定的数据量并提供用户进行翻页查看的方式,以便更好地展示数据和提升用户体验。 基于 Hibernate 的 JSP 分页 Hibernate 是一款开源的 ORM…

    Java 2023年5月31日
    00
  • 基于spring mvc请求controller访问方式

    基于Spring MVC请求Controller访问方式的完整攻略 Spring MVC是一种基于Java的Web框架,它可以帮助我们快速开发Web应用程序。在Spring MVC中,我们可以使用Controller来处理请求,并返回响应结果。本文将介绍如何使用Spring MVC请求Controller访问方式,并提供两个示例说明。 步骤一:创建Contr…

    Java 2023年5月17日
    00
  • Java基础之常用的命令行指令

    Java基础之常用的命令行指令 在使用Java开发中,经常需要在命令行中执行一些操作,例如编译、运行Java程序等。下面是常用的命令行指令及其作用。 javac javac是Java编译器,可以将Java源代码编译成Java字节码文件(.class文件)。使用方法如下: javac HelloWorld.java 上述指令将会编译HelloWorld.jav…

    Java 2023年5月19日
    00
  • Spring Boot 日志配置方法(超详细)

    Spring Boot日志配置方法(超详细) Spring Boot是一个非常流行的Java开发框架,它提供了多种日志框架,包括Logback、Log4j2、Java Util Logging等。本文将详细介绍Spring Boot日志配置方法,包括配置文件、注解、代码等。 1. 配置文件 Spring Boot的日志配置文件是application.pro…

    Java 2023年5月14日
    00
  • 详解SpringBoot如何开启异步编程

    详解SpringBoot如何开启异步编程 在SpringBoot中,开启异步编程可以大大提高应用程序的性能,提升用户体验。本文将详细介绍SpringBoot如何实现异步编程。 添加异步编程依赖 要使用异步编程,首先需要在项目的pom.xml文件中添加异步编程相关的依赖。 <!– 使用异步编程 –> <dependency> &lt…

    Java 2023年5月19日
    00
  • SpringBoot打印启动时异常堆栈信息详解

    讲解SpringBoot打印启动时异常堆栈信息的完整攻略,具体步骤如下: 1. 开启Debug模式 在SpringBoot启动类中,添加以下代码: @SpringBootApplication public class DemoApplication { public static void main(String[] args) { // 开启Debug模…

    Java 2023年5月27日
    00
  • Springboot工具类ReflectionUtils使用教程

    下面我将详细讲解“Springboot工具类ReflectionUtils使用教程”。 Springboot工具类ReflectionUtils使用教程 简介 在Java开发中,我们有时需要使用反射来获取或修改某些对象的属性或方法,而这个过程其实是比较繁琐的。Spring框架提供了一个工具类ReflectionUtils,能够方便地使用反射来快速获取或修改对…

    Java 2023年5月19日
    00
  • springmvc+shiro+maven 实现登录认证与权限授权管理

    接下来我将为您详细讲解“springmvc+shiro+maven 实现登录认证与权限授权管理”的完整攻略。 1. 环境准备 首先需要搭建好SpringMVC和Maven的环境,可使用IDEA等开发工具自行创建空白项目。 2. pom.xml配置 为项目引入SpringMVC和Shiro的依赖包,具体如下: <!–SpringMVC依赖包–>…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部