Kafka源码系列教程之删除topic
删除Kafka中的topic需要以下步骤:
- 关闭topic的所有消费者
- 停止所有的producer,确保没有新的消息被发布到该topic
- 从zookeeper中删除topic目录
- 从broker集群中删除该topic的所有分区
示例 1
假设我们要删除名为foo的topic。首先,我们需要查看哪些消费者正在订阅该topic,可以使用以下命令:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list | grep foo
如果返回了消费者组ID,我们需要关闭该group所有的消费者,可以使用以下命令:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group foo-group --delete
如果没有返回消费者组ID,则没有消费者正在订阅该topic。
接下来,我们需要停止任何正在发布到该topic的producer。然后,我们可以使用以下命令从zookeeper中删除该topic:
./zookeeper-shell.sh localhost:2181 delete /brokers/topics/foo
最后,我们需要将所有broker中该topic分区的目录删除。我们可以使用brokers API来获取该topic的所有分区:
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo | awk '{if (NR!=1) {print}}' | while read line; do echo "${line}" | awk '{print $1}'; done | sort | uniq | sed "s/^/\\/brokers\\/topics\\/foo\\/partitions\\//g" | xargs -I % ./zookeeper-shell.sh localhost:2181 delete %
以上命令会以逐个分区的方式删除该topic中每个分区的目录。
示例 2
假设我们要删除名为bar的topic。同样,我们将使用以下命令检查是否有消费者正在订阅该topic:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list | grep bar
如果有消费者组正在订阅该topic,则需要使用以下命令关闭该group所有消费者:
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group bar-group --delete
接着,我们停止所有正在发布到该topic的producer。
然后,我们从zookeeper中删除该topic:
./zookeeper-shell.sh localhost:2181 delete /brokers/topics/bar
最后,我们在每个broker上删除该topic的所有分区:
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic bar
我们可以在brokers API中使用以下命令确保分区已删除:
./kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic bar
以上命令应该返回:
Error while executing topic command : Topic 'bar' either doesn't exist or is not described on this broker.
.
(Exiting because of --exit-on-error specified)
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Kafka源码系列教程之删除topic - Python技术站