log:

pip3 install kafka

pip3 install kafka-python

1.通过脚本实现让kafka生产测试数据,测试下游业务服务性能

2.可以增加线程池,让多线程并发执行,效果更好

# !/usr/bin/env python
# -*- encoding: utf-8 -*-
# @Author : yyq

import datetime
import json
import random
import time
from itertools import islice
from kafka import KafkaProducer
time1 = time.time()
#生产的机器
producer = KafkaProducer(bootstrap_servers='192.168.xx.xx:9092,192.168.xx.xx:9092')

# 消息体
data = {"a":"12","b":"test"}
#获取文件数据
a = open("id.TXT", "r")

for i in range(3):
    #更新json数据
    data["currentGradeId"] = random.randint(335501, 335506)
    data["occurrence_ts"] = int(time.time()*1000)
    data["detection_ts"] = int(time.time()*1000)
    data["key"] = "event" + str(i)
    #传入文件变量
    for k in islice(a, 0, i):
        #移除读取到的回车符
        line = k.strip('\n')
        data["memberId"] = str(line)
        #格式为字符类型
        msg = json.dumps(data).encode()
        #写入消息
        future = producer.send('topic', msg)
        record_metadata = future.get(timeout=10)
        #print(record_metadata, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
time2 = time.time()
print("总耗时:", time2-time1)

注意事项:包名或者文件命名不要与第三方库同名

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐