spring boot 与kafka集成的示例代码

下面就给您讲解Spring Boot与Kafka集成的示例代码攻略。

1. 引入依赖

首先,在pom.xml文件中添加Kafka相关的依赖:

<!--kafka-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.3.1.RELEASE</version>
</dependency>

2. 添加Kafka配置

application.properties文件中添加Kafka的配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

3. 发送消息

在代码中发送消息,创建MessageProducer.java

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition());
            }
        });
    }

}

4. 接收消息

创建MessageConsumer.java

@Service
public class MessageConsumer {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("收到消息:" + consumer.value());
    }

}

注解@KafkaListener可以自动绑定消息消费者。

这样,在启动项目之后,可以通过MessageProducer发送消息到kafka,并通过MessageConsumer接收消息。

这里再给您提供一个完整的示例代码,可以仔细研读:

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group
spring.kafka.consumer.topic=test

MessageProducer.java

@Service
public class MessageProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition());
            }
        });
    }

}

MessageConsumer.java

@Service
public class MessageConsumer {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("收到消息:" + consumer.value());
    }

}

KafkaDemoApplication.java

@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(KafkaDemoApplication.class);

    @Autowired
    private MessageProducer messageProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        messageProducer.sendMessage("test", "Hello, Kafka!");
    }

}

这是一个简单的例子,您可以根据自己的需求进行修改和扩展。

另外,您还可以参考第二个示例:

1. 引入依赖

<!--kafka-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.3.1.RELEASE</version>
</dependency>

2. 添加Kafka配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test_group1
      auto-offset-reset: earliest
      enable-auto-commit: true
    producer:
      retries: 3
      batch-size: 4096
      buffer-memory: 409600
      compression-type: gzip

3. 发送消息

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送消息失败:{}", ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("发送消息成功:{}-{}-{}", result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }
        });
    }

}

4. 接收消息

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            log.info("消费消息:{}-{}-{}-{}", record.topic(), record.partition(), record.offset(), record.value());
            acknowledgment.acknowledge();
        } catch (Exception ex) {
            log.error("消费消息失败:{}", ex.getMessage());
        }
    }

}

这个示例比较完整,包含了配置文件、生产者、消费者,可以作为参考。

希望这份攻略可以对您有所帮助,亲身体验一下Spring Boot与Kafka的集成,一定会让您有更深的理解。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring boot 与kafka集成的示例代码 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Java8新特性之空指针异常的克星Optional类的实现

    Java8新特性之空指针异常的克星Optional类的实现 前言 在 Java 中,我们经常会遇到空指针异常(NullPointerException),尤其是在处理数据集合或者从接口返回数据时,如果拿到了 null 值,程序就会抛出异常。 Java 8 中提供了一个克星空指针异常的类 Optional,它可以有效地解决 null 值的问题。 Optiona…

    Java 2023年5月27日
    00
  • JVM知识总结之垃圾收集算法

    JVM知识总结之垃圾收集算法 什么是垃圾收集算法 垃圾收集算法(Garbage Collection Algorithm)是指垃圾收集器(Garbage Collector,GC)在执行“垃圾收集”操作时,所采用的具体算法。垃圾收集器的作用是自动释放内存中不再被使用的对象。 常见的垃圾收集算法 1. 标记-清除算法(Mark-Sweep) 标记-清除算法(M…

    Java 2023年5月26日
    00
  • java实现屏幕共享功能实例分析

    Java实现屏幕共享功能实例分析 屏幕共享是一种在多人在线协作或远程协作中常见的功能。Java可以用来实现屏幕共享功能。本篇文章将从以下三个方面讲解Java实现屏幕共享功能的攻略: 什么是屏幕共享 屏幕共享实现方式 Java实现屏幕共享功能的具体步骤 什么是屏幕共享 屏幕共享是指一个用户的桌面及其上的应用程序可以在多个用户的计算机上同步显示。通常情况下,屏幕…

    Java 2023年5月18日
    00
  • 一篇文章带你入门Java基本概念

    一篇文章带你入门Java基本概念 Java是一个广泛应用的高级编程语言,它是一种面向对象的语言,体现了一些在C++中经过多年开发和实践所获得的经验,避免了其它更早的面向对象的语言的一些不足,是一个功能强大且通用性很高的编程语言。 基本概念 Java具有丰富的基本概念,其中一些需要初学者掌握: 类 Java中的类是一个蓝图或者模板,它定义了对象包含的属性和方法…

    Java 2023年5月23日
    00
  • Ubuntu14.04 安装配置Tomcat7教程

    下面是Ubuntu 14.04安装配置Tomcat7的完整攻略: 1. 安装JAVA Tomcat是基于Java的,因此我们需要先安装JDK。 可以按照以下步骤安装OpenJDK: 更新软件包列表: sudo apt-get update 安装OpenJDK: sudo apt-get install openjdk-7-jdk 安装完成后,通过以下命令检查…

    Java 2023年5月19日
    00
  • java多线程模拟交通灯管理系统

    下面我将详细讲解如何编写一个Java多线程模拟交通灯管理系统。 前言 交通灯是城市中必不可少的重要设施之一,能帮助路面交通管理变得更加有序。为了更好地理解交通灯的工作原理,我们可以开发一个Java多线程模拟交通灯管理系统来模拟交通灯的运行过程。 设计思路 我们的系统需要设计两个交通灯对象,即红绿灯和绿红灯,交替更替地工作。为了实现此目的,我们可以使用多线程的…

    Java 2023年5月19日
    00
  • Struts2在打包json格式的懒加载异常问题

    当使用Struts2进行json数据懒加载时,有可能会遇到打包json格式的异常问题。这种异常通常是由于Struts2缺少正确的json转换器或配置参数导致的。在本文中,将为您详细讲解如何解决这个问题。 1.检查json-lib库 首先要检查的事项是 json-lib 库, 您需要检查您项目中的 json-lib 包是否正常。 json-lib 库是 Str…

    Java 2023年5月20日
    00
  • maven项目打包上传到私有仓库

    下面是“Maven项目打包上传到私有仓库”的完整攻略: 1. 创建maven项目 首先我们需要创建一个maven项目,这里就不多赘述了,可以通过以下命令在终端中创建一个maven项目: mvn archetype:generate -DgroupId=com.example -DartifactId=my-webapp -DarchetypeArtifact…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部