工作中会在开发环境中测试生产kafka消息,该脚本简单的实现了这一功能。

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient
import json
import logging
logging.basicConfig(level=logging.INFO)

client = KafkaClient(hosts="102.2.20.31:15386")  # 可接受多个Client,多个broker

# print(client.topics) # 查看所有topic

def sendDevKafkaMsg(topic, message):
    try:
        topic = client.topics[topic]  # 选择一个topic
        producer = topic.get_producer(delivery_reports=True)
        producer.produce(bytes(message, encoding="utf8"))
        producer.get_delivery_report() # 返回之前发送失败的消息和结果
    except Exception as e:
        print(e)


sendDevKafkaMsg("TEST_TOPIC", json.dumps(data1))

微信公众号:
在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐