SpringBoot整合Kafka工具类的详细代码

yizhihongxing

下面是SpringBoot整合Kafka工具类的详细代码攻略。

环境准备

  1. 确认已经安装JDK、Maven和Kafka
  2. 在Maven中添加Kafka依赖
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.13.RELEASE</version>
</dependency>

工具类设计

Kafka工具类应该包含:
1. KafkaProducer生产者
2. KafkaConsumer消费者
3. ProducerRecord消息对象
4. ConsumerRecord消息对象
5. 初始化方法

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class KafkaUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaUtils.class);
    private static KafkaProducer<String, String> producer = null;
    private static KafkaConsumer<String, String> consumer = null;
    private static final String TOPIC = "test-topic"; // 消费和生产的主题名

    /**
     * 获取生产者
     */
    public static KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
        }
        return producer;
    }

    /**
     * 获取消费者
     */
    public static KafkaConsumer<String, String> getConsumer() {
        if (consumer == null) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(TOPIC));
        }
        return consumer;
    }

    /**
     * 发送消息
     */
    public static void sendMessage(String message) {
        KafkaProducer<String, String> producer = getProducer();
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        producer.send(record, new ListenableFutureCallback<RecordMetadata>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error("Send message failed.", ex);
            }

            @Override
            public void onSuccess(RecordMetadata metadata) {
                LOGGER.info("Send message Success: {}", metadata.offset());
            }
        });
        producer.flush();
    }

    /**
     * 接收消息
     */
    public static void receiveMessage() {
        KafkaConsumer<String, String> consumer = getConsumer();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                LOGGER.info("Received message: partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

使用示例

生产者示例

public class KafkaProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        String message = "Hello World";
        KafkaUtils.sendMessage(message);
    }
}

消费者示例

public class KafkaConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        KafkaUtils.receiveMessage();
    }
}

以上就是SpringBoot整合Kafka工具类的详细代码攻略和两条示例,希望能帮到你。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot整合Kafka工具类的详细代码 - Python技术站

(0)
上一篇 2023年5月19日
下一篇 2023年5月19日

相关文章

  • Java非侵入式API接口文档工具apigcc用法详解

    Java非侵入式API接口文档工具apigcc用法详解 概述 apigcc是一款非侵入式的API接口文档生成工具,可以帮助Java开发人员快速生成符合RESTful标准的API接口文档,同时支持多种API文档输出格式,包括HTML、Markdown、PDF等格式。 安装 apigcc可以通过npm安装,使用如下命令即可: npm install apigcc…

    Java 2023年5月20日
    00
  • asp.net Linq To Xml上手Descendants、Elements遍历节点

    ASP.NET是一套由微软公司开发的基于Web的应用程序框架,LINQ to XML则是一种用于处理XML文档的技术。如果想要在ASP.NET中使用LINQ to XML技术,可以通过使用Descendants和Elements方法来遍历XML文档。以下是使用ASP.NET LINQ to XML技术的完整攻略。 1. 创建XML文档 在使用LINQ to …

    Java 2023年5月20日
    00
  • Java反射之通过反射获取一个对象的方法信息(实例代码)

    使用Java反射可以在运行时获取一个类的各种信息,包括类的属性、方法、构造器等。本文将介绍如何通过反射获取一个对象的方法信息,并提供两个示例进行说明。 获取对象的方法信息 要获取一个对象的方法信息,需要使用Java反射中的Method类。Method类提供了关于类或接口中单独某个方法的信息和访问权限。 使用反射获取对象的方法信息的步骤如下: 获取该类的Cla…

    Java 2023年5月26日
    00
  • Kafka之kafka-topics.sh的使用解读

    介绍 kafka-topics.sh 是 Kafka 提供的命令行工具,常用于管理 Kafka 的主题。可以使用此工具创建、删除、查看主题信息,以及修改主题的配置等操作。 使用 首先需要进入kafka的bin目录,输入以下命令即可查询所有的命令: ./kafka-topics.sh 查询所有命令接口: ./kafka-topics.sh {-zookeepe…

    Java 2023年5月20日
    00
  • java与js代码互调示例代码

    当需要在Java与JavaScript之间进行代码互调时,可以使用Java在JavaScript中定义的函数来实现该功能。以下是使用Java代码在JavaScript中定义一个函数,并从JavaScript调用该函数的示例代码: 第一步是为Java代码编写一个类,在类中编写一个用于输出信息到控制台的函数: package com.example; publi…

    Java 2023年5月24日
    00
  • Java数据结构之选择排序算法的实现与优化

    Java数据结构之选择排序算法的实现与优化 选择排序算法的原理 选择排序是一种简单直观的排序算法,它的基本思想是:从待排序的数据中选出最小的数,将其放在首位;再从剩余的数据中选出最小的数,放在已排序数据的末尾;以此类推,直到所有数据均已排序完毕。 选择排序的时间复杂度为O(n²),空间复杂度为O(1)。相比于其他排序算法,选择排序的代码实现简单、易于理解。 …

    Java 2023年5月19日
    00
  • Spring Boot实现图片上传功能

    下面是关于“SpringBoot实现图片上传功能”的完整攻略: 1. 添加依赖 首先需要在 pom.xml 文件中添加依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web&…

    Java 2023年5月15日
    00
  • Java之dao模式详解及代码示例

    Java 之 DAO 模式详解及代码示例 什么是 DAO 模式 DAO,即 Data Access Object,数据访问对象,是一种数据访问的设计模式。它的主要目的是将数据存储到持久化层(通常是数据库)并从数据库中检索数据。这样,就可以将业务逻辑层与数据访问层分离,从而提高系统的可维护性和可重用性。 DAO 模式主要包含以下几个组件: 持久层接口 (DAO…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部