Docker部署Kafka以及Spring Kafka实现

下面就是Docker部署Kafka以及Spring Kafka实现的完整攻略:

准备工作

首先,需要安装DockerDocker Compose

然后,创建一个文件夹,名为docker-kafka-spring,用于存放本示例代码和配置文件。

Docker部署Kafka

  1. 在该文件夹下,创建一个名为docker-compose.yml的文件,用于定义所需的Docker容器。
version: '3.8'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test-topic:1:1"
    depends_on:
      - zookeeper
  1. 拉取所需的Docker镜像。
docker-compose pull
  1. 启动容器。
docker-compose up -d

现在,Docker中已经启动了ZookeeperKafka容器。

Spring Kafka实现

  1. 添加依赖。

pom.xml文件中添加如下依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
  1. 编写配置。

application.yml文件中添加如下配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  1. 发送消息。

编写代码,在main函数中添加如下代码,向test-topic发送消息:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
    KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
    kafkaTemplate.send("test-topic", "test message");
  }
}
  1. 接收消息。

编写代码,创建一个KafkaListener,接收test-topic中的消息。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "test-topic", groupId = "test-group")
  public void consume(String message) {
    System.out.println("Received message: " + message);
  }
}

现在,使用Maven编译并运行DemoApplication类,可在控制台看到test-topic中的消息被成功消费。

示例

以上是完整的Docker部署Kafka以及Spring Kafka实现的攻略。下面给出两个较为简单的示例。

示例1

  1. 添加依赖。

pom.xml文件中添加如下依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
  1. 编写配置。

application.yml文件中添加如下配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  1. 发送消息。

编写代码,在main函数中添加如下代码,向test-topic发送消息:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
    KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
    kafkaTemplate.send("test-topic", "test message 1");
    kafkaTemplate.send("test-topic", "test message 2");
    kafkaTemplate.send("test-topic", "test message 3");
  }
}
  1. 接收消息。

编写代码,创建一个KafkaListener,接收test-topic中的消息。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "test-topic", groupId = "test-group")
  public void consume(String message) {
    System.out.println("Received message: " + message);
  }
}

现在,使用Maven编译并运行DemoApplication类,可在控制台看到test-topic中的消息被成功消费。

示例2

  1. 添加依赖。

pom.xml文件中添加如下依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
  1. 编写配置。

application.yml文件中添加如下配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  1. 发送消息。

编写代码,在main函数中添加如下代码,向test-topic发送消息:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) throws InterruptedException {
    ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
    KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
    Map<String, Object> headers = new HashMap<>();
    headers.put(KafkaHeaders.TOPIC, "test-topic");
    headers.put(KafkaHeaders.MESSAGE_KEY, "test-key");
    headers.put(KafkaHeaders.TIMESTAMP, System.currentTimeMillis());
    kafkaTemplate.send(MessageBuilder
        .createMessage("test message 1", new MessageHeaders(headers)));
    Thread.sleep(2000);
    kafkaTemplate.send(MessageBuilder
        .createMessage("test message 2", new MessageHeaders(headers)));
  }
}
  1. 接收消息。

编写代码,创建一个KafkaListener,接收test-topic中的消息。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "test-topic", groupId = "test-group")
  public void consume(ConsumerRecord<String, String> record) {
    System.out.println("Received message Key: " + record.key());
    System.out.println("Received message Value: " + record.value());
  }
}

现在,使用Maven编译并运行DemoApplication类,可在控制台看到test-topic中的消息被成功消费。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Docker部署Kafka以及Spring Kafka实现 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • JS注释所产生的bug 即使注释也会执行

    JS注释所产生的bug是指在一些情况下,即使代码中存在注释,这些注释也会被执行而导致程序出现问题。 该问题主要是因为在一些JS引擎中,被注释的代码可能在编译阶段和解析阶段都会被执行,因此如果注释中包含了有效的代码,则这些代码会被直接执行。这就引起了一定的安全隐患,也可能导致代码出现逻辑错误。 下面通过两个示例来说明该问题: 示例一: function tes…

    Java 2023年6月15日
    00
  • IDEA中创建maven项目引入相关依赖无法下载jar问题及解决方案

    下面是详细讲解“IDEA中创建maven项目引入相关依赖无法下载jar问题及解决方案”的完整攻略。 问题描述 在使用IntelliJ IDEA创建Maven项目时,通过编辑POM.XML文件引入相关依赖,但是发现IDEA无法下载所需的JAR包,导致项目无法编译运行。 可能原因 上述依赖库不存在。 依赖库被墙了。 IDEA配置问题。 解决方案 方案一:更改本地…

    Java 2023年5月19日
    00
  • Windows下Java+MyBatis框架+MySQL的开发环境搭建教程

    让我们来详细讲解一下“Windows下Java+MyBatis框架+MySQL的开发环境搭建教程”。 环境要求 在开始搭建之前,确保已经安装以下软件:1. JDK2. MySQL数据库3. Maven4. IDEA或Eclipse开发工具 步骤一:安装MySQL数据库 在官网上下载MySQL数据库的安装包,并根据提示进行安装。 步骤二:安装JDK 在官网上下…

    Java 2023年5月20日
    00
  • Java计算两个时间段的差的实例详解

    Java计算两个时间段的差的实例详解 在Java中,有时需要计算两个时间段之间的差值。例如,我们可能需要计算两个日期之间相差的天数、小时数、分钟数、秒数等等。 计算两个日期相差的天数 计算两个日期相差的天数可以通过以下步骤实现: 使用java.util.Calendar类获取两个日期所对应的Calendar对象。 使用java.util.Calendar类的…

    Java 2023年5月20日
    00
  • JSON字符串转换JSONObject和JSONArray的方法

    JSON字符串转换为JSONObject或JSONArray是前端开发中常用的操作,以下是使用JavaScript实现JSON字符串转换为JSONObject和JSONArray的方法: 1. JSON字符串转换为JSONObject 使用JSON.parse()方法把字符串转换成JSON对象。例如: let jsonString = ‘{"nam…

    Java 2023年5月26日
    00
  • Java日常练习题,每天进步一点点(3)

    让我来详细讲解“Java日常练习题,每天进步一点点(3)”的完整攻略。 1. 理解练习题的目的和基本要求 练习题的目的是帮助Java初学者提高编程能力,掌握常用的语法和数据结构。基本要求是: 按顺序完成每一个练习; 尽可能自己编写代码,不要复制粘贴; 根据题目要求输出正确的结果; 动手实践,理解代码背后的逻辑思维。 2. 学习Java的基础知识 在进行练习之…

    Java 2023年6月15日
    00
  • Spring Boot异步线程间数据传递的四种方式

    下面让我来详细解释一下Spring Boot异步线程间数据传递的四种方式。 1. 使用CompletableFuture CompletableFuture是Java8中推出的异步编程API,可以很好的处理异步任务,同时也提供了一些方法来实现线程间的数据传递。 使用CompletableFuture来传递数据,主要有以下两个方法: CompletableFu…

    Java 2023年5月26日
    00
  • 深入理解java异常处理机制的原理和开发应用

    深入理解Java异常处理机制的原理和开发应用攻略 前言 在Java编程中,异常处理是非常重要的一部分。我们知道,Java异常处理机制是通过try-catch语句块来实现的,但是,try-catch的实现原理是什么呢?我们应该如何在实际开发中更好的利用异常处理机制呢?下面我们将详细讲解Java异常处理机制的原理和开发应用的攻略。 Java异常处理机制的原理 J…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部