一.部署

  1. pull wurstmeister/zookeeper
sudo docker pull wurstmeister/zookeeper
  1. pull wurstmeister/kafka
sudo docker pull wurstmeister/kafka
  1. 启动zookeeper
sudo docker run -d --name zookeeper -p 2181:2181 -t  wurstmeister/zookeeper
  1. 启动kafka
sudo docker run  -d -t --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.18.166:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.166:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092  wurstmeister/kafka
  1. 修改 server.properties,设置 num.partitions=2,表示新建 topic 默认有 2 个分区
num.partitions=2 
  1. 重启kafka container

二.两个 group 按指定分区消费

  1. producer
import json
from kafka import KafkaProducer


def sendMsg(topic, msg_dict):
    """Send one JSON-serialized message to a Kafka topic.

    A new ``KafkaProducer`` is created for each call and is always closed
    before returning, even if sending fails.

    Args:
        topic: Name of the Kafka topic to publish to.
        msg_dict: Any JSON-serializable value; it is encoded with
            ``json.dumps`` and sent as UTF-8 bytes.
    """
    producer = KafkaProducer(
        bootstrap_servers=["192.168.18.166:9092"],
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    try:
        producer.send(topic, msg_dict)
        # send() only queues the record; flush() blocks until it has
        # actually been handed to the broker.
        producer.flush()
    finally:
        # Release sockets and background threads even when send() raises
        # (e.g. the value is not JSON-serializable).
        producer.close()
if __name__ == '__main__':
    # Publish ten sample string payloads ("011", "111", ..., "911") to the
    # demo topic, printing a progress line after each send.
    for i in range(10):
        payload = str(i) + '11'
        sendMsg("peter.test1", payload)
        print("over" + str(i) + '10')
        # NOTE(review): the original also called sendMsg("json", msg_dict)
        # here, but msg_dict was never defined, so the script crashed with
        # NameError right after the first message. The call was removed;
        # restore it with a real payload if a "json" topic is needed.
  1. 两个 consumer 分别指定分区消费;如果不指定分区,则消费该 topic 的全部消息
#consumer 1
from kafka import KafkaConsumer
import logging 
import json
import datetime
from kafka import TopicPartition

def main(group_id="group_1", topic="peter.test1", partition=0):
    """Consume messages from a single, explicitly assigned topic partition.

    The consumer is pinned to ``(topic, partition)`` with ``assign()``
    instead of subscribing to the topic, so it only receives records from
    that one partition. Each record's JSON-decoded value is printed.

    Args:
        group_id: Consumer group id passed to ``KafkaConsumer``.
            NOTE(review): the original passed an undefined name ``group_``
            positionally; the default here is a placeholder, confirm the
            intended group name.
        topic: Topic to read from.
        partition: Partition number of ``topic`` to read from.
    """
    consumer = KafkaConsumer(
        group_id=group_id,
        max_poll_records=5,
        max_poll_interval_ms=600000,
        bootstrap_servers=["192.168.18.166:9092"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    # Manual assignment: pick the topic and the exact partition to consume.
    consumer.assign([TopicPartition(topic, partition)])
    try:
        for message in consumer:
            print("receive label message")
            try:
                print("@@@@@ ---> consumer_cluser1  get new message ", str(message.value))
            except Exception:
                # Keep consuming after a bad record; logging.exception
                # records the message together with the traceback.
                logging.exception("@@---->  Exception : ")
    finally:
        consumer.close()


if __name__ == '__main__':
    main()

#consumer 2

from kafka import KafkaConsumer
import logging 
import json
import datetime
from kafka import TopicPartition

def main(group_id="group_2", topic="peter.test1", partition=1):
    """Consume messages from a single, explicitly assigned topic partition.

    The consumer is pinned to ``(topic, partition)`` with ``assign()``
    instead of subscribing to the topic, so it only receives records from
    that one partition. Each record's JSON-decoded value is printed.

    Args:
        group_id: Consumer group id passed to ``KafkaConsumer``.
            NOTE(review): the original passed an undefined name ``group_``
            positionally; the default here is a placeholder, confirm the
            intended group name.
        topic: Topic to read from.
        partition: Partition number of ``topic`` to read from.
    """
    consumer = KafkaConsumer(
        group_id=group_id,
        max_poll_records=5,
        max_poll_interval_ms=600000,
        bootstrap_servers=["192.168.18.166:9092"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    # Manual assignment: pick the topic and the exact partition to consume.
    consumer.assign([TopicPartition(topic, partition)])
    try:
        for message in consumer:
            print("receive label message")
            try:
                # The original text said "consumer_cluser1" here (copy-paste
                # slip); this is the second consumer.
                print("@@@@@ ---> consumer_cluser2  get new message ", str(message.value))
            except Exception:
                # Keep consuming after a bad record; logging.exception
                # records the message together with the traceback.
                logging.exception("@@---->  Exception : ")
    finally:
        consumer.close()


if __name__ == '__main__':
    main()