Python操作Kafka
consumer#!/usr/bin/env python# -*- coding: utf-8 -*-# @author: yangyue# pip install kafkafrom kafka import KafkaProducerfrom kafka import KafkaConsumerfrom kafka.errors import KafkaErrorclass Kaf
·
consumer
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author: yangyue
# pip install kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
class Kafka_consumer():
'''
Consumer Module
'''
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.consumer = KafkaConsumer(self.kafkatopic,
group_id = self.
groupid,bootstrap_servers='{kafka_host}:{kafka_port}'.format(kafka_host=self.kafkaHost,kafka_port=self.kafkaPort),
auto_offset_reset="smallest")
def consume_data(self):
try:
for message in self.consumer:
yield message
except KeyboardInterrupt, e:
print e
producer
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author: yangyue
# pip install kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
class Kafka_producer():
'''
Producer Module
'''
def __init__(self, kafkahost,kafkaport, kafkatopic):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
))
def produce_data(self, params):
try:
producer = self.producer
producer.send(self.kafkatopic, params)
producer.flush()
except KafkaError as e:
print e
从kafka1拉取数据推送到kafka2
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author: yangyue
# pip install kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import datetime
import thread
import threading
class Kafka_producer():
'''
Producer Module
'''
def __init__(self, kafkahost,kafkaport, kafkatopic):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.producer = KafkaProducer(bootstrap_servers = '{kafka_host}:{kafka_port}'.format(
kafka_host=self.kafkaHost,
kafka_port=self.kafkaPort
))
def produce_data(self, params):
try:
producer = self.producer
producer.send(self.kafkatopic, params)
producer.flush()
except KafkaError as e:
print e
class Kafka_consumer():
'''
Consumer Module
'''
def __init__(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkaHost = kafkahost
self.kafkaPort = kafkaport
self.kafkatopic = kafkatopic
self.groupid = groupid
self.consumer = KafkaConsumer(self.kafkatopic,
group_id = self.
groupid,bootstrap_servers='{kafka_host}:{kafka_port}'.format(kafka_host=self.kafkaHost,kafka_port=self.kafkaPort),
auto_offset_reset="smallest")
def consume_data(self):
try:
for message in self.consumer:
# print json.loads(message.value)
yield message
except KeyboardInterrupt, e:
print e
def external_to_internal_com(EHost,EPort,ETopic,EGroupid,IHost,IPort,ITopic):
'''
receive the External kafka data and send to Internal kafka
:param EHost: External Kafka Host
:param EPort: External Kafka Port
:param ETopic: External Kafka Topic
:param EGroupid: External Kafka Groupid
:param IHost: Internal Kafka Host
:param IPort: Internal Kafka Port
:param ITopic: Internal Kafka Topic
:return: none
'''
consumer = Kafka_consumer(EHost, EPort, ETopic, EGroupid)
producer = Kafka_producer(IHost, IPort, ITopic)
message = consumer.consume_data()
messageCount = 0
for i in message:
messageCount += 1
now = datetime.datetime.now()
now.strftime('%Y-%m-%d %H:%M:%S')
if messageCount % 5000==0:print now, " From External Topic: ", ETopic, " Send to Internal Topic ", ITopic, ">>>>>>>>>> count: ",messageCount
producer.sendjsondata(i.value)
def main():
'''
from external kafka to internal kafka
'''
EHost = "1.1.1.1"
EPort = 9092
IHost = "2.2.2.2"
IPort = 9092
GroupId = "external_to_internal_com"
PortInfoTopic = "topic1"
VulInfoTopic = "topic2"
ImageTopic = "topic3"
try:
#def external_to_internal_com(EHost,EPort,ETopic,EGroupid,IHost,IPort,ITopic):
thread.start_new_thread(external_to_internal_com, (EHost, EPort, PortInfoTopic, GroupId, IHost, IPort, PortInfoTopic, ))
thread.start_new_thread(external_to_internal_com, (EHost, EPort, VulInfoTopic, GroupId, IHost, IPort, VulInfoTopic, ))
thread.start_new_thread(external_to_internal_com, (EHost, EPort, ImageTopic, GroupId, IHost, IPort, ImageTopic, ))
except:
print "Error: unable to start thread"
while 1:
pass
if __name__ == '__main__':
main()
更多推荐
已为社区贡献1条内容
所有评论(0)