Docker部署Kafka以及Spring Kafka实现

下面就是Docker部署Kafka以及Spring Kafka实现的完整攻略:

准备工作

首先,需要安装DockerDocker Compose

然后,创建一个文件夹,名为docker-kafka-spring,用于存放本示例代码和配置文件。

Docker部署Kafka

  1. 在该文件夹下,创建一个名为docker-compose.yml的文件,用于定义所需的Docker容器。
version: '3.8'
services:
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "test-topic:1:1"
    depends_on:
      - zookeeper
  1. 拉取所需的Docker镜像。
docker-compose pull
  1. 启动容器。
docker-compose up -d

现在,Docker中已经启动了ZookeeperKafka容器。

Spring Kafka实现

  1. 添加依赖。

pom.xml文件中添加如下依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
  1. 编写配置。

application.yml文件中添加如下配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  1. 发送消息。

编写代码,在main函数中添加如下代码,向test-topic发送消息:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
    KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
    kafkaTemplate.send("test-topic", "test message");
  }
}
  1. 接收消息。

编写代码,创建一个KafkaListener,接收test-topic中的消息。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "test-topic", groupId = "test-group")
  public void consume(String message) {
    System.out.println("Received message: " + message);
  }
}

现在,使用Maven编译并运行DemoApplication类,可在控制台看到test-topic中的消息被成功消费。

示例

以上是完整的Docker部署Kafka以及Spring Kafka实现的攻略。下面给出两个较为简单的示例。

示例1

  1. 添加依赖。

pom.xml文件中添加如下依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
  1. 编写配置。

application.yml文件中添加如下配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  1. 发送消息。

编写代码,在main函数中添加如下代码,向test-topic发送消息:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
    KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
    kafkaTemplate.send("test-topic", "test message 1");
    kafkaTemplate.send("test-topic", "test message 2");
    kafkaTemplate.send("test-topic", "test message 3");
  }
}
  1. 接收消息。

编写代码,创建一个KafkaListener,接收test-topic中的消息。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "test-topic", groupId = "test-group")
  public void consume(String message) {
    System.out.println("Received message: " + message);
  }
}

现在,使用Maven编译并运行DemoApplication类,可在控制台看到test-topic中的消息被成功消费。

示例2

  1. 添加依赖。

pom.xml文件中添加如下依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.6.7</version>
</dependency>
  1. 编写配置。

application.yml文件中添加如下配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
  1. 发送消息。

编写代码,在main函数中添加如下代码,向test-topic发送消息:

@SpringBootApplication
public class DemoApplication {
  public static void main(String[] args) throws InterruptedException {
    ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
    KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
    Map<String, Object> headers = new HashMap<>();
    headers.put(KafkaHeaders.TOPIC, "test-topic");
    headers.put(KafkaHeaders.MESSAGE_KEY, "test-key");
    headers.put(KafkaHeaders.TIMESTAMP, System.currentTimeMillis());
    kafkaTemplate.send(MessageBuilder
        .createMessage("test message 1", new MessageHeaders(headers)));
    Thread.sleep(2000);
    kafkaTemplate.send(MessageBuilder
        .createMessage("test message 2", new MessageHeaders(headers)));
  }
}
  1. 接收消息。

编写代码,创建一个KafkaListener,接收test-topic中的消息。

@Component
public class KafkaConsumer {
  @KafkaListener(topics = "test-topic", groupId = "test-group")
  public void consume(ConsumerRecord<String, String> record) {
    System.out.println("Received message Key: " + record.key());
    System.out.println("Received message Value: " + record.value());
  }
}

现在,使用Maven编译并运行DemoApplication类,可在控制台看到test-topic中的消息被成功消费。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Docker部署Kafka以及Spring Kafka实现 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • MySQL之JSON类型字段的使用技巧分享

    MySQL之JSON类型字段的使用技巧分享 在MySQL 5.7及以上版本中,除了常见的数据类型之外,还新增了一个JSON类型字段。JSON类型的字段可以存储JSON格式的数据,对于存储半结构化数据非常方便。本文将详细讲解JSON类型字段的使用技巧,包括JSON格式、创建、插入、更新、查询等操作。 1. JSON格式的数据 JSON(JavaScript O…

    Java 2023年5月26日
    00
  • spring boot如何基于JWT实现单点登录详解

    这里是关于如何基于JWT实现Spring Boot单点登录的攻略: 什么是JWT JWT(JSON Web Token),是一种用于身份验证的标准。它由三部分组成:Header(头部)、Payload(负载)和Signature(签名)。 Header部分一般用于描述Token的类型和 signature使用的算法,例如: { "alg"…

    Java 2023年5月20日
    00
  • 使用FileReader采用的默认编码

    使用FileReader对象默认采用的编码方式为UTF-8编码。但是,你也可以通过指定readAsText方法的第二个参数,来指定读取文件的编码方式。下面是使用FileReader对象进行文件读取的攻略: 步骤一:创建FileReader对象 在javascript中创建FileReader对象,可以使用下面的代码: var reader = new Fil…

    Java 2023年5月20日
    00
  • JAVA如何按字节截取字符串

    截取一个字符串的一部分可以使用 substring() 方法,但是这种方式只能按照字符的数量来截取。如果需要按照字节截取,可以先将字符串转换为字节数组,然后再截取指定的字节数组部分,最后将这个字节数组转换回字符串。 具体的步骤如下: 将字符串转换为字节数组。 可以使用 getBytes() 方法将字符串转换为字节数组,例如: java String str …

    Java 2023年5月27日
    00
  • Java Apache Commons报错“IOException”的原因与解决方法

    当使用Java的Apache Commons类库时,可能会遇到“IOException”错误。这个错误通常由以下原因之一起: I/O操作失败:如果I/O操作失败,则可能会出现此错误。在这种情况下,需要检查I/O操作以决此问题。 文件或目录不存在:如果文件或目录不存在,则可能会出现此错误。在这种情况下,需要确保文件或目录存在。 以下是两个实例: 例1 如果I/…

    Java 2023年5月5日
    00
  • 没有杯子的世界:OOP设计思想的应用实践

    最近看到一个有趣的问题:Person类具有Hand,Hand可以操作杯子Cup,但是在石器时代是没有杯子的,这个问题用编程怎么解决? 简单代码实现 我们先用简单代码实现原问题: @Data public class Person { private final String name; private Hand hand = new Hand(); priv…

    Java 2023年4月22日
    00
  • Spring项目运行依赖spring-contex解析

    Spring框架是个非常流行的Java开发框架,它通过使用依赖注入和面向切面编程等技术来简化Java开发过程。在Spring框架中,spring-context模块是一个非常重要的模块,它提供了一些关键的功能,如依赖注入、AOP和Java EE集成等。在本文中,我们将提供一份完整攻略,从基础到深入,让你了解Spring项目在运行中依赖spring-conte…

    Java 2023年5月20日
    00
  • Springboot工具类ReflectionUtils使用教程

    下面我将详细讲解“Springboot工具类ReflectionUtils使用教程”。 Springboot工具类ReflectionUtils使用教程 简介 在Java开发中,我们有时需要使用反射来获取或修改某些对象的属性或方法,而这个过程其实是比较繁琐的。Spring框架提供了一个工具类ReflectionUtils,能够方便地使用反射来快速获取或修改对…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部