spring boot 与kafka集成的示例代码

下面就给您讲解Spring Boot与Kafka集成的示例代码攻略。

1. 引入依赖

首先,在pom.xml文件中添加Kafka相关的依赖:

<!--kafka-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.3.1.RELEASE</version>
</dependency>

2. 添加Kafka配置

application.properties文件中添加Kafka的配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

3. 发送消息

在代码中发送消息,创建MessageProducer.java

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition());
            }
        });
    }

}

4. 接收消息

创建MessageConsumer.java

@Service
public class MessageConsumer {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("收到消息:" + consumer.value());
    }

}

注解@KafkaListener可以自动绑定消息消费者。

这样,在启动项目之后,可以通过MessageProducer发送消息到kafka,并通过MessageConsumer接收消息。

这里再给您提供一个完整的示例代码,可以仔细研读:

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group
spring.kafka.consumer.topic=test

MessageProducer.java

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition());
            }
        });
    }

}

MessageConsumer.java

@Service
public class MessageConsumer {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("收到消息:" + consumer.value());
    }

}

KafkaDemoApplication.java

@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(KafkaDemoApplication.class);

    @Autowired
    private MessageProducer messageProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        messageProducer.sendMessage("test", "Hello, Kafka!");
    }

}

这是一个简单的例子,您可以根据自己的需求进行修改和扩展。

另外,您还可以参考第二个示例:

1. 引入依赖

<!--kafka-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.3.1.RELEASE</version>
</dependency>

2. 添加Kafka配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test_group1
      auto-offset-reset: earliest
      enable-auto-commit: true
    producer:
      retries: 3
      batch-size: 4096
      buffer-memory: 409600
      compression-type: gzip

3. 发送消息

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送消息失败:{}", ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送消息成功:{}-{}-{}", result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }
        });
    }

}

4. 接收消息

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            log.info("消费消息:{}-{}-{}-{}", record.topic(), record.partition(), record.offset(), record.value());
            acknowledgment.acknowledge();
        } catch (Exception ex) {
            log.error("消费消息失败:{}", ex.getMessage());
        }
    }

}

这个示例比较完整,包含了配置文件、生产者、消费者,可以作为参考。

希望这份攻略可以对您有所帮助,亲身体验一下Spring Boot与Kafka的集成,一定会让您有更深的理解。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring boot 与kafka集成的示例代码 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Maven引用自定义jar包方式

    以下是使用 Maven 引用自定义 JAR 包的完整攻略: 1. 使用项目本地依赖库 如果你的 JAR 包已经是 Maven 项目,可以使用 Maven 提供的本地依赖库功能。在项目中,将 JAR 包命名为 <artifactId>-<version>.jar,并放在项目的 /lib 目录下,这样 Maven 就会将其加入本地依赖库中…

    Java 2023年5月19日
    00
  • Java Struts图片上传至指定文件夹并显示图片功能

    下面是详细讲解Java Struts图片上传至指定文件夹并显示图片功能的完整攻略: 1. 概述 本文将介绍如何在Java Struts框架下实现图片上传至指定文件夹并显示图片的功能。在实现过程中,我们将使用commons-fileupload和commons-io等第三方库来实现图片上传,通过Struts的Action来处理上传请求,并将上传的图片保存至指定…

    Java 2023年5月20日
    00
  • java基础的详细了解第三天

    Java基础的详细了解第三天 欢迎来到Java基础的详细了解第三天。今天我们将深入了解Java的循环结构、数组、面向对象编程等知识点。 1. 循环结构 Java提供了三种循环结构:while循环、do-while循环和for循环。其中while循环和do-while循环是条件循环,而for循环则是计数循环。以下是它们的基本语法: // while循环 whi…

    Java 2023年5月20日
    00
  • Java中进程与线程的区别

    Java中进程与线程的区别 在Java中,进程(Process)和线程(Thread)都是常见的概念。虽然它们的功能类似,但它们之间存在明显的不同。了解它们的区别对我们正确地设计和编写多线程程序非常重要。 进程和线程的定义 进程是操作系统操作的基本单位,它是程序执行时的一个实例。它拥有自己的内存空间、系统资源和进程上下文等。每个进程都有一个或多个线程,线程是…

    Java 2023年5月19日
    00
  • 解析在Tomcat中启用虚拟线程特性

    解析在Tomcat中启用虚拟线程特性的完整攻略 什么是虚拟线程? 虚拟线程是一种优化Java Web服务器性能的一种技术,虚拟线程的实现不完全依赖于物理线程,而是通过线程池去模拟实现,这样就可以比物理线程更灵活的、更充分的利用服务器的资源,提高性能。 启用Tomcat虚拟线程特性 要启用Tomcat的虚拟线程特性,需要遵循以下步骤: 步骤1:修改server…

    Java 2023年5月19日
    00
  • win10怎么安装java?win10系统安装java的方法

    下面是安装 Java 的完整攻略。 准备工作 在开始安装 Java 之前,需要确认你的计算机上尚未安装 Java 环境,可以通过以下方式检查: 打开命令行窗口,输入命令 java -version,如果显示类似 “java version 1.8.0_241” 的信息,则表示已安装 Java 环境; 如果没有显示版本信息,或提示未找到 java 命令,则需要…

    Java 2023年5月24日
    00
  • Java ArrayList中存放引用数据类型的方式

    Java的ArrayList是一种动态数组类型,它可以存储引用数据类型,即存储对象的引用。下面是Java ArrayList存放引用数据类型的方式的完整攻略。 1. 创建一个ArrayList 使用Java中的ArrayList类来创建一个ArrayList,代码如下: ArrayList<Object> arrayList = new Arra…

    Java 2023年5月26日
    00
  • JavaSpringBoot报错“PreconditionFailedException”的原因和处理方法

    原因 “PreconditionFailedException” 错误通常是以下原因引起的: 请求头问题:如果请求头中包含不受支持的条件,则可能会出现此错误。在这种情况下,需要检查请求头并确保它们正确。 控制器问题:如果控制器中存在问题,则可能会出现此错误。在这种情况下,需要检查控制器并确保它们正确。 解决办法 以下是解决 “PreconditionFail…

    Java 2023年5月4日
    00
合作推广
合作推广
分享本页
返回顶部