Kafka Study Notes, Part 2: The Python Client
1 Installing kafka-python
Download the source from https://github.com/dpkp/kafka-python.git and upload it to the server.
Run the following commands to install it:
unzip kafka-python-master.zip
cd kafka-python-master
python setup.py install
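A quick way to confirm that the install step worked is to import the package and read its version. The sketch below assumes the package exposes a `__version__` attribute (kafka-python does, as far as I know); the helper name `kafka_python_version` is made up for illustration.

```python
import importlib


def kafka_python_version():
    """Return the installed kafka-python version, or None if it cannot be imported."""
    try:
        kafka = importlib.import_module('kafka')
    except ImportError:
        # Package is missing (or failed to import on this interpreter)
        return None
    # Fall back to 'unknown' if the attribute is absent (assumption: it normally exists)
    return getattr(kafka, '__version__', 'unknown')


print(kafka_python_version())
```

If this prints `None`, the `python setup.py install` step most likely ran against a different Python interpreter than the one you are using now.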
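The programs below use the Kafka topic test; to set it up, see Kafka Study Notes, Part 1: Installation and Startup.
2 Checking the kafka-python installation
Run the following commands: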
[hao973@localhost ~]$ python
Python 2.7.9 (default, Dec 18 2017, 01:29:09)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaClient, SimpleProducer, SimpleConsumer
>>> kafka = KafkaClient("localhost:9092")
>>> producer = SimpleProducer(kafka)
>>> producer.send_messages("test","Hello world!")
[ProduceResponsePayload(topic=u'test', partition=0, error=0, offset=5)]
>>>
Note that, unlike the Java client, the Python client does not need to connect to ZooKeeper. The Python client and the Java client are two separate projects, so their APIs differ.
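Because the Python client connects to the broker address itself (localhost:9092 in this post) rather than to ZooKeeper, a plain TCP check against that address can help rule out connectivity problems before you debug the client. This is only a sketch using the standard library: the helper name `broker_reachable` is hypothetical, and an open port does not prove that a healthy Kafka broker is listening on it.

```python
import socket


def broker_reachable(host='localhost', port=9092, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        conn = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        # Refused, timed out, or host not resolvable
        return False
    conn.close()
    return True


print(broker_reachable())
```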
Output when consuming with the console consumer that ships with Kafka:
Using the Python consumer instead:
>>> consumer = SimpleConsumer(kafka,"python","test")
>>> for msg in consumer:
...     print (msg)
To exit:
kafka.close()
exit()
3 Simple Python code examples
The Java-like Kafka API: KafkaConsumer and KafkaProducer.
KafkaProducer example:
#!/usr/bin/python
import logging

from kafka import KafkaProducer
from kafka.errors import KafkaError

log = logging.getLogger(__name__)

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Asynchronous by default
future = producer.send('test', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('produce request failed')
else:
    # Successful result returns assigned partition and offset
    print (record_metadata.topic)
    print (record_metadata.partition)
    print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
ret = producer.send('test', key=b'foo', value=b'bar')

# Block for 'synchronous' sends
try:
    record = ret.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception('keyed produce request failed')
else:
    # Successful result returns assigned partition and offset
    print (record.topic)
    print (record.partition)
    print (record.offset)

# produce asynchronously
for i in range(100):
    producer.send('test', b'message %d' % i)

# block until all async messages are sent
producer.flush()
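The producer example blocks on `future.get()` to find out whether a send succeeded. If you prefer to stay asynchronous, the future returned by `send()` also accepts callbacks through `add_callback` and `add_errback`. The following is a minimal sketch of that pattern; the function names `on_send_success`, `on_send_error` and `send_with_callbacks` are illustrative, not part of kafka-python.

```python
import logging

log = logging.getLogger(__name__)


def on_send_success(record_metadata):
    # Called with the record metadata once the broker acknowledges the message
    line = '%s:%d:%d' % (record_metadata.topic,
                         record_metadata.partition,
                         record_metadata.offset)
    print(line)
    return line


def on_send_error(exc):
    # Called with the exception if the produce request failed
    log.error('produce request failed: %r', exc)


def send_with_callbacks(producer, topic, value):
    """Send one message and attach success/error callbacks to its future."""
    future = producer.send(topic, value)
    future.add_callback(on_send_success)
    future.add_errback(on_send_error)
    return future
```

With the `producer` object from the example above, this would be used as `send_with_callbacks(producer, 'test', b'raw_bytes')`, followed by `producer.flush()` before the script exits so that pending messages are actually delivered.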
KafkaConsumer example:
#!/usr/bin/python
from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('test',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))
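Since keys and values travel as raw bytes, structured data has to be encoded on the way in and decoded on the way out. One common approach is to pass a serializer to KafkaProducer and a matching deserializer to KafkaConsumer through their `value_serializer` and `value_deserializer` options. The sketch below assumes JSON encoded as UTF-8; the helper names (`serialize_json`, `deserialize_json`, `make_json_clients`) are hypothetical.

```python
import json


def serialize_json(value):
    """Encode a JSON-compatible Python object as UTF-8 bytes."""
    return json.dumps(value).encode('utf-8')


def deserialize_json(raw):
    """Decode UTF-8 bytes produced by serialize_json back into a Python object."""
    return json.loads(raw.decode('utf-8'))


def make_json_clients(topic='test', servers=('localhost:9092',)):
    """Build a producer/consumer pair that exchange JSON values (needs a running broker)."""
    # Imported lazily so the two helpers above work without kafka-python installed
    from kafka import KafkaConsumer, KafkaProducer
    producer = KafkaProducer(bootstrap_servers=list(servers),
                             value_serializer=serialize_json)
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=list(servers),
                             value_deserializer=deserialize_json)
    return producer, consumer
```

With clients built this way, `producer.send('test', {'id': 1})` takes a dict directly, and `message.value` in the consumer loop arrives already decoded.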