hbase默认不支持聚合函数(sum,avg等)。可利用hbase的coprocessor特性实现。这样做的好处是利用regionserver在服务端进行运算。效率高,避免客户端取回大量数据,占用网络带宽,消耗大量内存等。 实现方式: 利用hbase提供的endpoint类型的aggregateimpleme
hbase默认不支持聚合函数(sum,avg等)。可利用hbase的coprocessor特性实现。这样做的好处是利用regionserver在服务端进行运算。效率高,避免客户端取回大量数据,占用网络带宽,消耗大量内存等。
实现方式:
利用hbase提供的endpoint类型的aggregateimplementation coprocess,配合aggregationclient访问客户端实现regionserver端的集合计算。aggregationclient访问代码如下:
aggregationclient.avg(bytes. tobytes(tablename), ci, scan);
scan即为要计算列的查询条件。这里有一个columninterperter类型的参数ci。即列解释器,用于解析列中的值。hbase默认提供了longcolumninterpreter。而我要处理的值是double类型的,所以先实现了一个doublecolumninterpreter。(从jira上看doulbe类型的解释器好像正在开发中)。columninterpreter接口的实现会在aggregateimplementation
/*** double类型的列解释器实现* * @author onecoder*/public class doublecolumninterpreter implements columninterpreter { @override public void write(dataoutput out) throws ioexception { } @override public void readfields(datainput in) throws ioexception { } @override public double getvalue( byte[] colfamily, byte[] colqualifier, keyvalue kv) throws ioexception { if (kv == null) return null; // 临时解决方案,如果采用bytes.todouble(kv.getvalue())会报错,偏移量大于总长度。 // todouble(getbuffer(), getvalueoffset),偏移量也不对。 return double. valueof(new string(kv.getvalue())); } @override public double add(double l1, double l2) { if (l1 == null ^ l2 == null) { return l1 == null ? l2 : l1; } else if (l1 == null) { return null; } return l1 + l2; } @override public double getmaxvalue() { // todo auto-generated method stub return null; } @override public double getminvalue() { // todo auto-generated method stub return null; } @override public double multiply(double o1, double o2) { if (o1 == null ^ o2 == null) { return o1 == null ? o2 : o1; } else if (o1 == null) { return null; } return o1 * o2; } @override public double increment(double o) { // todo auto-generated method stub return null; } @override public double casttoreturntype(double o) { return o.doublevalue(); } @override public int compare(double l1, double l2) { if (l1 == null ^ l2 == null) { return l1 == null ? -1 : 1; // either of one is null. } else if (l1 == null) return 0; // both are null return l1.compareto(l2); // natural ordering. } @override public double divideforavg(double o, long l) { return (o == null || l == null) ? double. nan : (o.doublevalue() / l .doublevalue()); }}
导出jar包上传到hbase region节点的lib下。然后配置regionserver的coprocessor。在服务端hbase-site.xml中,增加
hbase.coprocessor.region.classes org.apache.hadoop.hbase.coprocessor.aggregateimplementation
重启服务,使配置和jar生效。然后调用aggregationclient中提供的avg, max等聚合函数,即可在region端计算出结果,返回。
原文地址:hbase 利用coprocessor实现聚合函数, 感谢原作者分享。