您好,欢迎访问一九零五行业门户网

使用flume+kafka+storm构建实时日志分析系统_PHP教程

使用flume+kafka+storm构建实时日志分析系统本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客
1. flume安装使用
下载flume安装包http://www.apache.org/dyn/closer.cgi/flume/1.5.2/apache-flume-1.5.2-bin.tar.gz
解压$ tar -xzvf apache-flume-1.5.2-bin.tar.gz -c /opt/flume
flume配置文件放在conf文件目录下,执行文件放在bin文件目录下。
1)配置flume
进入conf目录将flume-conf.properties.template拷贝一份,并命名为自己需要的名字
$ cp flume-conf.properties.template flume.conf
修改flume.conf的内容,我们使用file sink来接收channel中的数据,channel采用memory channel,source采用exec source,配置文件如下:
agent.sources = seqgensrcagent.channels = memorychannelagent.sinks = loggersink# for each one of the sources, the type is definedagent.sources.seqgensrc.type = execagent.sources.seqgensrc.command = tail -f /data/mongodata/mongo.log#agent.sources.seqgensrc.bind = 172.168.49.130# the channel can be defined as follows.agent.sources.seqgensrc.channels = memorychannel# each sink's type must be definedagent.sinks.loggersink.type = file_rollagent.sinks.loggersink.sink.directory = /data/flume#specify the channel the sink should useagent.sinks.loggersink.channel = memorychannel# each channel's type is defined.agent.channels.memorychannel.type = memory# other config values specific to each type of channel(sink or source)# can be defined as well# in this case, it specifies the capacity of the memory channelagent.channels.memorychannel.capacity = 1000agent.channels.memory4log.transactioncapacity = 100
2)运行flume agent
切换到bin目录下,运行一下命令:
$ ./flume-ng agent --conf ../conf -f ../conf/flume.conf --n agent -dflume.root.logger=info,console
在/data/flume目录下可以看到生成的日志文件。
2. 结合kafka
由于flume1.5.2没有kafka sink,所以需要自己开发kafka sink
可以参考flume 1.6里面的kafka sink,但是要注意使用的kafka版本,由于有些kafka api不兼容的
这里只提供核心代码,process()内容。
sink.status status = status.ready;
channel ch = getchannel();
transaction transaction = null;
event event = null;
string eventtopic = null;
string eventkey = null;
try {
transaction = ch.gettransaction();
transaction.begin();
messagelist.clear();
if (type.equals(sync)) {
event = ch.take();
if (event != null) {
byte[] tempbody = event.getbody();
string eventbody = new string(tempbody,utf-8);
map headers = event.getheaders();
if ((eventtopic = headers.get(topic_hdr)) == null) {
eventtopic = topic;
}
eventkey = headers.get(key_hdr);
if (logger.isdebugenabled()) {
logger.debug({event} + eventtopic + : + eventkey + :
+ eventbody);
}
producerdata data = new producerdata
(eventtopic, new message(tempbody));
long starttime = system.nanotime();
logger.debug(eventtopic++++++eventbody);
producer.send(data);
long endtime = system.nanotime(); }
} else {
long processedevents = 0;
for (; processedevents 0) {
long starttime = system.nanotime(); long endtime = system.nanotime(); }
}
transaction.commit();
} catch (exception ex) {
string errormsg = failed to publish events;
logger.error(failed to publish events, ex);
status = status.backoff;
if (transaction != null) {
try {
transaction.rollback(); } catch (exception e) {
logger.error(transaction rollback failed, e);
throw throwables.propagate(e);
}
}
throw new eventdeliveryexception(errormsg, ex);
} finally {
if (transaction != null) {
transaction.close();
}
}
return status;
下一步,修改flume配置文件,将其中sink部分的配置改成kafka sink,如:
producer.sinks.r.type = org.apache.flume.sink.kafka.kafkasink
producer.sinks.r.brokerlist = bigdata-node00:9092
producer.sinks.r.requiredacks = 1
producer.sinks.r.batchsize = 100
#producer.sinks.r.kafka.producer.type=async
#producer.sinks.r.kafka.customer.encoding=utf-8
producer.sinks.r.topic = testflume1
type指向kafkasink所在的完整路径
下面的参数都是kafka的一系列参数,最重要的是brokerlist和topic参数
现在重新启动flume,就可以在kafka的对应topic下查看到对应的日志
http://www.bkjia.com/phpjc/1109725.htmlwww.bkjia.comtruehttp://www.bkjia.com/phpjc/1109725.htmltecharticle使用flume+kafka+storm构建实时日志分析系统 本文只会涉及flume和kafka的结合,kafka和storm的结合可以参考其他博客 1. flume安装使用 下载flume安装...
其它类似信息

推荐信息