spring kafka @KafkaListener详解与使用过程

Spring Kafka @KafkaListener详解与使用过程

简介

Spring Kafka 为 Kafka 提供了 Producer 和 Consumer 的封装,提供了方便的API让我们在Spring Boot项目中使用Kafka。其中 @KafkaListener 的注解为我们编写 Kafka Consumer 提供便利。

使用步骤

使用 Spring Kafka @KafkaListener 实现 Kafka Consumer 的步骤如下:

  1. 引入 Maven 依赖

在 pom.xml 文件中添加如下依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.2</version>
</dependency>
  1. 编写 Kafka Consumer

编写一个 Consumer 类,使用 @KafkaListener 注解标注方法,用于监听特定的 topic 。方法中使用 ConsumerRecord 对象来接收消息。

@Component
public class MyConsumer {

    @KafkaListener(topics = "mytopic")
    public void onMessage(ConsumerRecord<String, String> record) {
        // 处理消息
        String message = record.value();
        // do something
    }
}
  1. 配置 Kafka 消费者

配置 Kafka 消费者属性,根据需要设置一些消费者的参数,比如bootstrap.servers, group.id等等。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
  1. 编写Kafka Producer

编写一个 Producer 类,使用 KafkaTemplate 实现消息发送,代码如下:

@Service
public class MyProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
  1. 发送消息
@Service
public class MyService {
    private final MyProducer myProducer;

    public MyService(MyProducer myProducer) {
        this.myProducer = myProducer;
    }

    public void sendMessage(String topic, String message) {
        myProducer.sendMessage(topic, message);
    }
}
  1. 运行应用程序,监听消息

在应用程序启动时,Spring 容器会自动扫描带有 @KafkaListener 注解的方法,并启动相应的消费者线程监听 topic 的消息。消息到达时,onMessage()方法会被回调。

例子

下面简单介绍两个使用 KafkaListener 注解的实例。

实例1

发送消息到指定 topic ,并监听该 topic 的消息

@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @PostMapping("/send/{topic}")
    public String sendMessage(@PathVariable String topic, @RequestParam String message) {
        kafkaTemplate.send(topic, message);
        return "Message sent successfully to topic: " + topic;
    }
}

@Component
public class MyConsumer {
    @KafkaListener(topics = "mytopic")
    public void onMessage(ConsumerRecord<String, String> record) {
        String message = record.value();
        System.out.println("接收到的消息:" + message);
    }
}

当消息发送成功后,Consumer 的 onMessage 方法会监听到消息,输出包含消息内容的日志。

实例2

监听多个 topic,使用groupId来协同处理消息。

@Component
public class MyConsumer {

    @KafkaListener(id = "myGroup", topics = {"topic1", "topic2"})
    public void onMessage(ConsumerRecord<String, String> record) {
        // 处理消息
        String topic = record.topic();
        String message = record.value();
        System.out.println("接收到 " + topic + " 消息:" + message);
    }
}

通过在注解中使用id属性来指定groupId,即可让多个 Consumer 实例共同协作处理消息,实现消息的高可用性。

总结

使用 Spring Kafka 的 @KafkaListener 注解,即可快速编写 Kafka Consumer,实现消息的消费。在实际开发过程中,可根据需求配置 Spring Kafka 的各项参数,以实现对消息的更加细粒度地处理和控制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring kafka @KafkaListener详解与使用过程 - Python技术站

(0)
上一篇 2023年6月2日
下一篇 2023年6月2日

相关文章

  • java批量解析微信dat文件

    下面是“java批量解析微信dat文件”的完整攻略。 背景 如果你用过微信,你就会知道微信的消息保存在.dat文件中。这些文件包含了聊天记录、联系人、群组等等信息。为了方便地查看这些数据,我们可以使用Java编写程序,批量解析这些.dat文件。 准备工作 在编写程序之前,我们需要一些准备工作。首先,我们需要下载微信的.apk文件,并将其解压。然后进入解压后的…

    Java 2023年5月20日
    00
  • 详解SpringBoot项目整合Vue做一个完整的用户注册功能

    我们来详细讲解一下“详解SpringBoot项目整合Vue做一个完整的用户注册功能”。这个攻略分两个部分:服务器端和客户端。我们分别来讲解。 服务器端 1. 创建SpringBoot项目 首先,我们需要在IDE中创建一个SpringBoot项目。可以使用Spring Initializr创建一个简单的Java Web项目,或者自己使用Maven创建。 2. …

    Java 2023年5月20日
    00
  • Java中计算时间差的方法

    当我们使用Java进行开发时,有时需要计算两个时间之间的时间差。在Java中计算时间差可以使用以下常用方式。 1.使用Date类 import java.text.SimpleDateFormat; import java.util.Date; public class TimeDifference { public static void main(Str…

    Java 2023年5月20日
    00
  • 基于Ajax技术实现文件上传带进度条

    实现基于Ajax技术的文件上传带进度条,需要进行以下步骤: 1.引入jQuery和jQuery.form插件 在HTML文件中通过script标签引入jQuery和jQuery.form插件,可以通过CDN地址引入,也可以将文件下载到本地后引入。 示例: <script src="https://cdn.bootcdn.net/ajax/li…

    Java 2023年6月15日
    00
  • java采用中文方式显示时间的方法

    为了让Java程序中以中文方式显示时间,我们可以采用以下两种方法: 使用java.util.Date和java.text.DateFormat 我们可以用java.util.Date类获取当前的日期和时间,并使用java.text.DateFormat类将日期格式化为中文。下面是一个示例: import java.util.Date; import java…

    Java 2023年5月20日
    00
  • Java实现文件的分割与合并

    下面是详细的讲解: 1.需求分析 在很多情况下,我们需要将大文件拆分成多个小文件进行存储或传输。因此需要实现一个文件分割与合并的工具。Java提供的File类可以很好地操作文件,但并不提供文件分割和合并的功能。下面我们就来讲讲如何在Java中实现文件分割与合并。 2.文件分割 文件分割就是将一个大文件切割成若干个小文件,方便存储和传输。Java中实现文件分割…

    Java 2023年5月20日
    00
  • Java操作FreeMarker模板引擎的基本用法示例小结

    要在Java中使用FreeMarker模板引擎进行模板渲染,需要经历以下几个步骤: 引入FreeMarker依赖 在Maven项目中,可以在pom.xml文件中添加以下依赖项: <dependency> <groupId>org.freemarker</groupId> <artifactId>freemark…

    Java 2023年6月15日
    00
  • java对象序列化操作实例分析

    Java对象序列化操作 简介 Java对象序列化是指将Java对象转换为字节流,以便于数据传输、持久化和分布式应用等场景下的使用。其作用是将Java对象序列化为数据流方便在网络间传输或在本地存储,以及反序列化操作使其还原为Java对象。 序列化对象 对于待序列化的Java对象,需要实现 Serializable 接口。以下是一个示例: import java…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部