import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Writes MapReduce result data into MySQL.
 *
 * @author asheng
 */
public class WriteDataToMySQL {

    /**
     * Custom DBWritable: TblsWritable is the record type written to MySQL.
     *
     * @author asheng
     */
    public static class TblsWritable implements Writable, DBWritable {
        String tbl_name;
        String tbl_type;

        public TblsWritable() {
        }

        public TblsWritable(String tbl_name, String tbl_type) {
            this.tbl_name = tbl_name;
            this.tbl_type = tbl_type;
        }

        // Binds the fields to the INSERT statement built by DBOutputFormat
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, this.tbl_name);
            statement.setString(2, this.tbl_type);
        }

        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.tbl_name = resultSet.getString(1);
            this.tbl_type = resultSet.getString(2);
        }

        // Writable serialization, used if records move between tasks
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.tbl_name);
            out.writeUTF(this.tbl_type);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.tbl_name = in.readUTF();
            this.tbl_type = in.readUTF();
        }

        @Override
        public String toString() {
            return this.tbl_name + " " + this.tbl_type;
        }
    }

    // Emits (date, 1) per valid log line; TblsWritable above is the custom
    // DBWritable type that the reducer ultimately writes out
    public static class ConnMysqlMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        enum Counter {
            LINESKIP, // malformed lines that get skipped
        }

        private final static IntWritable one = new IntWritable(1);

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                String line = value.toString();
                String[] strings = line.split("\t");
                String initTime = strings[1];
                String devType = strings[4];
                if (initTime.length() == 19) {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    // parse() only validates the timestamp; it throws
                    // ParseException on malformed input
                    Date date = sdf.parse(initTime);
                    // Key on the date part (yyyy-MM-dd) only
                    context.write(new Text(initTime.substring(0, 10)), one);
                } else {
                    context.getCounter(Counter.LINESKIP).increment(1);
                }
            } catch (ArrayIndexOutOfBoundsException e) {
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            } catch (ParseException e) {
                context.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
        }
    }

    public static class ConnMysqlReducer
            extends Reducer<Text, IntWritable, TblsWritable, TblsWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count the occurrences of this date
            int count = 0;
            for (Iterator<IntWritable> itr = values.iterator(); itr.hasNext(); itr.next()) {
                count++;
            }
            // DBOutputFormat only uses the key, so the value may be null
            context.write(new TblsWritable(key.toString(), String.valueOf(count)), null);
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/xingxuntong", "hadoop", "123456");
        Job job = new Job(conf, "test mysql connection");
        job.setJarByClass(WriteDataToMySQL.class);
        job.setMapperClass(ConnMysqlMapper.class);
        job.setReducerClass(ConnMysqlReducer.class);
        // These describe the map output here, since the reducer writes
        // TblsWritable records straight to DBOutputFormat
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Write into table "test", columns initTime and new_user_total
        DBOutputFormat.setOutput(job, "test", "initTime", "new_user_total");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
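Note that DBOutputFormat only inserts rows; it does not create the target table, so the table must exist before the job runs. Below is a minimal sketch of preparing a matching table over plain JDBC. The table name "test" and the column names initTime and new_user_total come from the DBOutputFormat.setOutput call above; the VARCHAR column types and the CreateOutputTable class name are assumptions (VARCHAR chosen because TblsWritable binds both fields with setString).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateOutputTable {
    public static void main(String[] args) throws Exception {
        // Same driver and connection settings as DBConfiguration.configureDB above
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/xingxuntong", "hadoop", "123456");
             Statement stmt = conn.createStatement()) {
            // Column types are an assumption; both fields are written as
            // strings by TblsWritable.write(PreparedStatement)
            stmt.executeUpdate(
                "CREATE TABLE IF NOT EXISTS test ("
                + "initTime VARCHAR(19), "
                + "new_user_total VARCHAR(20))");
        }
    }
}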
We write the results to MySQL because the TB-scale log files we typically process reduce to very small result sets, and storing those in a relational database makes querying and downstream use far more convenient.
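For example, once the job finishes, the daily new-user totals can be pulled back with a short JDBC query. This is a sketch under the same connection settings as above; the ReadResults class name is a hypothetical helper, and the table and column names follow the DBOutputFormat.setOutput call.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ReadResults {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/xingxuntong", "hadoop", "123456");
             Statement stmt = conn.createStatement();
             // Daily totals produced by the job, newest first
             ResultSet rs = stmt.executeQuery(
                 "SELECT initTime, new_user_total FROM test ORDER BY initTime DESC")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getString(2));
            }
        }
    }
}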