1. This post covers collecting log data with Flume: a small script appends one log record per second, Flume picks the records up and writes them to Kafka, and Storm consumes them from there for processing. Storm itself is covered in my other posts, so it is not discussed here.
  2. Data generation scripts (a single-script alternative is sketched after the shell script below).
    Python script
#!/usr/bin/python
# Append a single record to the log file; the shell loop below runs this
# script once per second. A plain newline terminates the record.
with open("/opt/app/log.log", "a") as f:
    f.write("message")
    f.write("\n")

Shell script

#!/bin/sh
# Invoke the Python script once per second (make sure it is executable:
# chmod +x /opt/app/pr.py).
while true; do
    /bin/sleep 1
    /opt/app/pr.py
done
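
For reference, here is a minimal single-script sketch that produces the same one-record-per-second stream without the shell loop. It reuses the file path and message from above; it is an illustrative alternative, not part of the original setup.

#!/usr/bin/python
# Append one record per second to the log file that Flume tails.
import time

while True:
    with open("/opt/app/log.log", "a") as f:
        f.write("message\n")
    time.sleep(1)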

3. Flume configuration file (adapted from code found online).

a1.sources = r1  
a1.sinks = k1  
a1.channels = c1  

# Describe/configure the source  
a1.sources.r1.type = exec  
a1.sources.r1.command = tail -F /opt/app/log.log

# Describe the sink  
#a1.sinks.k1.type = logger  
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  
a1.sinks.k1.topic = test  
a1.sinks.k1.brokerList = localhost:9092  
a1.sinks.k1.requiredAcks = 1  
a1.sinks.k1.batchSize = 20  

# Use a channel which buffers events in memory  
a1.channels.c1.type = memory  
a1.channels.c1.capacity = 1000  
a1.channels.c1.transactionCapacity = 100  

# Bind the source and sink to the channel  
a1.sources.r1.channels = c1  
a1.sinks.k1.channel = c1  

4. Run the Flume agent
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name a1 -Dflume.root.logger=INFO,console
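
To check that the records actually reach Kafka, the topic can be read back with a small consumer. This is a minimal sketch assuming the kafka-python package is installed; the topic name and broker address come from the Flume sink configuration above.

#!/usr/bin/python
# Read back the messages that the Flume Kafka sink wrote to the "test" topic.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test",                              # topic from the Flume sink config
    bootstrap_servers="localhost:9092",  # broker from the Flume sink config
    auto_offset_reset="earliest",        # start from the beginning of the topic
    consumer_timeout_ms=10000,           # stop iterating after 10 s of no data
)
for message in consumer:
    print(message.value)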
