kafka为分布式消息队列,队列中的消息可以保存7天。当使用消费者读取kafka中数据时,会面临这样的问题:

当消费者进程中断后,再次进行消费时会发现,读取的位置当前获取的新数据,如果配置offset

auto_offset_reset='earliest'

会从kafka初始的数据消费,重复消费之前的数据。如果想要使消费者像下载文件一样,可以“设置断点继续重传”我们可以在定义

consumer时候这样做:

from kafka import KafkaConsumer

consumer = KafkaConsumer('stratum', group_id='my_group_new', auto_offset_reset='earliest', bootstrap_servers='kafka-001:9092','kafka-002:9092','kafka-003:9092'])
#consumer.subscribe(topics=('stratum','stratum1'))
for message in consumer:
    #print (message)
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

用于测试的producer代码:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='kafka-001:9092','kafka-002:9092','kafka-003:9092'])
for _ in range(1):
    #producer.send('my_favorite_topic', b'some_message_bytes')
    value = b'{"user":"caohaitao"}'
    producer.send('stratum', value)
    value = b'{"user":"hehe"}'
    producer.send('stratum', value)
producer.flush()


小结:

1.kafka中使用消费群组,配置group_id可以实现,从上次中断的地方开始消费

2.使用auto_offset_reset='earliest'可以实现,在第一次使用group_id时可以从数据起始点开始消费

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐