前面有说道spark-streaming的简单demo,也有说到kafka成功跑通的例子,这里就结合二者,也是常用的使用之一。
1.相关组件版本 
首先确认版本,因为跟之前的版本有些不一样,所以才有必要记录下,另外仍然没有使用scala,使用java8,spark 2.0.0,kafka 0.10。
2.引入maven包 
网上找了一些结合的例子,但是跟我当前版本不一样,所以根本就成功不了,所以探究了下,列出引入包。
<dependency>      <groupid>org.apache.spark</groupid>      <artifactid>spark-streaming-kafka-0-10_2.11</artifactid>      <version>2.0.0</version></dependency>
网上能找到的不带kafka版本号的包最新是1.6.3,我试过,已经无法在spark2下成功运行了,所以找到的是对应kafka0.10的版本,注意spark2.0的scala版本已经是2.11,所以包括之前必须后面跟2.11,表示scala版本。
3.sparksteamingkafka类 
需要注意的是引入的包路径是org.apache.spark.streaming.kafka010.xxx,所以这里把import也放进来了。其他直接看注释。
import java.util.arrays;import java.util.collection;import java.util.hashmap;import java.util.hashset;import java.util.map;import org.apache.kafka.clients.consumer.consumerrecord;import org.apache.kafka.common.topicpartition;import org.apache.spark.sparkconf;import org.apache.spark.api.java.javasparkcontext;import org.apache.spark.streaming.durations;import org.apache.spark.streaming.api.java.javainputdstream;import org.apache.spark.streaming.api.java.javapairdstream;import org.apache.spark.streaming.api.java.javastreamingcontext;import org.apache.spark.streaming.kafka010.consumerstrategies;import org.apache.spark.streaming.kafka010.kafkautils;import org.apache.spark.streaming.kafka010.locationstrategies;import scala.tuple2;public class sparksteamingkafka {    public static void main(string[] args) throws interruptedexception {        string brokers = "master2:6667";        string topics = "topic1";        sparkconf conf = new sparkconf().setmaster("local[2]").setappname("streaming word count");        javasparkcontext sc = new javasparkcontext(conf);        sc.setloglevel("warn");        javastreamingcontext ssc = new javastreamingcontext(sc, durations.seconds(1));        collection<string> topicsset = new hashset<>(arrays.aslist(topics.split(",")));        //kafka相关参数,必要!缺了会报错        map<string, object> kafkaparams = new hashmap<>();        kafkaparams.put("metadata.broker.list", brokers) ;        kafkaparams.put("bootstrap.servers", brokers);        kafkaparams.put("group.id", "group1");        kafkaparams.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");        kafkaparams.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");        kafkaparams.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");        //topic分区        map<topicpartition, long> offsets = new hashmap<>();        offsets.put(new topicpartition("topic1", 0), 2l);         //通过kafkautils.createdirectstream(...)获得kafka数据,kafka相关参数由kafkaparams指定        javainputdstream<consumerrecord<object,object>> lines = kafkautils.createdirectstream(                ssc,                locationstrategies.preferconsistent(),                consumerstrategies.subscribe(topicsset, kafkaparams, offsets)            );        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个consumerrecord对象        javapairdstream<string, integer> counts =                 lines.flatmap(x -> arrays.aslist(x.value().tostring().split(" ")).iterator())                .maptopair(x -> new tuple2<string, integer>(x, 1))                .reducebykey((x, y) -> x + y);          counts.print();//  可以打印所有信息,看下consumerrecord的结构//      lines.foreachrdd(rdd -> {//          rdd.foreach(x -> {//            system.out.println(x);//          });//        });        ssc.start();        ssc.awaittermination();        ssc.close();    }}
4.运行测试 
这里使用上一篇kafka初探里写的producer类,put数据到kafka服务端,我这是master2节点上部署的kafka,本地测试跑spark2。
userkafkaproducer producerthread = new userkafkaproducer(kafkaproperties.topic);producerthread.start();
再运行3里的sparksteamingkafka类,可以看到已经成功。
以上就是java8下spark-streaming结合kafka编程(spark 2.0 & kafka 0.10的详细内容。
   
 
   