一文理解Kafka Rebalance负载均衡
在Kafka中,消费者组(Consumer Group)中的多个消费者(Consumer)会协同消费一个或多个Topic的分区(Partition)。消费者组通过Partition的分配策略来确定每个消费者负责消费哪些分区。当新的消费者加入或退出消费者组时,需要重新进行分区分配,这个过程被称为Rebalance(负载均衡)。
Rebalance的流程
Rebalance的过程大致如下:
- 当有新消费者加入或者有消费者退出消费者组时,该组中的每个消费者会发送一个Join Group请求给Kafka集群的协调者(Coordinated Group)。
- 协调者将当前所有可用的消费者分配到多个会话(Session)中,每个会话包含若干消费者,并负责分配里面消费者负责的分区。
- 通过消费者的调用栈来计算出被调用的消费者所属的会话会话,只有同一会话中的消费者才会参与到Rebalance的处理中。
- 如果有新消费者加入,或者有消费者退出,会触发Rebalance过程。协调者收到Join Group请求后会尝试分配新的Session,然后向每个会话中的消费者发送Rebalance请求。
- 每个消费者在收到Rebalance请求时,会停止消费并释放掉自己所负责的分区,然后在新的Session中重新进行分配,然后向协调者发送Complete Rebalance请求。
- 当每个消费者的Complete Rebalance请求被接收后,协调者向每个消费者发送Join Complete请求,激活消费。
Rebalance处理不当会导致一些问题,如消费者组内部的吞吐率降低,消息重复消费等。
Rebalance的优化
为了优化Rebalance的性能,Kafka引入了一个Partition的位移(leade offset)的概念。简单来讲,Rebalance的时候,下一个要被消费的Partition的位移会被记录在一个Cache中,这样在下一次的Rebalance中,就不需要再重新计算消费位移了。
另外,Kafka还引入了Consumer Group和Topic的缓存机制,将它们缓存在Zookeeper或者Kafka自身的元数据存储中,也就是之前提到的协调者。这样,每次Rebalance时不需要重复读取Zookeeper或元数据存储中的数据,提升了性能。
Rebalance的注意事项
在Kafka中,Rebalance是常态,所以我们需要考虑如何在Rebalance过程中尽量减少应用程序的停机时间。我们可以结合Kafka提供的Rebalance Listener监听器,在Rebalance过程的不同阶段执行钩子函数来处理Rebalance过程中的问题。例如,在Rebalance开始前,可以暂停消息的处理,避免Consumer同步对Topic进行写入造成的消息重复;在Rebalance完成后,可以立刻启动消费者消费消息,避免时间浪费。
示例1
我们可以通过设置Kafka Consumer的Rebalance Listener来实现一些特定的需求,例如在Rebalance前停止消费。
public class MyConsumerListener implements ConsumerRebalanceListener{
public void onPartitionsRevoked(Collection<TopicPartition> partitions){
//在Rebalance之前停止消费,避免造成消息的重复。
consumer.pause(partitions);
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions){
//在Rebalance完成后,立刻恢复消费。
consumer.resume(partitions);
}
}
示例2
假设我们的消费者程序是一个集群,它们共同订阅某一个Topic,如果一个消费者下线,其余消费者应该如何处理?
在Rebalance完成之后,Kafka会重新分配分区,因此有可能被下线的消费者负责的分区新分配到其余的消费者处理。为了避免这类消息重复消费问题,我们应该通过关闭Producer的ACK机制来降低副作用。在默认情况下,Producer等待Broker的ACK表示部分或全部的消息发送成功,如果没有收到ACK就会自动重试,这样可能会导致下线消费者原本已经处理过的消息被新的消费者重复消费。因此,我们可以将ACK机制的配置修改为0,避免重复消费的问题。
Properties props = new Properties();
//...
props.put("acks", 0);
//...
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "1", "message"), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:一文理解kafka rebalance负载均衡 - Python技术站