如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~ 常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步
如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~
常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了firstkeyonlyfilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,rpc的量也是不容小觑的。
理想的方式应该是怎样?拿row counter这个简单例子来说,我要统计总行数,如果每个region 告诉我他又多少行,然后把结果告诉我,我再将他们的结果汇总一下,不就行了么?
现在的问题是hbase没有提供这种接口,来统计每个region的行数,那是否我们可以自己来实现一个呢?
没错,正如本文标题所说,我们可以自己来实现一个endpoint,然后让hbase加载起来,然后我们远程调用即可。
什么是endpoint?先弄清楚什么是hbase coprocessor
hbase有两种coprocessor,一种是observer(观察者),类似于关系数据库的trigger(触发器),另外一种就是endpoint,类似于关系数据库的存储过程。
观察者这里就多做介绍了,这里介绍endpoint。
endpoint是动态rpc插件的接口,它的实现代码被部署在服务器端(regionserver),从而能够通过hbase rpc调用。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个endpoint,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为hbase添加全新的特性。
怎么实现一个endpoint1. 定义一个新的protocol接口,必须继承coprocessorprotocol.
2. 实现终端接口,继承抽象类baseendpointcoprocessor,改实现代码需要部署到
3. 在客户端,终端可以被两个新的hbase client api调用 。单个region:htableinterface.coprocessorproxy(class protocol, byte[] row) 。rigons区域:htableinterface.coprocessorexec(class protocol, byte[] startkey, byte[] endkey, batch.call callable),这里的region是通过一个row来标示的,就是说,改row落到那个region,rpc就发给哪个region,对于start-end的,[start,end)范围内的region都会受到rpc调用。
如图
public interface counterprotocol extends coprocessorprotocol { public long count(byte[] start, byte[] end) throws ioexception;}
public class counterendpoint extends baseendpointcoprocessor implements counterprotocol { @override public long count(byte[] start, byte []end) throws ioexception { // aggregate at each region scan scan = new scan(); long numrow = 0; internalscanner scanner = ((regioncoprocessorenvironment) getenvironment()).getregion() .getscanner(scan); try { list curvals = new arraylist(); boolean hasmore = false; do { curvals.clear(); hasmore = scanner.next(curvals); if (bytes.compareto(curvals.get(0).getrow(), start)= 0) { break; } numrow++; } while (hasmore); } finally { scanner.close(); } return numrow; }}
public class counterendpointdemo { public static void main(string[] args) throws ioexception, throwable { final string startrow = args[0]; final string endrow = args[1]; @suppresswarnings(resource) htableinterface table = new htable(hbaseconfiguration.create(), tc); map results; // scan: for all regions results = table.coprocessorexec(counterprotocol.class, startrow.getbytes(), endrow.getbytes(), new batch.call() { public long call(counterprotocol instance) throws ioexception { return instance.count(startrow.getbytes(), endrow.getbytes()); } }); long total = 0; for (map.entry e : results.entryset()) { system.out.println(e.getvalue()); total += e.getvalue(); } system.out.println(total: + total); }}
整个程序的框架其实又是另外一个mapreduce,只是运行在region server上面,reduce运行在客户端,其中map计算量较大,reduce计算量很小!
另外需要提醒的是:
protocol的返回类型,可以是基本类型。
如果是一个自定义的类型需要实现org.apache.hadoop.io.writable接口。
关于详细的支持类型,请参考代码hbase源码:org.apache.hadoop.hbase.io.hbaseobjectwritable
怎么部署?1. 通过hbase-site.xml增加
hbase.coprocessor.region.classes xxxx.counterendpoint
如果要配置多个,就用逗号(,)分割。包含此类的jar必须位于hbase的classpath这种coprocessor是作用于所有的表,如果你只想作用于部分表,请使用下面一种方式。2. 通过shell方式
增加:
hbase(main):005:0> alter 't1', method => 'table_att','coprocessor'=>'hdfs:///foo.jar|com.foo.fooregionobserver|1001|arg1=1,arg2=2'updating all regions with the new schema...1/1 regions updated.done.0 row(s) in 1.0730 seconds
coprocessor格式为:
[filepath]|classname|priority|arguments
arguments: k=v[,k=v]+
其中filepath是hdfs路径,例如/tmp/zhenhe/cp/zhenhe-1.0.jarclassnameendpoint实现类的全名priority为,整数,框架会根据这个数据决定多个cp的执行顺序arguments,传给cp的参数如果hbase的classpath包含改类,filepath可以留空卸载:
先describe “tablename‘,查看你要卸载的cp的编号然后alter 't1', method => 'table_att_unset', name=> 'coprocessor$3',coprocessor$3可变。应用场景这是一个最简单的例子,另外还有很多统计场景,可以用在这种方式实现,有如下好处:
节省网络带宽减少rpc调用(scan的调用随着cacheszie的变小而线性增加),减轻hbase压力可以提高统计效率,那我之前写过的一个groupby类型的例子来说,大约可以提高50%以上的统计速度。其他应用场景?
一个保存着用户信息的表,可以统计每个用户信息(counter job)统计最大值,最小值,平均值,参考:https://issues.apache.org/jira/browse/hbase-1512批量删除记录,批量删除某个时间戳的记录参考:
1. http://blogs.apache.org/hbase/entry/coprocessor_introduction
2. https://issues.apache.org/jira/browse/hbase-1512
原文地址:使用hbase endpoint(coprocessor)进行计算, 感谢原作者分享。