您好,欢迎访问一九零五行业门户网

Hadoop Pig Loadfunc

hadoop pig 设计的还是很不错的,可以写 udf 每一个统计基本上都是要对原始日志进行切分,把想要的一些字段 extract 提取出来 日志有着基本的模式 mac:50:a4:c8:d7:10:7d|build:5141bc99|network:mobile|version:2.4.1|id:taobao22935952431| 基本
hadoop pig 设计的还是很不错的,可以写 udf
每一个统计基本上都是要对原始日志进行切分,把想要的一些字段 extract 提取出来
日志有着基本的模式
mac:50:a4:c8:d7:10:7d|build:5141bc99|network:mobile|version:2.4.1|id:taobao22935952431|
基本上是 key, value对,自定义一个 load function ,指定 key,就可以获取 对应的value,在 pig 中可以使用
register /jar/kload.jar;aa = load '/log/load.log' using kload.koudailoader('mac,build') as (mac,build);dump aa;
输出结果
(50:a4:c8:d7:10:7d,5141bc99)
koudailoader是自己实现的一个 load function,输出为要获取的key,输出为key所对应的 value
package kload; import java.io.ioexception; import java.util.*; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.pig.*; import org.apache.pig.backend.executionengine.execexception; import org.apache.pig.backend.hadoop.executionengine.mapreducelayer.*; import org.apache.pig.data.*; public class koudailoader extends loadfunc{ protected recordreader recordreader = null; private string fielddel = ; private string[] reqfildlist; private arraylist mprototuple = null; private tuplefactory mtuplefactory = tuplefactory.getinstance(); private static final int buffer_size = 1024; public koudailoader() { } public koudailoader(string delimiter) { this(); if(delimiter == null || delimiter.length() == 0){ throw new runtimeexception(empty delimiter); } this.reqfildlist=delimiter.split(,); } @override public tuple getnext() throws ioexception { try { map tmpmap = new hashmap(); list lst = new arraylist(); boolean flag = recordreader.nextkeyvalue(); int i = 0; if (!flag) { return null; } text value = (text) recordreader.getcurrentvalue(); tmpmap = this.sourcetomap(value.tostring()); if( tmpmap == null || tmpmap.size() == 0 ){ return null; } for (string s :this.reqfildlist){ string item = tmpmap.get(s); if(item == null || item.length() == 0){ item = ; } lst.add(i++, item); } return tuplefactory.getinstance().newtuple(lst); } catch (interruptedexception e) { throw new execexception(read data error, pigexception.remote_environment, e); } } public map sourcetomap(string pline){ string line = pline; int strlen = 0; string[] strarr; string[] strsubarr; map maplog = new hashmap(); if(pline == null || pline.length()
编译
javac -cp /usr/local/webserver/pig/pig-0.9.2.jar:. koudailoader.java
打成jar包
jar -cf kload.jar kload
用pig在本地模式下运行
java -cp /usr/local/webserver/pig/pig-0.9.2.jar:/jar/kload.jar org.apache.pig.main -x local kload.pig
原文地址:hadoop pig loadfunc, 感谢原作者分享。
其它类似信息

推荐信息