MapReduce 读取 HBase 汇总到 RDBMS(示例)
前言:HBase 对 MapReduce API 进行了扩展,方便 MapReduce 任务读写 HTable 数据。
// HBase 作为源的 MapReduce 读取示例
// Example: a MapReduce job that scans a row-key range of the HBase table "flws"
// and inserts the per-row-key concatenated cf:ah values into a MySQL table.
package hbase;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class ExampleHBaseToMysqlMapReduce {

    public static void main(String[] args) throws Exception {
        // HBase configuration: picks up hbase-site.xml from the classpath.
        Configuration config = HBaseConfiguration.create();
        String tableName = "flws";

        // Scan a bounded row-key range and only the cf:ah column.
        // NOTE(review): row keys restored as string literals — the mangled source
        // lost the quotes; confirm the table's actual key encoding.
        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("5768014"));
        scan.setStopRow(Bytes.toBytes("5768888"));
        scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("ah"));
        scan.setCaching(500);        // fetch 500 rows per scanner RPC
        scan.setCacheBlocks(false);  // don't pollute the block cache on a full MR scan

        // Job definition. Job.getInstance replaces the deprecated Job(conf, name).
        Job job = Job.getInstance(config, "ExampleHBaseMapReduce");
        job.setJarByClass(ExampleHBaseToMysqlMapReduce.class);

        // Map side reads from HBase via the table input format.
        TableMapReduceUtil.initTableMapperJob(
                tableName, scan, MyMapper.class, Text.class, Text.class, job);

        // Reduce side writes to MySQL itself, so no Hadoop output is produced.
        job.setReducerClass(MyReducer.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        job.setNumReduceTasks(5);

        if (!job.waitForCompletion(true)) {
            throw new Exception("error with job!");
        }
    }

    /** Emits (row key, cf:ah cell value) for every scanned row. */
    public static class MyMapper extends TableMapper<Text, Text> {
        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            context.write(
                    new Text(row.get()),
                    new Text(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("ah"))));
        }
    }

    /**
     * Concatenates all values per row key and inserts one row into MySQL.
     * Nothing is written to the Hadoop output (NullOutputFormat), so a plain
     * Reducer is used rather than TableReducer (which targets HBase mutations).
     */
    public static class MyReducer extends Reducer<Text, Text, NullWritable, NullWritable> {
        private Connection conn = null;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // NOTE(review): driver class, URL and credentials are hard-coded for
            // the example; real code should read them from the job configuration.
            String driver = "com.mysql.jdbc.Driver";
            String url = "jdbc:mysql://172.16.35.242/judgment"
                    + "?useUnicode=true&characterEncoding=GBK&zeroDateTimeBehavior=convertToNull";
            try {
                Class.forName(driver);
                conn = DriverManager.getConnection(url, "root", "root");
            } catch (ClassNotFoundException | SQLException e) {
                // Fail the task instead of continuing with a null connection,
                // which would NPE later in reduce().
                throw new IOException("cannot open MySQL connection to " + url, e);
            }
            super.setup(context);
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                sb.append(text.toString());
            }
            // PreparedStatement avoids SQL injection / quoting bugs from
            // string-concatenated SQL; try-with-resources closes the statement.
            try (PreparedStatement ps =
                    conn.prepareStatement("insert into test_mapreduce (id,ah) values (?,?)")) {
                ps.setInt(1, Integer.valueOf(key.toString()));
                ps.setString(2, sb.toString());
                ps.executeUpdate();
            } catch (SQLException e) {
                throw new IOException("insert failed for key " + key, e);
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    // Best-effort close; the task outcome is already decided.
                    e.printStackTrace();
                }
            }
        }
    }
}
原文地址:MapReduce 读取 HBase 汇总到 RDBMS,感谢原作者分享。
