【Python】【Kafka】kafka读取所有数据——使用消费群组+offset
kafka为分布式消息队列,队列中的消息可以保存7天。当使用消费者读取kafka中数据时,会面临这样的问题:当消费者进程中断后,再次进行消费时会发现,读取的位置当前获取的新数据,如果配置offsetauto_offset_reset='earliest'会从kafka初始的数据消费,重复消费之前的数据。如果想要使消费者像下载文件一样,可以“设置断点继续重传”我们可以在定义consumer时候这样做
·
kafka为分布式消息队列,队列中的消息可以保存7天。当使用消费者读取kafka中数据时,会面临这样的问题:
当消费者进程中断后,再次进行消费时会发现,读取的位置当前获取的新数据,如果配置offset
auto_offset_reset='earliest'
会从kafka初始的数据消费,重复消费之前的数据。如果想要使消费者像下载文件一样,可以“设置断点继续重传”我们可以在定义
consumer时候这样做:
from kafka import KafkaConsumer
consumer = KafkaConsumer('stratum', group_id='my_group_new', auto_offset_reset='earliest', bootstrap_servers='kafka-001:9092','kafka-002:9092','kafka-003:9092'])
#consumer.subscribe(topics=('stratum','stratum1'))
for message in consumer:
#print (message)
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
用于测试的producer代码:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='kafka-001:9092','kafka-002:9092','kafka-003:9092'])
for _ in range(1):
#producer.send('my_favorite_topic', b'some_message_bytes')
value = b'{"user":"caohaitao"}'
producer.send('stratum', value)
value = b'{"user":"hehe"}'
producer.send('stratum', value)
producer.flush()
小结:
1.kafka中使用消费群组,配置group_id可以实现,从上次中断的地方开始消费
2.使用auto_offset_reset='earliest'可以实现,在第一次使用group_id时可以从数据起始点开始消费
更多推荐
已为社区贡献3条内容
所有评论(0)