Springboot集成Kafka进行批量消费及踩坑点

yizhihongxing

下面我来详细讲解“Springboot集成Kafka进行批量消费及踩坑点”的完整攻略。

一、前言

Kafka是一款分布式消息队列系统,由Apache在2011年引入,其主要包括了生产者、消费者等API,用于实现消息的发送和接收等操作。而Springboot则是目前流行的一种开发框架,它可以简化Java应用的开发过程。本文将探讨如何在Springboot中集成Kafka进行批量消费,同时也会分享一些踩坑的经验。

二、集成Kafka

1. 引入Kafka依赖

pom.xml中引入Kafka依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

2. 添加Kafka配置

application.properties中添加如下Kafka配置:

#Kafka相关配置
kafka.producer.bootstrap-servers=localhost:9092
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.groupId=group-1
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=10

3. 创建Kafka生产者

KafkaProducer中,我们要注入KafkaTemplate:

@Service
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String message) throws Exception {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功," + result.getProducerRecord().topic() + "-" +
                        result.getProducerRecord().key() + ":" + result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败,原因:" + throwable.getMessage());
            }
        });
    }

}

4. 创建Kafka消费者

KafkaConsumer中,我们要通过@KafkaListener注解指定消费的topic和group,同时指定批量拉取消息的数量:

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "test-topic", groupId = "group-1", containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, String>> messages) {
        System.out.println("批量消费一次,消息数量:" + messages.size());
        for (ConsumerRecord<String, String> message : messages) {
            System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setBatchListener(true); // 指定批量消费
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return properties;
    }
}

在上述代码中,我们将factory.setBatchListener(true)设置为true,表示启用批量消费。

三、示例

1. 生产消息

Controller中编写生产消息的接口:

@RestController
public class TestController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send")
    public String send() throws Exception {
        String message = "Hello, Kafka!";
        kafkaProducer.sendMessage("test-topic", "key-" + System.currentTimeMillis(), message);
        return "success";
    }

}

2. 消费消息

启动应用后,访问http://localhost:8080/send接口生产一条消息,观察控制台输出。

四、踩坑点

1. Group ID重复

Kafka中的Group ID是用于标识消费者组的,同一个Group ID只允许存在一个消费者组。如果在多个消费者组中使用了同一个Group ID会导致消费者无法消费消息。因此,Group ID需要在多个应用中保持唯一。

2. Auto-commit被动触发

Kafka中的消费者组有自动提交和手动提交两种方式。在自动提交模式下,Kafka会定时自动提交已消费的消息偏移量,但是由于这个过程是异步的,因此可能会存在消费了但是消息偏移量未提交的情况。因此,对于需要保证消息不被重复消费的场景,建议使用手动提交模式。

3. 手动提交消息偏移量

在手动提交模式下,我们需要在消费消息后手动提交消息的偏移量,以避免重复消费。代码示例如下:

@KafkaListener(topics = "test-topic", groupId = "group-1")
public void listen(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
    System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
    acknowledgment.acknowledge(); // 手动提交消息偏移量
}

以上就是我关于“Springboot集成Kafka进行批量消费及踩坑点”的详细讲解,希望对你有所帮助。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot集成Kafka进行批量消费及踩坑点 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • 使用DataGrip连接Hive的详细步骤

    使用DataGrip连接Hive需要以下步骤: 在DataGrip中安装Hive插件。 打开DataGrip,点击File -> Settings -> Plugins,搜索Hive,点击Install安装插件。 安装成功后,需要重启DataGrip。 配置Hive数据源 点击File -> New -> Data Source -&…

    Java 2023年6月16日
    00
  • RestTemplate添加HTTPS证书全过程解析

    首先我们来介绍一下RestTemplate,它是Spring Framework的一个类,可以简化HTTP客户端的编程工作。通俗地说,它可以让我们轻松使用Java代码发送HTTP请求,接收响应等操作。但是如果要使用HTTPS协议,则需要添加证书。下面就为大家详细介绍一下添加HTTPS证书的全过程。 第一步:获取证书文件 首先,我们需要获取HTTPS证书的文件…

    Java 2023年5月19日
    00
  • SpringMVC后端返回数据到前端代码示例

    SpringMVC后端返回数据到前端代码示例的完整攻略如下: 1. 定义Controller类 首先要定义一个Controller类,用于处理前端的请求,然后返回数据给前端。以下是示例代码: @RestController @RequestMapping("/api") public class UserController { @Aut…

    Java 2023年6月15日
    00
  • 浅谈SpringSecurity基本原理

    浅谈SpringSecurity基本原理 什么是SpringSecurity SpringSecurity是一个基于Spring框架的安全框架,它提供了完善的认证(authentication)和授权(authorization)机制,可用于保护Web应用程序中的敏感资源。 SpringSecurity的基本原理 SpringSecurity的主要组件 Sp…

    Java 2023年5月20日
    00
  • Maven管理SpringBoot Profile详解

    Maven管理SpringBoot Profile详解 简介 Spring Boot是一款基于Spring框架,更快地启动、开发和部署单独的Java应用程序的工具。在使用Spring Boot的过程中,我们经常需要使用到不同的配置和环境,而这些配置和环境可以通过Profile的方式进行管理。 本文将讲解如何利用Maven对Spring Boot的Profil…

    Java 2023年5月19日
    00
  • 基于html5+java实现大文件上传实例代码

    让我详细为您介绍一下“基于html5+java实现大文件上传实例代码”的完整攻略和代码实现。 简介 为了解决传统文件上传方式在处理大文件上传时所面临的性能瓶颈和功能缺失,我们需要使用一些新的技术手段。html5提供了File API来处理客户端文件读取,而java的高性能能力则可以处理并发上传和分片上传等复杂操作,两者结合起来,就能够实现一套优秀的大文件上传…

    Java 2023年5月19日
    00
  • 一文带你认识java中的String类

    String类在Java中是一个非常重要的类,它用来表示字符串,下面就一文带你认识Java中的String类。 1. String类的概述 在Java中,字符串是一个非常常见的数据类型。而String类则是Java提供的处理字符串的主要类。String类是不可变的,也就是说一旦创建了一个String对象,便不能再进行修改。每进行一次字符串的操作,都会创建一个…

    Java 2023年5月26日
    00
  • 组织树查询-Jvava实现(递归)

    1.首先查询出组织机构 就是一个简单的查询 List<Dept> deptList = mapper.getDeptList(); Map<Long, OrgNode> nodeMap = new HashMap<>(); List<Long> rootIds = new ArrayList<>()…

    Java 2023年4月19日
    00
合作推广
合作推广
分享本页
返回顶部