Springboot集成Kafka进行批量消费及踩坑点

下面我来详细讲解“Springboot集成Kafka进行批量消费及踩坑点”的完整攻略。

一、前言

Kafka是一款分布式消息队列系统,由Apache在2011年引入,其主要包括了生产者、消费者等API,用于实现消息的发送和接收等操作。而Springboot则是目前流行的一种开发框架,它可以简化Java应用的开发过程。本文将探讨如何在Springboot中集成Kafka进行批量消费,同时也会分享一些踩坑的经验。

二、集成Kafka

1. 引入Kafka依赖

pom.xml中引入Kafka依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

2. 添加Kafka配置

application.properties中添加如下Kafka配置:

#Kafka相关配置
kafka.producer.bootstrap-servers=localhost:9092
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.groupId=group-1
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=10

3. 创建Kafka生产者

KafkaProducer中,我们要注入KafkaTemplate:

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) throws Exception {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功," + result.getProducerRecord().topic() + "-" +
                        result.getProducerRecord().key() + ":" + result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败,原因:" + throwable.getMessage());
            }
        });
    }

}

4. 创建Kafka消费者

KafkaConsumer中,我们要通过@KafkaListener注解指定消费的topic和group,同时指定批量拉取消息的数量:

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "group-1", containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, String>> messages) {
        System.out.println("批量消费一次,消息数量:" + messages.size());
        for (ConsumerRecord<String, String> message : messages) {
            System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setBatchListener(true); // 指定批量消费
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }
}

在上述代码中,我们将factory.setBatchListener(true)设置为true,表示启用批量消费。

三、示例

1. 生产消息

Controller中编写生产消息的接口:

@RestController
public class TestController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String send() throws Exception {
        String message = "Hello, Kafka!";
        kafkaProducer.sendMessage("test-topic", "key-" + System.currentTimeMillis(), message);
        return "success";
    }

}

2. 消费消息

启动应用后,访问http://localhost:8080/send接口生产一条消息,观察控制台输出。

四、踩坑点

1. Group ID重复

Kafka中的Group ID是用于标识消费者组的,同一个Group ID只允许存在一个消费者组。如果在多个消费者组中使用了同一个Group ID会导致消费者无法消费消息。因此,Group ID需要在多个应用中保持唯一。

2. Auto-commit被动触发

Kafka中的消费者组有自动提交和手动提交两种方式。在自动提交模式下,Kafka会定时自动提交已消费的消息偏移量,但是由于这个过程是异步的,因此可能会存在消费了但是消息偏移量未提交的情况。因此,对于需要保证消息不被重复消费的场景,建议使用手动提交模式。

3. 手动提交消息偏移量

在手动提交模式下,我们需要在消费消息后手动提交消息的偏移量,以避免重复消费。代码示例如下:

@KafkaListener(topics = "test-topic", groupId = "group-1")
public void listen(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
    acknowledgment.acknowledge(); // 手动提交消息偏移量
}

以上就是我关于“Springboot集成Kafka进行批量消费及踩坑点”的详细讲解,希望对你有所帮助。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot集成Kafka进行批量消费及踩坑点 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • HTML页面自动清理js、css文件的缓存(自动添加版本号)

    为了解决用户访问网站时,由于浏览器缓存而读取了旧版本的js、css文件而导致网页无法正确渲染的问题,需要对网站中的js、css文件进行版本控制,并自动清理浏览器缓存。 1. 添加版本控制 在引用js、css文件时,添加版本号。可以采用以下两种方式: 1.1 引用文件名添加版本号 在引用js、css文件时,在文件名后面添加?v=版本号 <link rel…

    Java 2023年6月16日
    00
  • 用bat批处理实现163邮箱自动登陆的代码[已测]

    使用bat批处理实现网页自动化是一种高效的实现方式。下面是使用bat批处理实现163邮箱自动登陆的完整攻略及示例说明: 1. 准备工作 首先,需要在电脑上安装好以下两个工具: 安装好Chrome浏览器 安装好Chrome浏览器的Driver 其中,Chrome浏览器的Driver需要根据自己安装的Chrome版本来选择,可以在 https://npm.tao…

    Java 2023年6月16日
    00
  • Springboot快速入门教程

    下面是关于“Springboot快速入门教程”的完整攻略。 1. 前置条件 在开始学习Springboot之前,需要具备一定的Java基础知识,并熟悉Spring框架的基本概念。 2. 学习步骤 2.1 创建项目 在开始使用Springboot开发项目前,需要先创建一个基础的Springboot项目。在这里以使用Maven创建项目为例: <groupI…

    Java 2023年5月15日
    00
  • Java分布式锁的三种实现方案

    让我来详细讲解“Java分布式锁的三种实现方案”的完整攻略。 什么是分布式锁? 分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,多个节点会竞争同一个锁,这个锁可以是基于数据库或者基于缓存等其他方式实现的。 Java分布式锁的三种实现方案 基于数据库的分布式锁 这种锁的实现方式比较简单,通过数据库的行锁来实现分布式锁,通过insert或…

    Java 2023年5月26日
    00
  • Java实现在线考试系统与设计(学生功能)

    Java实现在线考试系统与设计(学生功能) 系统概述 在线考试系统是基于Web的应用系统,主要是为了方便学生进行在线考试。该系统可以实现学生在线测试、查看成绩等功能。此系统采用Java EE技术,使用SpringMVC框架作为基础框架,使用MyBatis作为ORM框架,使用MySQL数据库进行数据存储。 学生功能 系统设计的学生功能分为以下几个模块: 1. …

    Java 2023年5月19日
    00
  • SpringSecurity学习之自定义过滤器的实现代码

    我会尽力详细讲解。 首先介绍一下Spring Security,它是一个开源框架,用于为基于Spring的应用程序提供身份验证和授权管理功能。Spring Security是一个功能强大,使用广泛的安全框架,已经成为企业级应用领域的标准选择之一。本文将通过实战示例,详细讲解如何在Spring Security中自定义过滤器。 1. 自定义过滤器的概念 在Sp…

    Java 2023年5月20日
    00
  • 图文详解Java的反射机制

    图文详解Java的反射机制 什么是反射机制 Java中的一个重要概念就是反射机制。简单的来说,反射机制是指在运行时动态地获取类的信息以及使用类的信息的能力。通过反射,我们可以在运行时获取类的属性、方法、构造函数等信息,并且可以在运行时动态地进行类的实例化等操作。有了这些能力,我们可以更加灵活地使用Java编写程序。 反射机制的基本用法 获取类对象 我们首先需…

    Java 2023年5月26日
    00
  • SpringBoot2.x 整合Spring-Session实现Session共享功能

    下面我将详细讲解“SpringBoot2.x 整合Spring-Session实现Session共享功能”的完整攻略。 1. 什么是Spring Session Spring Session是Spring框架提供的一个解决方案,用于替换Java Web中使用的HttpSession。 Spring Session将HttpSession存储在集中式存储中,如…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部