参考:
kafka-python
kafka-python–Python官网
kafka-python–Github
kafka 学习笔记(四)之Python客户端
kafka-python安装
Linux下的压缩zip,解压缩unzip命令详解及实例

1 安装kafka-python

https://github.com/dpkp/kafka-python.git 下载,传到服务器
执行如下命令安装:

unzip  kafka-python-master.zip
cd kafka-python-master
python setup.py install

下面程序应用到kafka中test,请参考Kafka学习之一 :安装启动
2 查看Kafka-python安装情况
执行下面一些命令:

[hao973@localhost ~]$ python
Python 2.7.9 (default, Dec 18 2017, 01:29:09) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from kafka import KafkaClient, SimpleProducer, SimpleConsumer
>>> kafka = KafkaClient("localhost:9092")
>>> producer = SimpleProducer(kafka)
>>> producer.send_messages("test","Hello world!")
[ProduceResponsePayload(topic=u'test', partition=0, error=0, offset=5)]
>>> 

值得注意的是,Python客户端不需要想Java客户端一样连接zookeeper,Python客户端的运行和Java版的是两个project,所以方法不一样。

用kafka自带的consumer console,消费者输出结果:
这里写图片描述

使用python的consumer如下:

>>> consumer = SimpleConsumer(kafka,"python","test")
>>> for msg in consumer:
...  print (msg)

这里写图片描述

退出命令:

kafka.close()
exit()

3 Python简单代码示例:

类java的kafka API:KafkaConsumer和KafkaProducer.
KafkaProducer示例:

#!/usr/bin/python
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Asynchronous by default
future = producer.send('test', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
ret = producer.send('test', key=b'foo', value=b'bar')

# Block for 'synchronous' sends
try:
    record = ret.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record.topic)
print (record.partition)
print (record.offset)

# produce asynchronously
for i in range(100):
    producer.send('test', b'message %d' % i)

# block until all async messages are sent
producer.flush()

KafkaConsumer示例:

#!/usr/bin/python
from kafka import KafkaConsumer 

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('test',
                         bootstrap_servers=['localhost:9092'])

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                          message.value))

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐