Kafka in Python: Using the kafka-python Library
Introduction
Project: dpkp/kafka-python
kafka-python works with the newer Kafka releases (0.10 or 0.9) and also supports older versions (such as 0.8.0). In practice, after I upgraded kafka-python, my existing code stopped working against Kafka 0.8.2, so it is best to upgrade Kafka itself to the latest version.
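If you cannot upgrade the broker right away, one possible workaround is to pin the protocol version explicitly. This is only a sketch under the assumption that your kafka-python release accepts the api_version parameter in tuple form (some older releases expected a string such as '0.8.2'):

from kafka import KafkaProducer

# Pin the broker protocol version explicitly (newer kafka-python releases can
# also auto-detect it); the exact value and form depend on your installation.
producer = KafkaProducer(
    bootstrap_servers=['127.0.0.1:9092'],
    api_version=(0, 8, 2))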
Sending messages
Use KafkaProducer to send messages.
from kafka import KafkaProducer

kafka_host = '127.0.0.1'   # host
kafka_port = 9092          # port
kafka_topic = 'collector'  # topic name, matching the consumer example below

producer = KafkaProducer(
    bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
        kafka_host=kafka_host,
        kafka_port=kafka_port)])

message_string = 'some message'
response = producer.send(kafka_topic, message_string.encode('utf-8'))
Receiving messages
Use KafkaConsumer to receive messages.
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'collector',
    group_id='my-group',
    bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
        kafka_host=KAFKA_HOST,
        kafka_port=KAFKA_PORT)])

for message in consumer:
    content = json.loads(message.value)
    # ...
The consumer is iterable; when there are no messages in the queue, the code above blocks and waits. Press Control+C to break out of the loop.
Handling loop exit
Control+C raises a KeyboardInterrupt exception; catching it in the code lets you perform any cleanup needed before exiting.
try:
    for message in consumer:
        content = json.loads(message.value)
        # ...
except KeyboardInterrupt:
    print("Catch Keyboard interrupt")
    # ...
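If you would rather have the loop end on its own instead of waiting for Control+C, a sketch like the following may help. It assumes the consumer_timeout_ms option is available in your kafka-python version; it makes iteration stop after the given number of milliseconds without new messages:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'collector',
    group_id='my-group',
    bootstrap_servers=['127.0.0.1:9092'],
    consumer_timeout_ms=10000)  # stop iterating after 10 s without messages

for message in consumer:
    content = json.loads(message.value)
    # ...

# Control reaches this point once no message has arrived for 10 seconds.
consumer.close()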
Pitfalls
The deprecated Simple APIs
Newer versions of kafka-python deprecate the Simple APIs, including KafkaClient, SimpleConsumer, and SimpleProducer. Legacy code like the snippet below needs to be migrated; a rough sketch using the new consumer API follows it.
kafka = KafkaClient("{kafka_host}:{kafka_port}".format(
    kafka_host=KAFKA_HOST,
    kafka_port=KAFKA_PORT))
consumer = SimpleConsumer(kafka, kafka_group, kafka_topic)
for message in consumer:
    # continue
    content = json.loads(message.message.value)
    timestamp = content['timestamp']
    t = datetime.datetime.fromtimestamp(timestamp)
    print t.strftime("%Y-%m-%d %H:%M:%S"), content['app'], content['data']['message']
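For reference, here is a rough sketch of the same logic rewritten with the new KafkaConsumer API; note that the payload moves from message.message.value to message.value. The placeholders (kafka_topic, kafka_group, KAFKA_HOST, KAFKA_PORT) mirror the ones in the legacy snippet above:

import datetime
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    kafka_topic,
    group_id=kafka_group,
    bootstrap_servers=["{0}:{1}".format(KAFKA_HOST, KAFKA_PORT)])

for message in consumer:
    content = json.loads(message.value)
    timestamp = content['timestamp']
    t = datetime.datetime.fromtimestamp(timestamp)
    print(t.strftime("%Y-%m-%d %H:%M:%S"), content['app'], content['data']['message'])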
Problems caused by creating KafkaProducer repeatedly
A Flask application of mine exposes an API endpoint that sends one message to Kafka. With the old Simple API I created a SimpleProducer inside each controller function; after switching to KafkaProducer I kept that pattern and created a new KafkaProducer in every controller, as shown below:
try:
    producer = KafkaProducer(
        bootstrap_servers=['{kafka_host}:{kafka_port}'.format(
            kafka_host=app.config['KAFKA_HOST'],
            kafka_port=app.config['KAFKA_PORT'])])
    message_string = json.dumps(message)
    response = producer.send(kafka_topic, message_string.encode('utf-8'))
    producer.close()
except Exception:
    # handle the send failure...
    pass
But then the following error occurred:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1836, in __call__
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1820, in wsgi_app
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1403, in handle_exception
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1817, in wsgi_app
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1477, in full_dispatch_request
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1381, in handle_user_exception
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1475, in full_dispatch_request
  File "/usr/lib64/python2.7/site-packages/flask/app.py", line 1461, in dispatch_request
  File "/vagrant/nwpc_log_data_pipeline/producer_agent/agent/agent_controller.py", line 49, in get_collector_log
  File "/usr/lib/python2.7/site-packages/kafka/producer/kafka.py", line 272, in __init__
  File "/usr/lib/python2.7/site-packages/kafka/client_async.py", line 129, in __init__
  File "/usr/lib/python2.7/site-packages/kafka/selectors34.py", line 422, in __init__
IOError: [Errno 24] Too many open files
The cause is that each new KafkaProducer holds an open file descriptor, which was never released when the controller returned, so eventually no further KafkaProducer could be created.
The fix is to create one global KafkaProducer and share it across all controllers.
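A minimal sketch of that fix in a Flask app is shown below. The route path and the config defaults are illustrative assumptions; get_collector_log matches the controller name visible in the traceback above:

from flask import Flask
from kafka import KafkaProducer

app = Flask(__name__)

# Create the producer once at module level; every controller reuses the same
# instance, so file descriptors are no longer leaked per request.
producer = KafkaProducer(
    bootstrap_servers=['{0}:{1}'.format(app.config.get('KAFKA_HOST', '127.0.0.1'),
                                        app.config.get('KAFKA_PORT', 9092))])

@app.route('/collector/log')
def get_collector_log():
    producer.send('collector', b'some message')  # reuse the shared producer
    return 'ok'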
Be careful with future.get()
The official example contains the following code:
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass
KafkaProducer.send returns a future (FutureRecordMetadata); calling get() on it blocks until the send completes and returns the metadata for that record. When sending a large number of messages, these blocking get() calls add noticeable latency, so skip get() whenever you do not care whether an individual message was delivered successfully.
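When per-message delivery confirmation is not needed, one alternative sketch is to register an error callback on the future instead of blocking on get(); on_send_error and the logger setup are illustrative assumptions:

import logging

log = logging.getLogger(__name__)

def on_send_error(excp):
    # Invoked asynchronously if the produce request fails.
    log.error('failed to send message to Kafka: %s', excp)

# Fire and forget: no blocking get(), only an error callback.
future = producer.send('my-topic', b'raw_bytes')
future.add_errback(on_send_error)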