mapreduce(分布式计算模型)作为hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。 内部模型采用分而治之的思想。mapreduce分为两部分(map和reduce)。其中shuffler是对reduce的预处理。 map和reduce的数据处理方式均采取键对的方式:即 [k1
mapreduce(分布式计算模型)作为hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。
内部模型采用分而治之的思想。mapreduce分为两部分(map和reduce)。其中shuffler是对reduce的预处理。
map和reduce的数据处理方式均采取键值对的方式:即 [k1,v1]->map->[k2,v2]->reduce->[k3,v3]。
mr执行流程
(1).客户端提交一个mr的jar包给jobclient(提交方式:hadoop jar ...)
(2).jobclient通过rpc和jobtracker进行通信,返回一个存放jar包的地址(hdfs)和jobid
(3).client将jar包写入到hdfs当中(path = hdfs上的地址 + jobid)
(4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).jobtracker进行初始化任务
(6).读取hdfs上的要处理的文件,开始计算输入分片,每一个分片对应一个mappertask
(7).tasktracker通过心跳机制领取任务(任务的描述信息)
(8).下载所需的jar,配置文件等
(9).tasktracker启动一个java child子进程,用来执行具体的任务(mappertask或reducertask)
(10).将结果写入到hdfs当中
在hadoop2.0以上版本中jobtracker取名为rm(resourcemanage) tasttracker取名为nm(nodemanage)
mapreduce操作实现wordcount功能(即从文本中读取内容,计算出每个单词出现的次数)
程序分为3个类(自定义map方法功能实现,自定义reduce方法功能实现,最后类拼凑成mapreduce模式导成jar包,在hdfs分布式功能中实现)
1.wcmapper类(实现map)
import java.io.ioexception;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;
/*
* 给wordcount写mapper
* 定义mapper
* keyin:k1的类型
* valuein:v1的类型
*
* 重写map方法
* hadoop没有使用jdk默认的序列化机制(long->longwriteable string->text)
*/
public class wcmapper extends mapper {
@override
protected void map(longwritable key, text value,
mapper.context context)
throws ioexception, interruptedexception {
// todo auto-generated method stub
// 接收信息v1
string line = value.tostring();
// 切分数据
string[] words = line.split( );
// 循环
for (string w : words) {
// 出现一次记一个1,输出
// 构一个新的key,value
context.write(new text(w), new longwritable(1));
}
}
}
2.wcreducer类实现reduce功能
import java.io.ioexception;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.reducer;
/*
* keyin k2的类型
* valuein v2的类型
*
* 重写reducer方法
*/
public class wcreducer extends reducer {
@override
protected void reduce(text k2, iterable v2s,
reducer.context context)
throws ioexception, interruptedexception {
// 接收数据
text k3 = k2;
// 定义一个计数器
long count = (long) 0;
// 循环v2s
for (longwritable i : v2s) {
count += i.get();
}
// 输出
context.write(k3, new longwritable(count));
}
}
3.wordcount类。拼凑前两个类,符合mapreduce格式
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
/*
* mapreduce
*
* 组装自定义的map和reduce
*/
public class wordcount {
public static void main(string[] args) throws exception {
// job job=job.instance(new configuration()); //版本hadoop2
job job = new job(new configuration()); // 版本hadoop1
// 4.注意---将main方法中的类设进去
job.setjarbyclass(wordcount.class);
// 1.设置自定义mapper
job.setmapperclass(wcmapper.class);
job.setmapoutputkeyclass(text.class);
job.setmapoutputvalueclass(longwritable.class);
// 设置mapper读入的path(hdfs路径)
fileinputformat.setinputpaths(job, new path(/words.txt));
// 2.设置reduce
job.setreducerclass(wcreducer.class);
job.setoutputkeyclass(text.class);
job.setoutputvalueclass(longwritable.class);
fileoutputformat.setoutputpath(job, new path(/wcountresult));
// 3.提交
job.waitforcompletion(true); // 打印进度和详情
}
}