下面是Kafka使用Java客户端进行访问的示例代码的完整攻略。
环境搭建
首先要确保本地环境已经安装了以下软件:
- JDK 1.8+
- Apache Kafka 2.7.0+
- Maven 3.0+
在确保以上软件环境配置完成后,开始进行Kafka使用Java客户端进行访问的示例代码的操作。
示例一:发送消息到Kafka
- 创建maven项目
首先,在本地创建一个maven项目,引入Kafka相关依赖,pom.xml文件如下:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
- 编写Java生产者代码
在项目的src/main/java目录下新建一个名为KafkaProducer的Java类,代码如下:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka服务端的主机名和端口号
props.put("acks", "all"); // 等待所有副本节点的应答
props.put("retries", 0);
props.put("batch.size", 16384); // 消息批次提交的大小
props.put("linger.ms", 1); // 等待时间,单位是毫秒
props.put("buffer.memory", 33554432); // Producer可以用来缓存消息的缓冲区大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化器,将key序列化成字节数组
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 序列化器,将value序列化成字节数组
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("test", Integer.toString(i), "message-" + i));
}
producer.close();
}
}
- 运行Java生产者代码
运行KafkaProducer类,即可将消息发送到Kafka对应的主题“test”中。
示例二:从Kafka消费消息
- 编写Java消费者代码
在项目的src/main/java目录下新建一个名为KafkaConsumer的Java类,代码如下:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
- 运行Java消费者代码
运行KafkaConsumer类,即可从Kafka的主题“test”中消费到之前生产者发送的消息。
至此,Kafka使用Java客户端进行访问的示例代码已经完成。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Kafka使用Java客户端进行访问的示例代码 - Python技术站