Kafka安装部署超详细步骤
Apache Kafka是一个开源流处理平台,由Apache软件基金会开发。它是用Scala和Java编写的,并具有高吞吐量,高可靠性和可扩展性等特性。在这里,我们将详细讲解如何在Linux系统上安装和部署Kafka。
步骤一:安装Java
Kafka是Java编写的,因此,首先需要安装Java。
在终端中输入以下命令:
sudo apt update
sudo apt install openjdk-11-jdk
步骤二:下载和解压kafka
在终端中输入以下命令来下载Kafka:
wget http://apache.mirrors.pair.com/kafka/2.8.0/kafka_2.13-2.8.0.tgz
下载完成后,使用以下命令解压Kafka文件:
tar -xzf kafka_2.13-2.8.0.tgz
步骤三:启动kafka服务器
在终端中进入Kafka文件夹,并运行以下命令启动Kafka服务器:
cd kafka_2.13-2.8.0
bin/kafka-server-start.sh config/server.properties
步骤四:创建一个Topic
在终端中运行以下命令来在Kafka中创建一个Topic:
bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092
步骤五:创建一个生产者
在终端中运行以下命令来创建一个Kafka生产者:
bin/kafka-console-producer.sh --topic test_topic --bootstrap-server localhost:9092
步骤六:创建一个消费者
在终端中运行以下命令来创建一个Kafka消费者:
bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server localhost:9092
输入的任何消息都将被推送到生产者,然后被消费者接收。
示例一:使用Python生产和消费消息
Kafka提供了许多客户端库,使得使用各种编程语言发送和接收Kafka消息变得更加容易。以下是一个使用Python发送和接收消息的示例:
from kafka import KafkaProducer, KafkaConsumer
# 生产消息
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('test_topic', b'Hello, Kafka!')
# 消费消息
consumer = KafkaConsumer('test_topic', auto_offset_reset='earliest',
bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
for msg in consumer:
print(msg)
示例二:使用Java生产和消费消息
下面是一个使用Java发送和接收消息的示例:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class KafkaExample {
private final static String TOPIC = "test_topic";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
private static KafkaProducer<String, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
return new KafkaProducer<>(props);
}
private static void sendMessage(String message) {
final Producer<String, String> producer = createProducer();
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.flush();
producer.close();
}
}
public static void main(String[] args) {
sendMessage("Hello, Kafka!");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(TOPIC));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
这个示例中,我们使用Java实现了一个Kafka生产者和消费者。生产者发送一条消息,消费者收到并打印到标准输出。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:kafka安装部署超详细步骤 - Python技术站