python如何连接kafka?这篇文章教会你连接方法( 二 )

(3)kafka-python消费者
consumer.py
#!/bin/env python from kafka import KafkaConsumer # connect to Kafka server and pass the topic we want to consume consumer = KafkaConsumer('test_20181105',group_id = 'test_group2', bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092') try:     for msg in consumer:         print(msg)         # print("%s:%d:%d: key=%s value=https://www.ycpai.cn/python/%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)) except KeyboardInterrupt as e: print(e)同样修改上面的Ip地址和端口号,就可以接收 topic: test_20181105上的消息:
ConsumerRecord(topic='test_20181105', partition=1, offset=951, timestamp=1541405600340, timestamp_type=0, key=None, value=https://www.ycpai.cn/python/b'{"id": 1663, "myuuid": "0f744021b2d9468886908ee6685a0fdb", "time": "20181105 16:13:20"}', checksum=1357895145, serialized_key_size=-1, serialized_value_size=87) ConsumerRecord(topic='test_20181105', partition=0, offset=935, timestamp=1541405600841, timestamp_type=0, key=None, value=b'{"id": 1664, "myuuid": "9379f68f656644bdb2d30911f06240e4", "time": "20181105 16:13:20"}', checksum=-715594646, serialized_key_size=-1, serialized_value_size=87) ConsumerRecord(topic='test_20181105', partition=1, offset=952, timestamp=1541405601341, timestamp_type=0, key=None, value=b'{"id": 1665, "myuuid": "f4a5fa5b32cd4b7991612b626bea4b0e", "time": "20181105 16:13:21"}', checksum=-2068072013, serialized_key_size=-1, serialized_value_size=87)可以通过设置不同的group_id 来实现消息队列或消息订阅:
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
以上就是小编给大家带来的在python连接kafka的操作方法,希望大家通过阅读小编的文章之后能够有所收获!如果大家觉得小编的文章不错的话,可以多多分享给有需要的人 。

推荐阅读