前面有说道spark-streaming的简单demo,也有说到kafka成功跑通的例子,这里就结合二者,也是常用的使用之一。
1.相关组件版本
首先确认版本,因为跟之前的版本有些不一样,所以才有必要记录下,另外仍然没有使用scala,使用java8,spark 2.0.0,kafka 0.10。
2.引入maven包
网上找了一些结合的例子,但是跟我当前版本不一样,所以根本就成功不了,所以探究了下,列出引入包。
<dependency> <groupid>org.apache.spark</groupid> <artifactid>spark-streaming-kafka-0-10_2.11</artifactid> <version>2.0.0</version></dependency>
网上能找到的不带kafka版本号的包最新是1.6.3,我试过,已经无法在spark2下成功运行了,所以找到的是对应kafka0.10的版本,注意spark2.0的scala版本已经是2.11,所以包括之前必须后面跟2.11,表示scala版本。
3.sparksteamingkafka类
需要注意的是引入的包路径是org.apache.spark.streaming.kafka010.xxx,所以这里把import也放进来了。其他直接看注释。
import java.util.arrays;import java.util.collection;import java.util.hashmap;import java.util.hashset;import java.util.map;import org.apache.kafka.clients.consumer.consumerrecord;import org.apache.kafka.common.topicpartition;import org.apache.spark.sparkconf;import org.apache.spark.api.java.javasparkcontext;import org.apache.spark.streaming.durations;import org.apache.spark.streaming.api.java.javainputdstream;import org.apache.spark.streaming.api.java.javapairdstream;import org.apache.spark.streaming.api.java.javastreamingcontext;import org.apache.spark.streaming.kafka010.consumerstrategies;import org.apache.spark.streaming.kafka010.kafkautils;import org.apache.spark.streaming.kafka010.locationstrategies;import scala.tuple2;public class sparksteamingkafka { public static void main(string[] args) throws interruptedexception { string brokers = "master2:6667"; string topics = "topic1"; sparkconf conf = new sparkconf().setmaster("local[2]").setappname("streaming word count"); javasparkcontext sc = new javasparkcontext(conf); sc.setloglevel("warn"); javastreamingcontext ssc = new javastreamingcontext(sc, durations.seconds(1)); collection<string> topicsset = new hashset<>(arrays.aslist(topics.split(","))); //kafka相关参数,必要!缺了会报错 map<string, object> kafkaparams = new hashmap<>(); kafkaparams.put("metadata.broker.list", brokers) ; kafkaparams.put("bootstrap.servers", brokers); kafkaparams.put("group.id", "group1"); kafkaparams.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer"); kafkaparams.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); kafkaparams.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer"); //topic分区 map<topicpartition, long> offsets = new hashmap<>(); offsets.put(new topicpartition("topic1", 0), 2l); //通过kafkautils.createdirectstream(...)获得kafka数据,kafka相关参数由kafkaparams指定 javainputdstream<consumerrecord<object,object>> lines = kafkautils.createdirectstream( ssc, locationstrategies.preferconsistent(), consumerstrategies.subscribe(topicsset, kafkaparams, offsets) ); //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个consumerrecord对象 javapairdstream<string, integer> counts = lines.flatmap(x -> arrays.aslist(x.value().tostring().split(" ")).iterator()) .maptopair(x -> new tuple2<string, integer>(x, 1)) .reducebykey((x, y) -> x + y); counts.print();// 可以打印所有信息,看下consumerrecord的结构// lines.foreachrdd(rdd -> {// rdd.foreach(x -> {// system.out.println(x);// });// }); ssc.start(); ssc.awaittermination(); ssc.close(); }}
4.运行测试
这里使用上一篇kafka初探里写的producer类,put数据到kafka服务端,我这是master2节点上部署的kafka,本地测试跑spark2。
userkafkaproducer producerthread = new userkafkaproducer(kafkaproperties.topic);producerthread.start();
再运行3里的sparksteamingkafka类,可以看到已经成功。
以上就是java8下spark-streaming结合kafka编程(spark 2.0 & kafka 0.10的详细内容。