Python中的Kafka:使用kafka-python

介绍
项目:dpkp/kafka-python

kafka-python试用新版的kafka(0.10 或 0.9),也支持旧的版本(比如0.8.0)。实际使用中当我更新kafka-python后,原来的代码使用kafka 0.8.2会出错,所以最好将kafka升级到最新版本。

发送消息

使用KafkaProducer发送消息。

from kafka import KafkaProducer kafka_host='127.0.0.1' # host kafka_port=9092 # port producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format( kafka_host=kafka_host, kafka_port=kafka_port )]) message_string = 'some message' response = producer.send(kafka_topic, message_string.encode('utf-8'))
1
2
3
4
5
6
7
8
9
10
11
12
from kafka import KafkaProducer
 
kafka_host = '127.0.0.1' # host
kafka_port = 9092 # port
 
producer = KafkaProducer ( bootstrap_servers = [ '{kafka_host}:{kafka_port}' . format (
     kafka_host = kafka_host ,
     kafka_port = kafka _port
) ] )
message_string = 'some message'
response = producer . send ( kafka_topic , message_string . encode ( 'utf-8' ) )
 

接收消息

使用KafkaComsuer接收消息。

from kafka import KafkaConsumer consumer = KafkaConsumer( 'collector', group_id='my-group', bootstrap_servers=['{kafka_host}:{kafka_port}'.format(kafka_host=KAFKA_HOST, kafka_port=KAFKA_PORT)] ) for message in consumer: content = json.loads(message.value) # ...
1
2
3
4
5
6
7
8
9
10
11
from kafka import KafkaConsumer
consumer = KafkaConsumer (
     'collector' ,
     group_id = 'my-group' ,
     bootstrap_servers = [ '{kafka_host}:{kafka_port}' . format ( kafka_host = KAFKA_HOST , kafka_port = KAFKA_PORT ) ]
)
for message in consumer :
     content = json . loads ( message . value )
     # ...
 
 

consumer可迭代,当队列中没有消息时,上面代码会一直等待。使用Control+C可以退出循环。

# 退出循环的处理方法

Control+C 会产生 KeyboardInterrupt 异常,在代码中捕获该异常,就可以执行必要的退出操作。

try: for message in consumer: content = json.loads(message.value) #... except KeyboardInterrupt, e: print "Catch Keyboard interrupt" #...
1
2
3
4
5
6
7
8
try :
     for message in consumer :
         content = json . loads ( message . value )
         #...
except KeyboardInterrupt , e :
     print "Catch Keyboard interrupt"
     #...
 
踩过的坑

已经废弃的Simple APIs
新版kafka-python已经废弃的Simple APIs,包括KafkaClient和SimpleConsumer、SimpleProducer。

kafka = KafkaClient("{kafka_host}:{kafka_port}".format(kafka_host=KAFKA_HOST, kafka_port=KAFKA_PORT)) consumer = SimpleConsumer(kafka, kafka_group, kafka_topic) for message in consumer: # continue content = json.loads(message.message.value) timestamp = content['timestamp'] t = datetime.datetime.fromtimestamp(timestamp) print t.strftime("%Y-%m-%d %H:%M:%S"), content['app'], content['data']['message']
1
2
3
4
5
6
7
8
9
kafka = KafkaClient ( "{kafka_host}:{kafka_port}" . format ( kafka_host = KAFKA_HOST , kafka_port = KAFKA_PORT ) )
consumer = SimpleConsumer ( kafka , kafka_group , kafka_topic )
for message in consumer :
     # continue
     content = json . loads ( message . message . value )
     timestamp = content [ 'timestamp' ]
     t = datetime . datetime . fromtimestamp ( timestamp )
     print t . strftime ( "%Y-%m-%d %H:%M:%S" ) , content [ 'app' ] , content [ 'data' ] [ 'message' ]
 
多次创建KafkaProducer产生的问题

我在一个Flask应用中提供一个API接口,该接口向Kafka发送一条消息。原来使用Simple API,在每个controller函数中创建一个SimpleProducer。切换到KafkaProducer后,依然在每个controller中创建新的KafkaProducer。如下所示:

try: producer = KafkaProducer(bootstrap_servers=['{kafka_host}:{kafka_port}'.format( kafka_host=app.config['KAFKA_HOST'], kafka_port=app.config['KAFKA_PORT'] )]) message_string = json.dumps(message) response = producer.send(kafka_topic, message_string.encode('utf-8')) producer.close()
1
2
3
4
5
6
7
8
9
try :
     producer = KafkaProducer ( bootstrap_servers = [ '{kafka_host}:{kafka_port}' . format (
         kafka_host = app . config [ 'KAFKA_HOST' ] ,
         kafka_port = app . config [ 'KAFKA_PORT' ]
     ) ] )
     message_string = json . dumps ( message )
     response = producer . send ( kafka_topic , message_string . encode ( 'utf-8' ) )
     producer . close ( )
 

但随后发生如下错误:

Traceback (most recent call last): File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1836, in __call__ File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1820, in wsgi_app File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1403, in handle_exception File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request File "/vagrant/nwpc_log_data_pipeline/producer_agent/agent/agent_controller.py", line 49, in get_collector_log File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 272, in __init__ File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 129, in __init__ File "/usr/lib/python2.7/site-packages/kafka/selectors34.py", line 422, in __init__ IOError: [Errno 24] Too many open files
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Traceback ( most recent call last ) :
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1836 , in __call__
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1820 , in wsgi_app
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1403 , in handle_exception
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1817 , in wsgi_app
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1477 , in full_dispatch_request
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1381 , in handle_user_exception
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1475 , in full_dispatch_request
 
   File "/usr/lib64/python2.7/site-packages/flask/app.py" , line 1461 , in dispatch_request
 
   File "/vagrant/nwpc_log_data_pipeline/producer_agent/agent/agent_controller.py" , line 49 , in get_collector_log
 
   File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py" , line 272 , in __init__
 
   File "/usr/lib/python2.7/site-packages/kafka/client_async.py" , line 129 , in __init__
 
   File "/usr/lib/python2.7/site-packages/kafka/selectors34.py" , line 422 , in __init__
 
IOError : [ Errno 24 ] Too many open files
 

原因是每次创建KafkaProducer都会占用一个文件符号,controller结束时,没有释放,导致后面无法继续创建新的KafkaProducer。

解决方法是创建全局KafkaProducer,供所有controller使用。

慎用RecordMetadata.get()
官方例子中有如下的代码

producer = KafkaProducer(bootstrap_servers=['broker1:1234']) # Asynchronous by default future = producer.send('my-topic', b'raw_bytes') # Block for 'synchronous' sends try: record_metadata = future.get(timeout=10) except KafkaError: # Decide what to do if produce request failed... log.exception() pass
1
2
3
4
5
6
7
8
9
10
11
12
13
producer = KafkaProducer ( bootstrap_servers = [ 'broker1:1234' ] )
 
# Asynchronous by default
future = producer . send ( 'my-topic' , b 'raw_bytes' )
 
# Block for 'synchronous' sends
try :
     record_metadata = future . get ( timeout = 10 )
except KafkaError :
     # Decide what to do if produce request failed...
     log . exception ( )
     pass
 

KafkaProducer.send 返回 RecordMetadata 对象,RecordMetadata.get 可以获取 record 的信息。但在发送大量消息时,get方法可能会造成明显的延时。所以当我们不关心消息是否发送成功时,就不要调用get方法了。




  • zeropython 微信公众号 5868037 QQ号 5868037@qq.com QQ邮箱
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐