Java使用kafka发送和生产消息的示例

下面是使用Java发送和生产消息的示例攻略。

准备工作

  1. 安装Kafka
  2. 创建一个主题(Topic)
  3. 引入Kafka和zookeeper依赖

在pom.xml中添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.9</version>
        <scope>test</scope>
    </dependency>
</dependencies>

生产者示例

一个生产者向Kafka发送消息的示例代码如下:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


public class ProducerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }

}

这个示例代码中,我们使用了Kafka的Java API来创建生产者对象,并向my-topic主题发送了一些消息。

消费者示例

一个消费者从Kafka接收消息的示例代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;


public class ConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

}

这个示例代码中,我们使用了Kafka的Java API来创建消费者对象,并订阅了my-topic主题,一旦有新的消息产生,我们就会把消息的offset、key和value输出到控制台上。

希望这个示例能够帮助你更好地理解Java如何使用Kafka发送和生产消息。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java使用kafka发送和生产消息的示例 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Java实现简单版贪吃蛇游戏

    Java实现简单版贪吃蛇游戏 简介 贪吃蛇是一款经典的游戏,通过控制蛇的移动方向和吃到食物来增加蛇的长度,直到蛇撞到墙壁或自己的身体,游戏结束。本文将介绍如何使用Java语言实现一个简单版的贪吃蛇游戏。 实现步骤 1. 设计游戏界面 游戏界面包括游戏画布、分数显示和游戏控制按钮等组件。可以使用Java Swing库来实现游戏界面的设计。 2. 实现蛇的移动 …

    Java 2023年5月26日
    00
  • Sentinel实现动态配置的集群流控的方法

    Sentinel是一个分布式系统的流量控制组件,其通过提供多种限流、降级、熔断等机制来保护系统的稳定性。Sentinel可以配合Spring Cloud、Dubbo等框架使用,而且其提供了动态配置的支持,通过动态更新规则实现流量控制策略的动态调整。本文将详细讲解Sentinel实现动态配置的集群流控的方法,具体过程如下: 步骤1:搭建Sentinel集群 首…

    Java 2023年6月15日
    00
  • Spring security密码加密实现代码实例

    下面我将为你详细讲解”Spring security密码加密实现代码实例”的完整攻略。 简介 Spring Security是Spring团队开发的一个安全框架,用于保护Web应用,管理身份验证和授权访问控制。其中重要的一部分就是密码加密,因为存储明文密码会带来严重的安全风险。Spring Security提供了多种密码加密算法,例如MD5、SHA-256、…

    Java 2023年5月20日
    00
  • Java生成PDF文件的实例代码

    下面是详细讲解Java生成PDF文件的实例代码的攻略。 步骤一:引入依赖 我们使用iText这个开源工具来生成PDF文件,所以我们需要在项目中引入iText的依赖。 <dependency> <groupId>com.itextpdf</groupId> <artifactId>itextpdf</art…

    Java 2023年5月20日
    00
  • 没有杯子的世界:OOP设计思想的应用实践

    最近看到一个有趣的问题:Person类具有Hand,Hand可以操作杯子Cup,但是在石器时代是没有杯子的,这个问题用编程怎么解决? 简单代码实现 我们先用简单代码实现原问题: @Data public class Person { private final String name; private Hand hand = new Hand(); priv…

    Java 2023年4月22日
    00
  • 详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    详解Java CountDownLatch和CyclicBarrier 在多线程编程中,我们常常会用到Java中的并发工具类CountDownLatch和CyclicBarrier,它们都是用于线程同步的一种工具。本文将从内部实现和场景上的区别来详细讲解这两种工具类。 CountDownLatch CountDownLatch在多线程中被用于等待一个或多个事…

    Java 2023年5月26日
    00
  • 为Java程序员准备的10分钟Perl教程

    为Java程序员准备的10分钟Perl教程是一份旨在通过简短的教学来为Java程序员介绍Perl的基础知识的文档。下面是一份完整攻略: 简介 在这份教程中,我们将学习Perl的基础知识。Perl是一种通用的脚本语言,特别适合快速开发。Perl有一个庞大的社区以及丰富的文档和库。 变量 在Perl中声明变量不需要指定类型。变量的类型会随着所存储的数据类型而变化…

    Java 2023年5月23日
    00
  • 你真的知道Java中对象的销毁吗

    当一个Java对象不再被程序使用时,它会被JVM自动回收,这个过程称为垃圾回收(Garbage Collection)。垃圾回收器会扫描堆中的所有对象,将未被引用的对象回收,腾出空间供其他对象使用。 Java 中对象的销毁与使用无关,取决于对象是否被垃圾回收器扫描到并回收,因此需要了解垃圾回收机制。 具体来说,Java 的垃圾回收器主要通过以下两个机制进行对…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部