spring kafka @KafkaListener详解与使用过程

Spring Kafka @KafkaListener详解与使用过程

简介

Spring Kafka 为 Kafka 提供了 Producer 和 Consumer 的封装,提供了方便的API让我们在Spring Boot项目中使用Kafka。其中 @KafkaListener 的注解为我们编写 Kafka Consumer 提供便利。

使用步骤

使用 Spring Kafka @KafkaListener 实现 Kafka Consumer 的步骤如下:

  1. 引入 Maven 依赖

在 pom.xml 文件中添加如下依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.2</version>
</dependency>
  1. 编写 Kafka Consumer

编写一个 Consumer 类,使用 @KafkaListener 注解标注方法,用于监听特定的 topic 。方法中使用 ConsumerRecord 对象来接收消息。

@Component
public class MyConsumer {

    @KafkaListener(topics = "mytopic")
    public void onMessage(ConsumerRecord<String, String> record) {
        // 处理消息
        String message = record.value();
        // do something
    }
}
  1. 配置 Kafka 消费者

配置 Kafka 消费者属性,根据需要设置一些消费者的参数,比如bootstrap.servers, group.id等等。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
  1. 编写Kafka Producer

编写一个 Producer 类,使用 KafkaTemplate 实现消息发送,代码如下:

@Service
public class MyProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
  1. 发送消息
@Service
public class MyService {
    private final MyProducer myProducer;

    public MyService(MyProducer myProducer) {
        this.myProducer = myProducer;
    }

    public void sendMessage(String topic, String message) {
        myProducer.sendMessage(topic, message);
    }
}
  1. 运行应用程序,监听消息

在应用程序启动时,Spring 容器会自动扫描带有 @KafkaListener 注解的方法,并启动相应的消费者线程监听 topic 的消息。消息到达时,onMessage()方法会被回调。

例子

下面简单介绍两个使用 KafkaListener 注解的实例。

实例1

发送消息到指定 topic ,并监听该 topic 的消息

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @PostMapping("/send/{topic}")
    public String sendMessage(@PathVariable String topic, @RequestParam String message) {
        kafkaTemplate.send(topic, message);
        return "Message sent successfully to topic: " + topic;
    }
}

@Component
public class MyConsumer {
    @KafkaListener(topics = "mytopic")
    public void onMessage(ConsumerRecord<String, String> record) {
        String message = record.value();
        System.out.println("接收到的消息:" + message);
    }
}

当消息发送成功后,Consumer 的 onMessage 方法会监听到消息,输出包含消息内容的日志。

实例2

监听多个 topic,使用groupId来协同处理消息。

@Component
public class MyConsumer {

    @KafkaListener(id = "myGroup", topics = {"topic1", "topic2"})
    public void onMessage(ConsumerRecord<String, String> record) {
        // 处理消息
        String topic = record.topic();
        String message = record.value();
        System.out.println("接收到 " + topic + " 消息:" + message);
    }
}

通过在注解中使用id属性来指定groupId,即可让多个 Consumer 实例共同协作处理消息,实现消息的高可用性。

总结

使用 Spring Kafka 的 @KafkaListener 注解,即可快速编写 Kafka Consumer,实现消息的消费。在实际开发过程中,可根据需求配置 Spring Kafka 的各项参数,以实现对消息的更加细粒度地处理和控制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring kafka @KafkaListener详解与使用过程 - Python技术站

(0)
上一篇 2023年6月2日
下一篇 2023年6月2日

相关文章

  • 详解SpringBoot通过restTemplate实现消费服务

    在Spring Boot中,我们可以使用RestTemplate来消费RESTful服务。RestTemplate是Spring框架提供的一个HTTP客户端,它可以发送HTTP请求并接收HTTP响应。在本攻略中,我们将详细介绍如何使用RestTemplate来消费服务,并提供两个示例来说明其用法。 以下是两个示例,介绍如何使用RestTemplate来消费服…

    Java 2023年5月15日
    00
  • springboot ehcache 配置使用方法代码详解

    来讲一下“springboot ehcache 配置使用方法代码详解”的完整攻略。 一、什么是Ehcache? Ehcache是一种开源的Java分布式缓存框架,可以在进程内或进程间缓存任意类型的对象,具有内存缓存、磁盘缓存、持久化缓存等多种缓存策略。 二、Spring Boot中配置Ehcache 1.添加Maven依赖 在Spring Boot项目中使用…

    Java 2023年5月20日
    00
  • 理解Java面向对象编程设计

    理解Java面向对象编程设计的完整攻略 1. 理解什么是面向对象编程设计 面向对象编程设计(Object-Oriented Programming,OOP)是一种软件开发范式,基于对象的概念进行编程。其重点在于数据和行为的封装,通过封装来降低耦合度。面向对象的语言中,一切皆为对象。通过对象的封装、继承、多态等特性,编写出高效、灵活、可维护的程序。 2. Ja…

    Java 2023年5月30日
    00
  • 打卡每日10道面试题——JVM篇

    打卡每日10道面试题——JVM篇攻略 简介 本打卡活动旨在通过每天解答10道JVM面试题来加深JVM的理解和应用,提高应聘者面试成功率。本文将为大家提供一个完整的JVM打卡攻略,包括学习路线、注意点和解答示例等。 学习路线 第一阶段:JVM基础知识学习 在这个阶段,你需要学习JVM的基本概念和原理,掌握Java类的加载、链接和初始化过程,了解JVM的内存模型…

    Java 2023年5月20日
    00
  • 重入锁的作用是什么?

    重入锁是一种高级锁,也叫可重入锁或递归锁。它允许线程如同拥有某个资源而不被其他线程所interrupt而阻塞。重入锁为控制多个线程互斥访问共享资源提供了更加高级的功能,相较于传统的synchronized锁,它具有更高的并发性和更强的扩展性。 为了更好的说明重入锁的作用,我们需要先理解重入锁的几个特性: 可重入性:线程可以再次获取已经持有的锁。 公平/非公平…

    Java 2023年5月10日
    00
  • Java StackOverflowError详解

    Java StackOverflowError详解 什么是StackOverflowError? StackOverflowError是在Java虚拟机内存不足时抛出的错误之一,通常是由于方法调用栈溢出而引起的。当我们递归调用一个方法时,每次调用都会将方法运行时需要的一些数据压入调用栈中,包括方法参数、局部变量以及返回地址等,当调用栈已经满了而仍需要入栈时就…

    Java 2023年5月27日
    00
  • 详解Java8 CompletableFuture的并行处理用法

    详解Java8 CompletableFuture的并行处理用法 前言 CompletableFuture 是 Java 8 中新增的一个非常强大的异步编程工具。它提供了非常完善的异步编程配套方案,让 Java 开发人员能够在不使用传统的回调编程方式的前提下,编写出高效、可读、可维护的异步代码。 CompletableFuture 的强大体现在它不仅仅支持异…

    Java 2023年5月19日
    00
  • Spring Data JPA的Audit功能审计数据库的变更

    下面我来详细讲解Spring Data JPA的Audit功能审计数据库的变更的完整攻略。 什么是Spring Data JPA的Audit功能 Spring Data JPA的Audit功能是指将数据的变更操作记录下来,包括数据的新增、删除、修改等操作。通过Audit功能,我们可以了解数据的变更历史,并且可以追溯数据的操作者、操作时间等信息。 如何使用Sp…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部