(3)kafka-python消费者
consumer.py
#!/bin/env python from kafka import KafkaConsumer # connect to Kafka server and pass the topic we want to consume consumer = KafkaConsumer('test_20181105',group_id = 'test_group2', bootstrap_servers='100.69.222.221:9092,100.69.222.222:9092,100.69.222.223:9092') try: for msg in consumer: print(msg) # print("%s:%d:%d: key=%s value=https://www.ycpai.cn/python/%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)) except KeyboardInterrupt as e: print(e)同样修改上面的Ip地址和端口号,就可以接收 topic: test_20181105上的消息:
ConsumerRecord(topic='test_20181105', partition=1, offset=951, timestamp=1541405600340, timestamp_type=0, key=None, value=https://www.ycpai.cn/python/b'{"id": 1663, "myuuid": "0f744021b2d9468886908ee6685a0fdb", "time": "20181105 16:13:20"}', checksum=1357895145, serialized_key_size=-1, serialized_value_size=87) ConsumerRecord(topic='test_20181105', partition=0, offset=935, timestamp=1541405600841, timestamp_type=0, key=None, value=b'{"id": 1664, "myuuid": "9379f68f656644bdb2d30911f06240e4", "time": "20181105 16:13:20"}', checksum=-715594646, serialized_key_size=-1, serialized_value_size=87) ConsumerRecord(topic='test_20181105', partition=1, offset=952, timestamp=1541405601341, timestamp_type=0, key=None, value=b'{"id": 1665, "myuuid": "f4a5fa5b32cd4b7991612b626bea4b0e", "time": "20181105 16:13:21"}', checksum=-2068072013, serialized_key_size=-1, serialized_value_size=87)可以通过设置不同的group_id 来实现消息队列或消息订阅:
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
以上就是小编给大家带来的在python连接kafka的操作方法,希望大家通过阅读小编的文章之后能够有所收获!如果大家觉得小编的文章不错的话,可以多多分享给有需要的人 。
推荐阅读
- anaconda如何打开创建的虚拟环境?这篇文章教会你详细操作
- Python能不能用怎么测试?Python怎么样才算安装好了
- Python安装位置怎么去找?Python默认安装在哪里
- 怎么在命令提示符中运行python程序?超详细的方法来了
- 京东双十一能分期付款吗?如何分期付款?
- 京东双十一活动方案如何操作?附详细活动方案
- 拼多多商品上架了多久补单?如何补?
- 淘宝如何安全补单?附流程步骤
- 拼多多选词公式怎么计算?如何选词?
- ps如何制作绚烂的烟花文字效果?ps怎么设计烟花文字效果?