SpringBoot整合Kafka工具类的详细代码

下面是SpringBoot整合Kafka工具类的详细代码攻略。

环境准备

  1. 确认已经安装JDK、Maven和Kafka
  2. 在Maven中添加Kafka依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.13.RELEASE</version>
</dependency>

工具类设计

Kafka工具类应该包含:
1. KafkaProducer生产者
2. KafkaConsumer消费者
3. ProducerRecord消息对象
4. ConsumerRecord消息对象
5. 初始化方法

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);
    private static KafkaProducer<String, String> producer = null;
    private static KafkaConsumer<String, String> consumer = null;
    private static final String TOPIC = "test-topic"; // 消费和生产的主题名

    /**
     * 获取生产者
     */
    public static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }
        return producer;
    }

    /**
     * 获取消费者
     */
    public static KafkaConsumer<String, String> getConsumer() {
        if (consumer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(TOPIC));
        }
        return consumer;
    }

    /**
     * 发送消息
     */
    public static void sendMessage(String message) {
        KafkaProducer<String, String> producer = getProducer();
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        producer.send(record, new ListenableFutureCallback<RecordMetadata>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("Send message failed.", ex);
            }

            @Override
            public void onSuccess(RecordMetadata metadata) {
                LOGGER.info("Send message Success: {}", metadata.offset());
            }
        });
        producer.flush();
    }

    /**
     * 接收消息
     */
    public static void receiveMessage() {
        KafkaConsumer<String, String> consumer = getConsumer();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOGGER.info("Received message: partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

使用示例

生产者示例

public class KafkaProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        String message = "Hello World";
        KafkaUtils.sendMessage(message);
    }
}

消费者示例

public class KafkaConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        KafkaUtils.receiveMessage();
    }
}

以上就是SpringBoot整合Kafka工具类的详细代码攻略和两条示例,希望能帮到你。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot整合Kafka工具类的详细代码 - Python技术站

(0)
上一篇 2023年5月19日
下一篇 2023年5月19日

相关文章

  • JSP用过滤器解决request getParameter中文乱码问题

    当提交的表单数据中包含中文字符时,在使用request.getParameter()方法获取参数时,可能会出现中文乱码现象。 为了解决这个问题,我们可以使用过滤器(Filter)来实现字符编码过滤。过滤器是一种可以在web服务器中对web应用程序进行过滤处理的组件,可以在http请求到达目标servlet或jsp之前对http请求进行处理,或在JSP将输出发…

    Java 2023年6月15日
    00
  • java使用JSONObject实例

    下面是关于“java使用JSONObject实例”的详细攻略: 什么是JSONObject JSONObject 是一个 Java 类,它是用来表示 JSON 对象的。我们可以通过构造方法或者添加属性的方式来创建一个 JSONObject 对象,然后可以通过 get 或者 opt 等方法获取里面的属性值。 JSONObject 常用方法 下面列举一些 JSO…

    Java 2023年5月23日
    00
  • 关于springboot的接口返回值统一标准格式

    让我详细讲解一下“关于springboot的接口返回值统一标准格式”的完整攻略。 1. 为什么需要接口返回值统一标准格式 在实际开发中,我们可能会使用不同的接口返回值格式,比如一些服务返回的是JSON格式,而另一些服务则返回的是XML格式。针对这样的情况,我们需要对接口返回值做一些规范化,以便于客户端对接口返回值进行处理。另外,如果服务端返回的数据格式不统一…

    Java 2023年5月20日
    00
  • java时间日期使用与查询代码详解

    Java时间日期使用与查询代码详解 介绍 在Java中,日期和时间是一个常见的需求。Java为我们提供了用于处理日期和时间的多个类和方法。本文将深入介绍Java的日期时间相关的类和方法,并提供使用示例和代码详解。 本文涉及以下类: java.time.LocalDate – 表示只用日期,不包含时间的类。 java.time.LocalTime – 表示只用…

    Java 2023年5月20日
    00
  • 浅谈Java动态代理的实现

    浅谈 Java 动态代理的实现 什么是动态代理? Java 中的代理分为静态代理和动态代理两种。静态代理需要事先写好代理类,通过程序员手动编写的方式,代理对象和目标对象之间的关系就已经确定了。而动态代理是在程序运行时动态生成的代理对象,不需要事先写好代理类。动态代理可以根据目标对象动态地生成代理对象,无需为每个目标对象都编写代理类,增强代码的可重用性。 实现…

    Java 2023年5月26日
    00
  • java生成可执行文件(制作可执行文件)

    Java是一门需要在JAVA虚拟机(JVM)上运行的语言,因此Java源代码无法直接转化为Windows或Linux操作系统上的可执行文件。不过,Java提供了一个工具——Java打包工具(jar工具),你可以使用它将Java代码、构成代码所需的依赖文件(如类库)、配置文件等打包成一个可执行的jar文件。接下来是我们提供的java生成可执行文件(制作可执行文…

    Java 2023年5月19日
    00
  • java文件处理工具类详解

    Java文件处理工具类详解 在Java编程中,我们经常涉及到文件的操作,例如读取文件、清空文件、写入文件、获取文件信息等等,如果每次都手写文件操作代码,那么非常费时费力。因此,编写一个Java文件处理工具类是很有必要的。下面,我将详细讲解如何编写一个Java文件处理工具类。 一、文件相关概念 在开始编写文件处理工具类之前,我们先来了解一些文件相关概念。 1.…

    Java 2023年5月20日
    00
  • JavaSpringBoot报错“NotFoundException”的原因和处理方法

    原因 “Not Found Exception” 错误通常是以下原因引起的: 路径错误:如果您的路径存在问题,则可能会出现此错误。在这种情况下,需要检查您的路径并确保它们正确。 数据库查询问题:如果您的数据库查询存在问题,则可能会出现此错误。在这种情况下,需要检查您的数据库查询并确保它们正确。 代码逻辑问题:如果您的代码逻辑存在问题,则可能会出现此错误。在这…

    Java 2023年5月4日
    00
合作推广
合作推广
分享本页
返回顶部