下面就是Docker部署Kafka以及Spring Kafka实现的完整攻略:
准备工作
首先,需要安装Docker
及Docker Compose
。
然后,创建一个文件夹,名为docker-kafka-spring
,用于存放本示例代码和配置文件。
Docker部署Kafka
- 在该文件夹下,创建一个名为
docker-compose.yml
的文件,用于定义所需的Docker
容器。
version: '3.8'
services:
zookeeper:
image: zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test-topic:1:1"
depends_on:
- zookeeper
- 拉取所需的
Docker
镜像。
docker-compose pull
- 启动容器。
docker-compose up -d
现在,Docker
中已经启动了Zookeeper
和Kafka
容器。
Spring Kafka实现
- 添加依赖。
在pom.xml
文件中添加如下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
- 编写配置。
在application.yml
文件中添加如下配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 发送消息。
编写代码,在main
函数中添加如下代码,向test-topic
发送消息:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
kafkaTemplate.send("test-topic", "test message");
}
}
- 接收消息。
编写代码,创建一个KafkaListener
,接收test-topic
中的消息。
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
现在,使用Maven
编译并运行DemoApplication
类,可在控制台看到test-topic
中的消息被成功消费。
示例
以上是完整的Docker
部署Kafka
以及Spring Kafka
实现的攻略。下面给出两个较为简单的示例。
示例1
- 添加依赖。
在pom.xml
文件中添加如下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
- 编写配置。
在application.yml
文件中添加如下配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 发送消息。
编写代码,在main
函数中添加如下代码,向test-topic
发送消息:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
kafkaTemplate.send("test-topic", "test message 1");
kafkaTemplate.send("test-topic", "test message 2");
kafkaTemplate.send("test-topic", "test message 3");
}
}
- 接收消息。
编写代码,创建一个KafkaListener
,接收test-topic
中的消息。
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
System.out.println("Received message: " + message);
}
}
现在,使用Maven
编译并运行DemoApplication
类,可在控制台看到test-topic
中的消息被成功消费。
示例2
- 添加依赖。
在pom.xml
文件中添加如下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.7</version>
</dependency>
- 编写配置。
在application.yml
文件中添加如下配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 发送消息。
编写代码,在main
函数中添加如下代码,向test-topic
发送消息:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(DemoApplication.class, args);
KafkaTemplate<String, String> kafkaTemplate = context.getBean(KafkaTemplate.class);
Map<String, Object> headers = new HashMap<>();
headers.put(KafkaHeaders.TOPIC, "test-topic");
headers.put(KafkaHeaders.MESSAGE_KEY, "test-key");
headers.put(KafkaHeaders.TIMESTAMP, System.currentTimeMillis());
kafkaTemplate.send(MessageBuilder
.createMessage("test message 1", new MessageHeaders(headers)));
Thread.sleep(2000);
kafkaTemplate.send(MessageBuilder
.createMessage("test message 2", new MessageHeaders(headers)));
}
}
- 接收消息。
编写代码,创建一个KafkaListener
,接收test-topic
中的消息。
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(ConsumerRecord<String, String> record) {
System.out.println("Received message Key: " + record.key());
System.out.println("Received message Value: " + record.value());
}
}
现在,使用Maven
编译并运行DemoApplication
类,可在控制台看到test-topic
中的消息被成功消费。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Docker部署Kafka以及Spring Kafka实现 - Python技术站