In many Hadoop-based application scenarios we need both offline and real-time analysis of the data. Offline analysis is easy to do with Hive for statistical queries, but Hive is not suited to real-time requirements. For real-time scenarios we can use Storm, a real-time processing system that provides a computation model for real-time applications and is straightforward to program against. To unify offline and real-time computation, we usually want a single set of data sources as input, with the data then flowing into both the real-time system and the offline analysis system, each processing it in its own way. To achieve this, we can connect the data sources (for example, logs collected with Flume) directly to a message broker such as Kafka: Flume and Kafka are integrated so that Flume acts as the message producer, publishing the data it generates (log data, business request data, and so on) to Kafka, while Storm topologies subscribe to Kafka as consumers. The Storm cluster then covers the following two scenarios:
- Analyze the data in real time directly with a Storm topology
- Integrate Storm with HDFS, writing the processed messages to HDFS for offline analysis

The real-time path only requires developing a topology that fits the business needs, so it is not discussed further. This post instead walks through installing and configuring Kafka and Storm, and then integrating Kafka+Storm, Storm+HDFS, and finally Kafka+Storm+HDFS, which together cover the requirements above. The software packages used are:
zookeeper-3.4.5.tar.gz
kafka_2.9.2-0.8.1.1.tgz
apache-storm-0.9.2-incubating.tar.gz
hadoop-2.2.0.tar.gz

Everything is configured and run on CentOS 5.11.
Kafka installation and configuration
We use three machines to build the Kafka cluster:
192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
Before installing the Kafka cluster, note that we do not use the ZooKeeper bundled with Kafka; instead we installed a standalone ZooKeeper cluster on the same three machines. Make sure that ZooKeeper cluster is running properly.
First, prepare the Kafka installation files on h1 by running:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
tar xvzf kafka_2.9.2-0.8.1.1.tgz
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
Modify the configuration file /usr/local/kafka/config/server.properties, changing the following entries:
broker.id=0
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
Note that by default Kafka uses ZooKeeper's root path /, so Kafka's znodes end up scattered directly under the root. If other applications share the same ZooKeeper cluster, browsing the data in ZooKeeper becomes messy, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting:
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
You also need to create the /kafka path in ZooKeeper by hand. Connect to any of the ZooKeeper servers:
cd /usr/local/zookeeper
bin/zkCli.sh
and create the chroot path with the following ZooKeeper command:
create /kafka ''
From then on, every time you connect to the Kafka cluster (with the --zookeeper option) you must use a connection string that includes the chroot path, as the commands later in this post show.
Next, sync the configured installation files to the other nodes, h2 and h3:
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
Finally, configure h2 and h3 by running the following on each node:
cd /usr/local/
ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
and modify /usr/local/kafka/config/server.properties on each as follows:
broker.id=1   # on h2
broker.id=2   # on h3
Each broker's id must be unique across the whole Kafka cluster, so this setting has to be adjusted per node. (On a single machine you can simulate a distributed Kafka cluster by starting several broker processes; each broker still needs a unique id, and some directory settings must be changed as well, as sketched below.)
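As a hedged illustration of that single-machine setup, a second broker could run from a copy of server.properties with only the id, listening port, and log directory changed (the file name server-1.properties is hypothetical; broker.id, port, and log.dirs are standard Kafka 0.8.x settings):

# config/server-1.properties -- a copy of server.properties for a second broker on the same host
broker.id=1                     # must be unique across the cluster
port=9093                       # avoid clashing with the first broker's 9092
log.dirs=/tmp/kafka-logs-1      # each broker needs its own log directory
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka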
Start Kafka on each of the three nodes h1, h2, and h3 by running:
bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
Check the logs or the process status to make sure the Kafka cluster has started successfully.
Create a topic named my-replicated-topic5 with 5 partitions and a replication factor of 3:
bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
Inspect the newly created topic:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
The result looks like this:
Topic:my-replicated-topic5    PartitionCount:5    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic5    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 1    Leader: 0    Replicas: 1,0,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 2    Leader: 2    Replicas: 2,1,0    Isr: 2,0,1
    Topic: my-replicated-topic5    Partition: 3    Leader: 0    Replicas: 0,1,2    Isr: 0,2,1
    Topic: my-replicated-topic5    Partition: 4    Leader: 2    Replicas: 1,2,0    Isr: 2,0,1
The Leader, Replicas, and Isr fields above mean the following:
Partition: the partition number
Leader: the node responsible for all reads and writes of the given partition
Replicas: the list of nodes that replicate the log for this partition
Isr: the "in-sync replicas", i.e. the subset of replicas that are currently alive and eligible to become leader
We can use the bin/kafka-console-producer.sh and bin/kafka-console-consumer.sh scripts shipped with Kafka to demonstrate how to publish and consume messages.
In one terminal, start a producer that publishes messages to the my-replicated-topic5 topic created above:
bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
In another terminal, start a consumer subscribed to the messages produced to the same my-replicated-topic5 topic:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
Type a line of text in the producer terminal and press Enter, and the line appears in the consumer terminal as a consumed message.
You can also use Kafka's producer and consumer Java APIs to implement the message production and consumption logic in code; a minimal producer sketch follows.
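This is only a rough sketch based on my understanding of the Kafka 0.8.x Java producer API; the topic is the one created above, and everything else is illustrative rather than taken from the original post:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Brokers used to bootstrap topic metadata (0.8.x-style setting)
        props.put("metadata.broker.list", "h1:9092,h2:9092,h3:9092");
        // Encode message values as plain strings
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Publish one message to the topic created earlier
        producer.send(new KeyedMessage<String, String>("my-replicated-topic5", "hello storm"));
        producer.close();
    }
}

If the console consumer started above is still running, it should print the message published by this program.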
Storm installation and configuration
A Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is fairly simple, and we again use the same three machines:
192.168.4.142   h1
192.168.4.143   h2
192.168.4.144   h3
First, install on node h1 by running:
cd /usr/local/
wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
tar xvzf apache-storm-0.9.2-incubating.tar.gz
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
Then modify the configuration file conf/storm.yaml as follows:
storm.zookeeper.servers:
    - "h1"
    - "h2"
    - "h3"
storm.zookeeper.port: 2181
#nimbus.host: "h1"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.local.dir: "/tmp/storm"
Distribute the configured installation files to the other nodes:
scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
Finally, configure h2 and h3 by running the following on each node:
cd /usr/local/
ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
The master node of a Storm cluster runs nimbus and the worker nodes run supervisors. Start the nimbus service on h1 and the supervisor service on h2 and h3:
bin/storm nimbus &        # on h1
bin/storm supervisor &    # on h2 and h3
For easier monitoring, you can start the Storm UI and watch the state of running topologies from a web page. For example, start it on h2:
bin/storm ui &
You can then open http://h2:8080/ to see how topologies are running.
Integrating Kafka and Storm
Messages enter the Kafka broker in various ways, for example log data collected by Flume; Kafka routes and buffers them, and the real-time computation in Storm then analyzes them. For that we need to read the Kafka messages in a Storm spout and hand them to the bolt components that do the actual processing. In fact, the apache-storm-0.9.2-incubating release already ships an external Kafka integration module, storm-kafka, that can be used directly. My Maven dependency configuration, for example, looks like this:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Below is a simple wordcount example that reads the subscribed message lines from Kafka, splits each line into words on whitespace, and then computes word frequencies. The topology is implemented as follows:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class KafkaWordSplitter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0);
            LOG.info("RECV[kafka -> splitter] " + line);
            String[] words = line.split("\\s+");
            for(String word : words) {
                LOG.info("EMIT[splitter -> counter] " + word);
                collector.emit(input, new Values(word, 1));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCounter extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(WordCounter.class);
        private static final long serialVersionUID = 886149197481637894L;
        private OutputCollector collector;
        private Map<String, AtomicInteger> counterMap;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.counterMap = new HashMap<String, AtomicInteger>();
        }

        @Override
        public void execute(Tuple input) {
            String word = input.getString(0);
            int count = input.getInteger(1);
            LOG.info("RECV[splitter -> counter] " + word + " : " + count);
            AtomicInteger ai = this.counterMap.get(word);
            if(ai == null) {
                ai = new AtomicInteger();
                this.counterMap.put(word, ai);
            }
            ai.addAndGet(count);
            collector.ack(input);
            LOG.info("CHECK statistics map: " + this.counterMap);
        }

        @Override
        public void cleanup() {
            LOG.info("The final result:");
            Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();
            while(iter.hasNext()) {
                Entry<String, AtomicInteger> entry = iter.next();
                LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // the Kafka topic we created has 5 partitions, so the spout parallelism is set to 5
        builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));

        Config conf = new Config();

        String name = MyKafkaTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            // Nimbus host name passed from command line
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
The program above needs no arguments when debugging locally (with LocalCluster); when submitting it to a real cluster you must pass one argument, the hostname of the nimbus node.
Build it with Maven into a single jar that bundles the application dependencies (but not Storm's own jars), for example storm-examples-0.0.1-SNAPSHOT.jar. Because the topology uses Kafka, before submitting it to the Storm cluster you also need to copy the following dependency jars into the lib directory on the Storm nodes:
cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
Then submit the topology:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
You can monitor the topology through its log files (under the logs/ directory) or through the Storm UI. If there are no errors, you can generate messages with the Kafka console producer used earlier and watch the Storm topology receive and process them in real time.
In the topology code above there is one important configuration object, SpoutConfig, with the following property:
spoutConf.forceFromStart = false;
This setting determines, when the topology is restarted after a failure, whether the spout reads the subscribed Kafka topic from the beginning again. With forceFromStart=true, tuples that have already been processed are replayed and processed a second time; with false, the spout resumes from the last recorded position, so the data in the Kafka topic is not processed twice. The consumption state is recorded on the data-source side, as illustrated by the sketch below.
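To make the two settings concrete, here is a small sketch against the storm-kafka API used above; the comment about where the offsets live reflects my understanding of the spout's ZooKeeper bookkeeping and should be treated as an assumption:

// zkRoot = "/storm" and id = "word", as in the topology above
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "my-replicated-topic5", "/storm", "word");
// The spout periodically commits the offsets it has consumed to ZooKeeper under the configured
// zkRoot/id, which is what allows it to resume where it left off after a restart.
spoutConf.forceFromStart = false; // resume from the last committed offset (no reprocessing)
// spoutConf.forceFromStart = true; // ignore committed offsets and replay the topic from the start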
Integrating Storm and HDFS
The Storm cluster consumes messages from the Kafka broker. Requirements that must be handled in real time go through the real-time processing path; other requirements call for offline analysis, such as writing the data into HDFS and analyzing it there. The following topology implements the HDFS path:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHdfsTopology {

    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            records = new String[] {
                    "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 huawei g610-u00 huawei 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
                    "10001 ffb52739a29348a67952e47c12da54ef 4.3 gt-i9300 samsung 2 50:cc:f8:e4:22:e2 2014-10-13 12:36:02",
                    "10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 huawei g610-u00 huawei 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // use " : " instead of "," for field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files every minute
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));

        Config conf = new Config();

        String name = StormToHdfsTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
In the logic above, the HdfsBolt can be configured in more detail through FileNameFormat, SyncPolicy, and FileRotationPolicy (which controls when a new log file is rolled, for example after a fixed time interval or once the current file reaches a configured size); see the storm-hdfs project for more options. A size-based example is sketched below.
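A size-based rotation could be wired up roughly as follows; this is a sketch assuming the FileSizeRotationPolicy class shipped with storm-hdfs, and the 5 MB threshold is an arbitrary illustrative value:

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

public class SizeRotatingHdfsBoltFactory {

    public static HdfsBolt build() {
        // Roll to a new file whenever the current one reaches 5 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
        return new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(new DefaultFileNameFormat()
                        .withPath("/storm/").withPrefix("app_").withExtension(".log"))
                .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter(" : "))
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(new CountSyncPolicy(1000));
    }
}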
Be careful when packaging this code: reusing the Maven packaging configuration that comes with storm-starter may cause errors when the topology is deployed and run. Use the maven-shade-plugin instead, configured as shown below:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
Integrating Kafka, Storm, and HDFS
Having practiced Kafka+Storm and Storm+HDFS separately, we can now replace the spout of the latter with the Kafka spout of the former: consume messages from Kafka, do some simple processing inside Storm, and write the data to HDFS, where it can finally be analyzed offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them in Storm, and writes them out to HDFS:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka -> splitter] " + line);
            if(!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter -> counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }
    }

    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(RealtimeBolt.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

        // Configure Kafka
        String zks = "h1:2181,h2:2181,h3:2181";
        String topic = "my-replicated-topic5";
        String zkRoot = "/storm"; // default zookeeper root configuration for storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
        spoutConf.zkPort = 2181;

        // Configure HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h1:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // Configure & build topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");

        // Submit topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }
}
In the code above, the bolt named to-upper converts each received line to upper case and then sends a copy of the processed data to each of the two downstream bolts, hdfs-bolt and realtime, which handle it independently according to their respective (offline or real-time) needs.
After packaging, deploy and run the topology on the Storm cluster:
bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
You can watch the topology in the Storm UI and inspect the data written to HDFS.
References
http://kafka.apache.org/
http://kafka.apache.org/documentation.html
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
http://storm.apache.org/
http://storm.apache.org/documentation/Tutorial.html
http://storm.apache.org/documentation/FAQ.html
https://github.com/ptgoetz/storm-hdfs