SpringBoot整合Kafka工具类的详细代码

下面是SpringBoot整合Kafka工具类的详细代码攻略。

环境准备

  1. 确认已经安装JDK、Maven和Kafka
  2. 在Maven中添加Kafka依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.13.RELEASE</version>
</dependency>

工具类设计

Kafka工具类应该包含:
1. KafkaProducer生产者
2. KafkaConsumer消费者
3. ProducerRecord消息对象
4. ConsumerRecord消息对象
5. 初始化方法

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);
    private static KafkaProducer<String, String> producer = null;
    private static KafkaConsumer<String, String> consumer = null;
    private static final String TOPIC = "test-topic"; // 消费和生产的主题名

    /**
     * 获取生产者
     */
    public static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }
        return producer;
    }

    /**
     * 获取消费者
     */
    public static KafkaConsumer<String, String> getConsumer() {
        if (consumer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(TOPIC));
        }
        return consumer;
    }

    /**
     * 发送消息
     */
    public static void sendMessage(String message) {
        KafkaProducer<String, String> producer = getProducer();
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        producer.send(record, new ListenableFutureCallback<RecordMetadata>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("Send message failed.", ex);
            }

            @Override
            public void onSuccess(RecordMetadata metadata) {
                LOGGER.info("Send message Success: {}", metadata.offset());
            }
        });
        producer.flush();
    }

    /**
     * 接收消息
     */
    public static void receiveMessage() {
        KafkaConsumer<String, String> consumer = getConsumer();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOGGER.info("Received message: partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

使用示例

生产者示例

public class KafkaProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        String message = "Hello World";
        KafkaUtils.sendMessage(message);
    }
}

消费者示例

public class KafkaConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        KafkaUtils.receiveMessage();
    }
}

以上就是SpringBoot整合Kafka工具类的详细代码攻略和两条示例,希望能帮到你。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot整合Kafka工具类的详细代码 - Python技术站

(0)
上一篇 2023年5月19日
下一篇 2023年5月19日

相关文章

  • Java的Struts框架报错“ActionNotFoundException”的原因与解决办法

    当使用Java的Struts框架时,可能会遇到“ActionNotFoundException”错误。这个错误通常由以下原因之一起: Action配置问题:如果Action配置不正确,则可能会出现此。在这种情况下,需要检查Action配置以解决此问题。 URL路径问题:如果URL路径不正确,则可能会出现此。在这种情况下,需要检查URL路径以解决此问题。 以下…

    Java 2023年5月5日
    00
  • jackson在springboot中的使用方式-自定义参数转换器

    在Spring Boot中使用Jackson进行序列化和反序列化是很常见的。Jackson是一个非常流行的Java库,提供了高效的JSON序列化和反序列化。在这篇攻略中,我们将学习如何在Spring Boot中使用Jackson的自定义参数转换器。自定义参数转换器的作用是在请求参数与Controller接收参数之间进行转换,以处理来自客户端的任意格式的数据,…

    Java 2023年5月26日
    00
  • spring-cloud-stream的手动消息确认问题

    Spring Cloud Stream是一个用于构建基于事件驱动的微服务的框架。可使用其发现和连接分布式系统中的消息代理,同时提供一些便捷的特性。 在使用Spring Cloud Stream的过程中,手动消息确认是重要的一个问题。手动确认就是指当我们消费了消息后需要向消息队列发送一个确认消息来告诉队列已经处理完消息,可以将消息从队列中删除。否则,队列会一直…

    Java 2023年6月2日
    00
  • Hibernate悲观锁和乐观锁实例详解

    下面是“Hibernate悲观锁和乐观锁实例详解”的完整攻略: 一、悲观锁的概念 悲观锁是一种传统的锁处理方式,其核心思想是对于所操作的数据持有独占锁,避免其他线程在同一时间对该数据进行修改,以达到保证数据操作的完整性和一致性的目的。为了实现对数据的独占性,悲观锁会在数据操作时将其锁定,从而其他线程无法对该数据进行修改,直到该线程完成操作并释放锁为止。 Hi…

    Java 2023年5月31日
    00
  • 在windows下揪出java程序占用cpu很高的线程并完美解决

    以下是针对“在 Windows 下揪出 Java 程序占用 CPU 很高的线程并完美解决”的完整攻略: 1. 使用 Java 可视化工具揪出占用 CPU 较高的线程 步骤1:下载 VisualVM VisualVM 是一款 Java 虚拟机监控和性能分析工具,可以在 Windows 等多个平台上使用,具有良好的界面和体验。可以到以下网址下载 VisualVM…

    Java 2023年5月19日
    00
  • Linux 安装JDK Tomcat MySQL的教程(使用Mac远程访问)

    Linux 安装 JDK Tomcat MySQL 的教程(使用 Mac 远程访问) 前置条件 基本的 Linux 操作知识 一台远程 Linux 服务器 本地 macOS 系统 安装 JDK 从官网下载jdk-8u251-linux-x64.tar.gz文件。(根据系统版本选择对应文件) 将下载的文件上传到服务器,并解压到 /usr/local/jdk8 …

    Java 2023年5月20日
    00
  • java 格式化输出数字的方法

    当我们用Java编写程序时,经常需要将数字以指定格式输出。Java中提供了一些方法来格式化输出数字,这些方法包括使用String.format()和System.out.printf()等。 使用String.format()方法 使用String.format()方法可以使代码更简洁,通常使用以下的语法格式: String formattedString …

    Java 2023年5月26日
    00
  • Java中值传递的深度分析

    Java中值传递的深度分析 在Java中,参数传递有两种方式:值传递和引用传递。本文将主要讲解Java中的值传递,以及值传递的相关知识点。 值传递的定义 值传递,指的是当数据类型为基本数据类型的时候,方法调用时传递的是该基本数据类型的值的一份拷贝,而不是该变量所指向的地址。因此在函数内改变基本数据类型的值不会影响外部变量的值。 值传递的示例 以下是一个简单的…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部