Java使用kafka发送和生产消息的示例

下面是使用Java发送和生产消息的示例攻略。

准备工作

  1. 安装Kafka
  2. 创建一个主题(Topic)
  3. 引入Kafka和zookeeper依赖

在pom.xml中添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
        <scope>test</scope>
    </dependency>
</dependencies>

生产者示例

一个生产者向Kafka发送消息的示例代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


public class ProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }

}

这个示例代码中,我们使用了Kafka的Java API来创建生产者对象,并向my-topic主题发送了一些消息。

消费者示例

一个消费者从Kafka接收消息的示例代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;


public class ConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

}

这个示例代码中,我们使用了Kafka的Java API来创建消费者对象,并订阅了my-topic主题,一旦有新的消息产生,我们就会把消息的offset、key和value输出到控制台上。

希望这个示例能够帮助你更好地理解Java如何使用Kafka发送和生产消息。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java使用kafka发送和生产消息的示例 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • java字符串数组进行大小排序的简单实现

    下面是“java字符串数组进行大小排序的简单实现”的完整攻略: 1.前置知识 在进行字符串数组排序之前,需要了解以下知识点: 1.1 比较器 Java中的比较器定义了对对象进行排序的规则,在比较器中实现排序规则后可以调用 Collections.sort() 方法或 Arrays.sort() 方法进行排序。 1.2 字符串比较 当需要对字符串进行比较时,可…

    Java 2023年5月26日
    00
  • 深入了解Springboot核心知识点之数据访问配置

    深入了解Spring Boot核心知识点之数据访问配置 Spring Boot是一个非常流行的Java框架,它提供了许多便利的功能,其中包括数据访问。在本文中,我们将深入了解Spring Boot的数据访问配置,包括如何配置数据源、如何使用JdbcTemplate和如何使用Spring Data JPA。 配置数据源 在Spring Boot中,我们可以使用…

    Java 2023年5月15日
    00
  • Java数据结构之位图的简单实现和使用

    Java数据结构之位图的简单实现和使用 随着数据量的快速增长,数据结构的高效率已经变得越来越重要。而位图是一个简单而高效率的用于数据存储与查询的数据结构。本文将详细介绍位图的概念、实现过程以及使用方法。 什么是位图? 位图(Bit Map) 是一种非常简单的存储数据结构,它使用一个或多个二进制位来表示一个数据的状态。位图的本质是一个大整数,其中的每个二进制位…

    Java 2023年5月26日
    00
  • 解析Spring 漏洞及其修复方案

    解析Spring 漏洞及其修复方案 Spring框架是一款非常流行的Java应用程序框架,广泛应用于企业级应用程序开发中。然而,Spring框架中也有一些漏洞风险,这些漏洞可能会被黑客利用来攻击应用程序。以下是关于Spring漏洞及其修复方案的详细攻略。 Spring 漏洞类型 Spring框架中的漏洞风险主要分为以下几类: 注入漏洞:包括SQL注入和代码注…

    Java 2023年5月19日
    00
  • java实现HmacSHA256算法进行加密方式

    Java实现HmacSHA256算法进行加密方式 算法描述 HmacSHA256算法是一种基于哈希函数的加密算法,它采用SHA256加密算法和密钥来实现加密。HMAC全称是“Hash-based Message Authentication Code”,即基于哈希函数的消息认证码。它可以用于验证数据的完整性和真实性,避免数据被篡改和伪造。 Java实现 我们…

    Java 2023年5月19日
    00
  • 详细介绍SpringCloud之Ribbon

    详细介绍SpringCloud之Ribbon 什么是Ribbon? Ribbon是Netflix开源项目之一,主要功能是提供客户端的负载均衡算法及服务调用。它是Spring Cloud体系中较为重要的组件,可以与Eureka、Consul、Zookeeper等注册中心组合使用,实现服务间的调用与负载均衡。 Ribbon的负载均衡算法 Ribbon提供了多种负…

    Java 2023年6月16日
    00
  • java解析dbf之通过javadbf包生成和读取dbf文件

    下面是“java解析dbf之通过javadbf包生成和读取dbf文件”的完整攻略,包含以下主要内容: javadbf包是什么,如何引入 生成dbf文件 读取dbf文件 1. javadbf包是什么,如何引入 javadbf包是用于处理dbf文件格式的Java库,它支持生成、读取和写入dbf文件。这个库的最新版本是2.0.0-beta,你可以在github上找…

    Java 2023年5月19日
    00
  • Java 7大常见排序方法实例详解

    Java 7大常见排序方法实例详解 排序算法是计算机科学中的重要技能之一,Java为开发者提供了多种常见的排序方法,本文将介绍Java 7大常见排序方法并提供详细的示例说明。 1. 冒泡排序(Bubble Sort) 冒泡排序是最简单的排序算法之一,它的思想是依次比较相邻的两个元素,如果前面的元素比后面的元素大,则交换这两个元素的位置,通过多次比较和交换,将…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部