您好,欢迎访问一九零五行业门户网

MapReduce2.0处理机制

mapreduce(分布式计算模型)作为hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。 内部模型采用分而治之的思想。mapreduce分为两部分(map和reduce)。其中shuffler是对reduce的预处理。 map和reduce的数据处理方式均采取键对的方式:即 [k1
                      mapreduce(分布式计算模型)作为hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。
                 内部模型采用分而治之的思想。mapreduce分为两部分(map和reduce)。其中shuffler是对reduce的预处理。
map和reduce的数据处理方式均采取键值对的方式:即  [k1,v1]->map->[k2,v2]->reduce->[k3,v3]。
mr执行流程
 (1).客户端提交一个mr的jar包给jobclient(提交方式:hadoop jar ...)
 (2).jobclient通过rpc和jobtracker进行通信,返回一个存放jar包的地址(hdfs)和jobid
 (3).client将jar包写入到hdfs当中(path = hdfs上的地址 + jobid)
 (4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
 (5).jobtracker进行初始化任务
 (6).读取hdfs上的要处理的文件,开始计算输入分片,每一个分片对应一个mappertask
 (7).tasktracker通过心跳机制领取任务(任务的描述信息)
 (8).下载所需的jar,配置文件等
 (9).tasktracker启动一个java child子进程,用来执行具体的任务(mappertask或reducertask)
 (10).将结果写入到hdfs当中
在hadoop2.0以上版本中jobtracker取名为rm(resourcemanage)  tasttracker取名为nm(nodemanage)
mapreduce操作实现wordcount功能(即从文本中读取内容,计算出每个单词出现的次数)
程序分为3个类(自定义map方法功能实现,自定义reduce方法功能实现,最后类拼凑成mapreduce模式导成jar包,在hdfs分布式功能中实现)
1.wcmapper类(实现map)
import java.io.ioexception;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;
/*
 * 给wordcount写mapper
 * 定义mapper
 * keyin:k1的类型
 * valuein:v1的类型
 *
 * 重写map方法
 * hadoop没有使用jdk默认的序列化机制(long->longwriteable string->text)
 */
public class wcmapper extends mapper {
 @override
 protected void map(longwritable key, text value,
   mapper.context context)
   throws ioexception, interruptedexception {
  // todo auto-generated method stub
  // 接收信息v1
  string line = value.tostring();
  // 切分数据
  string[] words = line.split( );
  // 循环
  for (string w : words) {
   // 出现一次记一个1,输出
   // 构一个新的key,value
   context.write(new text(w), new longwritable(1));
  }
 }
}
2.wcreducer类实现reduce功能
import java.io.ioexception;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.reducer;
/*
 * keyin k2的类型
 * valuein v2的类型
 *
 * 重写reducer方法
 */
public class wcreducer extends reducer {
 @override
 protected void reduce(text k2, iterable v2s,
   reducer.context context)
   throws ioexception, interruptedexception {
  // 接收数据
  text k3 = k2;
  // 定义一个计数器
  long count = (long) 0;
  // 循环v2s
  for (longwritable i : v2s) {
   count += i.get();
  }
  // 输出
  context.write(k3, new longwritable(count));
 }
}
3.wordcount类。拼凑前两个类,符合mapreduce格式
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
/*
 * mapreduce
 *
 * 组装自定义的map和reduce
 */
public class wordcount {
 public static void main(string[] args) throws exception {
  // job job=job.instance(new configuration()); //版本hadoop2
  job job = new job(new configuration()); // 版本hadoop1
  // 4.注意---将main方法中的类设进去
  job.setjarbyclass(wordcount.class);
  // 1.设置自定义mapper
  job.setmapperclass(wcmapper.class);
  job.setmapoutputkeyclass(text.class);
  job.setmapoutputvalueclass(longwritable.class);
  // 设置mapper读入的path(hdfs路径)
  fileinputformat.setinputpaths(job, new path(/words.txt));
  // 2.设置reduce
  job.setreducerclass(wcreducer.class);
  job.setoutputkeyclass(text.class);
  job.setoutputvalueclass(longwritable.class);
  fileoutputformat.setoutputpath(job, new path(/wcountresult));
  // 3.提交
  job.waitforcompletion(true); // 打印进度和详情
 }
}
其它类似信息

推荐信息