Springboot集成Kafka进行批量消费及踩坑点

下面我来详细讲解“Springboot集成Kafka进行批量消费及踩坑点”的完整攻略。

一、前言

Kafka是一款分布式消息队列系统,由Apache在2011年引入,其主要包括了生产者、消费者等API,用于实现消息的发送和接收等操作。而Springboot则是目前流行的一种开发框架,它可以简化Java应用的开发过程。本文将探讨如何在Springboot中集成Kafka进行批量消费,同时也会分享一些踩坑的经验。

二、集成Kafka

1. 引入Kafka依赖

pom.xml中引入Kafka依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

2. 添加Kafka配置

application.properties中添加如下Kafka配置:

#Kafka相关配置
kafka.producer.bootstrap-servers=localhost:9092
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.groupId=group-1
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=10

3. 创建Kafka生产者

KafkaProducer中,我们要注入KafkaTemplate:

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) throws Exception {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功," + result.getProducerRecord().topic() + "-" +
                        result.getProducerRecord().key() + ":" + result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败,原因:" + throwable.getMessage());
            }
        });
    }

}

4. 创建Kafka消费者

KafkaConsumer中,我们要通过@KafkaListener注解指定消费的topic和group,同时指定批量拉取消息的数量:

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "group-1", containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, String>> messages) {
        System.out.println("批量消费一次,消息数量:" + messages.size());
        for (ConsumerRecord<String, String> message : messages) {
            System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setBatchListener(true); // 指定批量消费
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }
}

在上述代码中,我们将factory.setBatchListener(true)设置为true,表示启用批量消费。

三、示例

1. 生产消息

Controller中编写生产消息的接口:

@RestController
public class TestController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String send() throws Exception {
        String message = "Hello, Kafka!";
        kafkaProducer.sendMessage("test-topic", "key-" + System.currentTimeMillis(), message);
        return "success";
    }

}

2. 消费消息

启动应用后,访问http://localhost:8080/send接口生产一条消息,观察控制台输出。

四、踩坑点

1. Group ID重复

Kafka中的Group ID是用于标识消费者组的,同一个Group ID只允许存在一个消费者组。如果在多个消费者组中使用了同一个Group ID会导致消费者无法消费消息。因此,Group ID需要在多个应用中保持唯一。

2. Auto-commit被动触发

Kafka中的消费者组有自动提交和手动提交两种方式。在自动提交模式下,Kafka会定时自动提交已消费的消息偏移量,但是由于这个过程是异步的,因此可能会存在消费了但是消息偏移量未提交的情况。因此,对于需要保证消息不被重复消费的场景,建议使用手动提交模式。

3. 手动提交消息偏移量

在手动提交模式下,我们需要在消费消息后手动提交消息的偏移量,以避免重复消费。代码示例如下:

@KafkaListener(topics = "test-topic", groupId = "group-1")
public void listen(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
    acknowledgment.acknowledge(); // 手动提交消息偏移量
}

以上就是我关于“Springboot集成Kafka进行批量消费及踩坑点”的详细讲解,希望对你有所帮助。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot集成Kafka进行批量消费及踩坑点 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Java中的集合框架是什么?

    Java中的集合框架是一个内置的数据结构库,它提供了一组接口和类,用于处理和管理元素的类集合。Java集合框架有助于开发人员以更高效、更灵活和更可扩展的方式处理复杂数据。 Java集合框架中的类和接口被组织成三个主要的继承层次结构:Collection、Map 和 Iterator。其中,Collection是表示一组对象的根接口,Map是表示一组键值对(k…

    Java 2023年4月27日
    00
  • Java利用套接字实现应用程序对数据库的访问

    Java利用套接字实现应用程序对数据库的访问,需要经过以下步骤: 配置数据库信息:在Java应用程序中,我们可以通过配置文件比如Property文件来存储数据库信息,比如数据库名称、用户名、密码、地址、端口等等。 建立连接:使用Java JDBC API中的该库驱动连接数据库。 字段验证:避免SQL注入攻击,对输入的字段进行验证和过滤。 构建SQL语句:使用…

    Java 2023年6月1日
    00
  • Java杂谈之类和对象 封装 构造方法以及代码块详解

    Java杂谈之类和对象 封装 构造方法以及代码块详解 类和对象 Java是面向对象编程的语言,类是Java强大的概念之一。类是一组字段和方法的集合,用于表示某些相关的状态和行为。 在Java中,对象是类的实例。对象是通过类构造函数创建的,类构造函数定义了如何创建对象。按照惯例,类名应该以大写字母开头。 在Java中,类可以有任意数量的方法和成员,这些方法和成…

    Java 2023年5月26日
    00
  • Java基于Calendar类输出指定年份和月份的日历代码实例

    Java基于Calendar类输出指定年份和月份的日历代码实例如下: import java.util.*; public class CalendarExample { public static void main(String[] args) { // 声明并获取Calendar对象 Calendar calendar = Calendar.getIn…

    Java 2023年5月20日
    00
  • Java实现把两个数组合并为一个的方法总结

    针对“Java实现把两个数组合并为一个的方法总结”,我为您提供以下完整攻略。 1. 使用concat方法合并数组 Java提供了一个非常简单的函数concat来合并两个数组。但是,这种方法只适用于元素类型相同的数组。 具体操作步骤: 初始化两个需要合并的数组; 分别使用Arrays类的toString()方法将两个数组转换为字符串形式; 使用Arrays类的…

    Java 2023年5月26日
    00
  • java OOM内存泄漏原因及解决方法

    Java OOM内存泄漏原因及解决方法 前言 Java内存泄漏(Memory Leak)是指程序中已经不再用到的内存,因为某些原因没有被释放,导致这部分内存永远无法被使用,从而引起内存的浪费。内存泄漏会导致系统的性能降低,甚至会导致系统奔溃。下面将详细介绍Java OOM内存泄漏的原因及解决方法。 OOM内存泄漏原因 长生命周期对象持有短生命周期对象的引用 …

    Java 2023年6月15日
    00
  • 两种JAVA实现短网址服务算法

    下面是关于两种JAVA实现短网址服务算法的完整攻略。 一、算法介绍 在实现短网址服务时,通常需要将长URL转换为短字符串来实现,这时需要用到哈希算法。 解决方案一:MD5 MD5是一种广泛使用的哈希算法,它可以将任意长度的消息压缩为一个128位的哈希值。MD5哈希算法不可逆,因此可以很好地用来实现短网址服务。在此方案中,我们需要实现以下步骤: 获取长URL;…

    Java 2023年5月19日
    00
  • jsp实现textarea中的文字保存换行空格存到数据库的方法

    首先,需要使用Java后端编写一个SaveTextServlet,该Servlet接收来自前端页面的POST请求,将textarea中的文字保存到数据库中。 1.前端页面代码示例: <form action="SaveTextServlet" method="post"> <textarea name…

    Java 2023年6月15日
    00
合作推广
合作推广
分享本页
返回顶部