HBase Operations in Practice with C# and Thrift

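When developing against HBase, Java programmers can work with table data directly through HBase's native API. If you don't mind the extra effort, you can also use the Thrift client Java API; my earlier write-up, "HBase Thrift Client Java API in Practice", can serve as a reference. Developers with a background in other programming languages who want the benefits of HBase can use the HBase Thrift client API for their language to interact with HBase.
Here we use a C# client to operate HBase. The HBase Thrift interface definition can be viewed at http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/hbase.thrift?view=markup. To generate the cross-language HBase API we need to install the Thrift compiler; the version I used is 0.9.0. Note that the version of the Thrift compiler and the version of the language library you import must match. Otherwise all sorts of problems appear, because the library API for a given language may change between Thrift versions.
First, download the content at the link above and save it as hbase.thrift.
Then run the following command to generate the C# HBase Thrift client API: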
[hadoop@master hbase]$ thrift --gen csharp hbase.thrift
[hadoop@master hbase]$ ls
gen-csharp
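Here we access HBase tables from C# through the HBase Thrift client API. If you are working in Java, the native HBase API is the better choice in both performance and convenience. Access through the Thrift API is in effect one more layer wrapped around the HBase API. It may feel awkward the first time you use it, and sometimes you will need to consult the Thrift server implementation code.
Preparation: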
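- Download the Thrift package, unpack it, and copy the code under thrift-0.9.0/lib/csharp/src into your workspace (in your development tool).
- Copy the code from the gen-csharp directory generated above into the workspace.
- Make sure the HBase cluster is running normally, then start the HBase Thrift service with the following command:
bin/hbase thrift -b master -p 9090 start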
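The HBase Thrift service above listens on port 9090. This is the port to use when accessing HBase through the Thrift API below, not the HBase service port (60000 by default).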
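Before wiring up the client, it can help to confirm that the Thrift port is actually reachable. The following is a minimal sketch that uses only the .NET base class library; the class name ThriftPortCheck and the method CanConnect are made up for illustration, and the host "master" and port 9090 are simply the values from the command above.

```csharp
using System;
using System.Net.Sockets;

public static class ThriftPortCheck
{
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static bool CanConnect(string host, int port, int timeoutMs)
    {
        try
        {
            using (var tcp = new TcpClient())
            {
                // ConnectAsync + Wait bounds the wait instead of relying on the OS default timeout.
                return tcp.ConnectAsync(host, port).Wait(timeoutMs);
            }
        }
        catch (Exception)
        {
            // DNS failure, connection refused and similar errors all mean "not reachable".
            return false;
        }
    }

    public static void Main()
    {
        // Assumed values: adjust host and port to your own cluster.
        bool ok = CanConnect("master", 9090, 2000);
        Console.WriteLine(ok ? "Thrift port reachable" : "Thrift port not reachable");
    }
}
```

This only checks that something accepts TCP connections on that port; it does not verify that the listener is the HBase Thrift service.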
Next, we implement a simple example that accesses an HBase table.
First, create a table in the HBase shell:
create 'test_info', 'info'
The table is named test_info and its column family is named info.
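In the Thrift interface defined by hbase.thrift, a column is addressed by a single name of the form family:qualifier, passed as bytes. The sketch below shows one way to build such names for the info family; the helper class ColumnNames and the qualifier "birthday" are hypothetical and only serve as illustration.

```csharp
using System;
using System.Text;

public static class ColumnNames
{
    // Joins a column family and a qualifier into the "family:qualifier" form.
    public static string Qualified(string family, string qualifier)
    {
        return family + ":" + qualifier;
    }

    // Encodes the qualified column name as UTF-8 bytes.
    public static byte[] ToBytes(string family, string qualifier)
    {
        return Encoding.UTF8.GetBytes(Qualified(family, qualifier));
    }

    public static void Main()
    {
        // "birthday" is an assumed qualifier, not something the table definition requires.
        byte[] column = ToBytes("info", "birthday");
        Console.WriteLine(Qualified("info", "birthday") + " -> " + column.Length + " bytes");
    }
}
```

Only the family (info) has to exist in the table schema; qualifiers can be chosen freely at write time.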
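Then we start operating on the HBase table using the Thrift code generated above.
In effect we have translated the Java code from "HBase Thrift Client Java API in Practice" into C#. On the client side we add a layer of abstraction that makes it easier to pass the various parameters. The abstract class is AbstractHBaseThriftService, in the namespace HBaseThrift.HBase.Thrift, and its code is as follows: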
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Thrift.Transport;
using Thrift.Protocol;

namespace HBaseThrift.HBase.Thrift
{
    public abstract class AbstractHBaseThriftService
    {
        protected static readonly string CHARSET = "UTF-8";
        private string host = "localhost";
        private int port = 9090;
        private readonly TTransport transport;
        // Client class generated by the Thrift compiler from hbase.thrift.
        protected readonly Hbase.Client client;

        public AbstractHBaseThriftService() : this("localhost", 9090)
        {
        }

        public AbstractHBaseThriftService(string host, int port)
        {
            this.host = host;
            this.port = port;
            transport = new TSocket(host, port);
            TProtocol protocol = new TBinaryProtocol(transport, true, true);
            client = new Hbase.Client(protocol);
        }

        // Opens the connection to the Thrift service.
        public void Open()
        {
            if (transport != null)
            {
                transport.Open();
            }
        }

        // Closes the connection to the Thrift service.
        public void Close()
        {
            if (transport != null)
            {
                transport.Close();
            }
        }

        public abstract List<string> GetTables();

        public abstract void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary<string, string> attributes);
        public abstract void Update(string table, string rowKey, bool writeToWal, Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes);

        public abstract void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary<string, string> attributes);
        public abstract void DeleteCells(string table, string rowKey, bool writeToWal, List<string> columns, Dictionary<string, string> attributes);
        public abstract void DeleteRow(string table, string rowKey, Dictionary<string, string> attributes);

        public abstract int ScannerOpen(string table, string startRow, List<string> columns, Dictionary<string, string> attributes);
        public abstract int ScannerOpen(string table, string startRow, string stopRow, List<string> columns, Dictionary<string, string> attributes);
        public abstract int ScannerOpenWithPrefix(string table, string startAndPrefix, List<string> columns, Dictionary<string, string> attributes);
        public abstract int ScannerOpenTs(string table, string startRow, List<string> columns, long timestamp, Dictionary<string, string> attributes);
        public abstract int ScannerOpenTs(string table, string startRow, string stopRow, List<string> columns, long timestamp, Dictionary<string, string> attributes);

        public abstract List<TRowResult> ScannerGetList(int id, int nbRows);
        public abstract List<TRowResult> ScannerGet(int id);

        public abstract List<TRowResult> GetRow(string table, string row, Dictionary<string, string> attributes);
        public abstract List<TRowResult> GetRows(string table, List<string> rows, Dictionary<string, string> attributes);
        public abstract List<TRowResult> GetRowsWithColumns(string table, List<string> rows, List<string> columns, Dictionary<string, string> attributes);

        public abstract void ScannerClose(int id);

        /// <summary>
        /// Iterate result rows (for testing only).
        /// </summary>
        public abstract void IterateResults(TRowResult result);
    }
}
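Here is a brief description of the basic functionality of the client API we provide:
- Establish a connection to the Thrift service: Open()
- Get the names of all tables in HBase: GetTables()
- Update records in an HBase table: Update()
- Delete data (cells) from one row of an HBase table: DeleteCell() and DeleteCells()
- Delete one row of an HBase table: DeleteRow()
- Open a scanner, which returns an id: ScannerOpen(), ScannerOpenWithPrefix() and ScannerOpenTs(); then iterate over records with the returned id: ScannerGetList() and ScannerGet()
- Get row results: GetRow(), GetRows() and GetRowsWithColumns()
- Close a scanner: ScannerClose()
- Iterate over results, for debugging: IterateResults()
Paging, for example, works somewhat differently than in a traditional relational database. With an HBase table you first open a scanner (for example by calling ScannerOpen()), which returns an id. You then call ScannerGetList() with that id (the nbRows argument sets how many records each call returns) to get a list of records, and keep calling ScannerGetList() until a call returns no results. The test cases later give a hands-on feel for this.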
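The paging loop described above can be sketched without a running cluster. In the sketch below, IRowScanner is a hypothetical stand-in for the two scanner calls (it returns strings rather than TRowResult objects), and FakeScanner is an in-memory fake introduced only so that the loop is runnable; neither is part of the generated Thrift code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the scanner calls used when paging.
public interface IRowScanner
{
    List<string> ScannerGetList(int id, int nbRows);
    void ScannerClose(int id);
}

// In-memory fake so the paging loop can run without HBase.
public class FakeScanner : IRowScanner
{
    private readonly List<string> rows;
    private int position;

    public bool Closed { get; private set; }

    public FakeScanner(IEnumerable<string> rows)
    {
        this.rows = new List<string>(rows);
    }

    // Returns up to nbRows rows and advances the cursor; empty once exhausted.
    public List<string> ScannerGetList(int id, int nbRows)
    {
        int count = Math.Min(nbRows, rows.Count - position);
        List<string> page = rows.GetRange(position, count);
        position += count;
        return page;
    }

    public void ScannerClose(int id)
    {
        Closed = true;
    }
}

public static class Paging
{
    // Fetches pages of pageSize rows until an empty page comes back,
    // then closes the scanner even if a call throws.
    public static List<List<string>> ReadAllPages(IRowScanner scanner, int scannerId, int pageSize)
    {
        var pages = new List<List<string>>();
        try
        {
            while (true)
            {
                List<string> page = scanner.ScannerGetList(scannerId, pageSize);
                if (page.Count == 0)
                {
                    break;
                }
                pages.Add(page);
            }
        }
        finally
        {
            scanner.ScannerClose(scannerId);
        }
        return pages;
    }

    public static void Main()
    {
        var scanner = new FakeScanner(new[] { "row1", "row2", "row3", "row4", "row5" });
        foreach (List<string> page in ReadAllPages(scanner, 1, 2))
        {
            Console.WriteLine(string.Join(", ", page));
        }
    }
}
```

Closing the scanner in a finally block is a design choice of this sketch: scanners hold resources on the server side, so releasing them on every code path seems prudent.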
Now, based on the client interface abstracted above, we give a basic implementation. The code is as follows:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseThrift.HBase.Thrift
{
    class HBaseThriftClient : AbstractHBaseThriftService
    {
        public HBaseThriftClient() : this("localhost", 9090)
        {
        }

        public HBaseThriftClient(string host, int port) : base(host, port)
        {
        }

        public override List<string> GetTables()
        {
            List<byte[]> tables = client.getTableNames();
            List<string> list = new List<string>();
            foreach (byte[] table in tables)
            {
                list.Add(Decode(table));
            }
            return list;
        }

        // Writes a single column value to one row.
        public override void Update(string table, string rowKey, bool writeToWal, string fieldName, string fieldValue, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            Mutation mutation = new Mutation();
            mutation.IsDelete = false;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(fieldName);
            mutation.Value = Encode(fieldValue);
            mutations.Add(mutation);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        // Writes several column values to one row in a single call.
        public override void Update(string table, string rowKey, bool writeToWal, Dictionary<string, string> fieldNameValues, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            foreach (KeyValuePair<string, string> pair in fieldNameValues)
            {
                Mutation mutation = new Mutation();
                mutation.IsDelete = false;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(pair.Key);
                mutation.Value = Encode(pair.Value);
                mutations.Add(mutation);
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void DeleteCell(string table, string rowKey, bool writeToWal, string column, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            Mutation mutation = new Mutation();
            mutation.IsDelete = true;
            mutation.WriteToWAL = writeToWal;
            mutation.Column = Encode(column);
            mutations.Add(mutation);
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void DeleteCells(string table, string rowKey, bool writeToWal, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            List<Mutation> mutations = new List<Mutation>();
            foreach (string column in columns)
            {
                Mutation mutation = new Mutation();
                mutation.IsDelete = true;
                mutation.WriteToWAL = writeToWal;
                mutation.Column = Encode(column);
                mutations.Add(mutation);
            }
            client.mutateRow(tableName, row, mutations, encodedAttributes);
        }

        public override void DeleteRow(string table, string rowKey, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] row = Encode(rowKey);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            client.deleteAllRow(tableName, row, encodedAttributes);
        }

        public override int ScannerOpen(string table, string startRow, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpen(tableName, start, encodedColumns, encodedAttributes);
        }

        public override int ScannerOpen(string table, string startRow, string stopRow, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStop(tableName, start, stop, encodedColumns, encodedAttributes);
        }

        public override int ScannerOpenWithPrefix(string table, string startAndPrefix, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] prefix = Encode(startAndPrefix);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithPrefix(tableName, prefix, encodedColumns, encodedAttributes);
        }

        public override int ScannerOpenTs(string table, string startRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenTs(tableName, start, encodedColumns, timestamp, encodedAttributes);
        }

        public override int ScannerOpenTs(string table, string startRow, string stopRow, List<string> columns, long timestamp, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] start = Encode(startRow);
            byte[] stop = Encode(stopRow);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.scannerOpenWithStopTs(tableName, start, stop, encodedColumns, timestamp, encodedAttributes);
        }

        public override List<TRowResult> ScannerGetList(int id, int nbRows)
        {
            return client.scannerGetList(id, nbRows);
        }

        public override List<TRowResult> ScannerGet(int id)
        {
            return client.scannerGet(id);
        }

        public override List<TRowResult> GetRow(string table, string row, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            byte[] startRow = Encode(row);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRow(tableName, startRow, encodedAttributes);
        }

        public override List<TRowResult> GetRows(string table, List<string> rows, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRows(tableName, encodedRows, encodedAttributes);
        }

        public override List<TRowResult> GetRowsWithColumns(string table, List<string> rows, List<string> columns, Dictionary<string, string> attributes)
        {
            byte[] tableName = Encode(table);
            List<byte[]> encodedRows = EncodeStringList(rows);
            List<byte[]> encodedColumns = EncodeStringList(columns);
            Dictionary<byte[], byte[]> encodedAttributes = EncodeAttributes(attributes);
            return client.getRowsWithColumns(tableName, encodedRows, encodedColumns, encodedAttributes);
        }

        public override void ScannerClose(int id)
        {
            client.scannerClose(id);
        }

        public override void IterateResults(TRowResult result)
        {
            foreach (KeyValuePair<byte[], TCell> pair in result.Columns)
            {
                Console.WriteLine("\tCOL=" + Decode(pair.Key) + ", VALUE=" + Decode(pair.Value.Value));
            }
        }

        // Encoding.UTF8 is used explicitly: UTF8Encoding.Default is just the inherited
        // Encoding.Default, which is not guaranteed to be UTF-8 on every runtime.
        private string Decode(byte[] bs)
        {
            return Encoding.UTF8.GetString(bs);
        }

        private byte[] Encode(string str)
        {
            return Encoding.UTF8.GetBytes(str);
        }

        private Dictionary<byte[], byte[]> EncodeAttributes(Dictionary<string, string> attributes)
        {
            Dictionary<byte[], byte[]> encodedAttributes = new Dictionary<byte[], byte[]>();
            if (attributes != null)
            {
                foreach (KeyValuePair<string, string> pair in attributes)
                {
                    encodedAttributes.Add(Encode(pair.Key), Encode(pair.Value));
                }
            }
            return encodedAttributes;
        }

        private List<byte[]> EncodeStringList(List<string> strings)
        {
            List<byte[]> list = new List<byte[]>();
            if (strings != null)
            {
                foreach (string str in strings)
                {
                    list.Add(Encode(str));
                }
            }
            return list;
        }
    }
}
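HBase stores table names, row keys, column names and values as raw bytes, so the string helpers in a client like this need a fixed, explicit encoding. The sketch below is a standalone round trip through UTF-8; the class Utf8Bytes and the sample row key are hypothetical. It uses Encoding.UTF8 rather than Encoding.Default because, as far as I know, Encoding.Default is the system ANSI code page on .NET Framework and only happens to be UTF-8 on newer runtimes.

```csharp
using System;
using System.Text;

public static class Utf8Bytes
{
    // Converts a string to the bytes that would be sent over Thrift.
    public static byte[] Encode(string s)
    {
        return Encoding.UTF8.GetBytes(s);
    }

    // Converts bytes received over Thrift back to a string.
    public static string Decode(byte[] bs)
    {
        return Encoding.UTF8.GetString(bs);
    }

    public static void Main()
    {
        // Hypothetical row key containing non-ASCII characters.
        string rowKey = "用户-0001";
        byte[] raw = Encode(rowKey);
        Console.WriteLine("bytes: " + raw.Length);
        Console.WriteLine("round trip ok: " + (Decode(raw) == rowKey));
    }
}
```

Whichever encoding is chosen, it has to match what other writers and readers of the same table use, since HBase itself does not record it.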
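The HBaseThriftClient class gives a basic implementation. Next come the test cases, which call the client operations we implemented to interact with the HBase table. The test case class is as follows: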
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace HBaseThrift.HBase.Thrift
{
    class Test
    {
        private readonly AbstractHBaseThriftService client;

        // One shared generator: creating a new Random per call can repeat
        // values when the calls happen in quick succession.
        private static readonly Random random = new Random();

        public Test(string host, int port)
        {
            client = new HBaseThriftClient(host, port);
        }

        public Test() : this("master", 9090)
        {
        }

        static string RandomlyBirthday()
        {
            int year = 1900 + random.Next(100);
            int month = 1 + random.Next(12);
            int date = 1 + random.Next(30);
            return year + "-" + month.ToString().PadLeft(2, '0') + "-" + date.ToString().PadLeft(2, '0');
        }

        static string RandomlyGender()
        {
            int flag = random.Next(2);
            return flag == 0 ? "M" : "F";
        }

        static string RandomlyUserType()
        {
            int flag = 1 + random.Next(10);
            return flag.ToString();
        }

        public void Close()
        {
            client.Close();
        }

        public void CaseForUpdate()
        {
            bool writeToWal = false;
            Dictionary<string, string> attributes = new Dictionary<string, string>(0);
            string table = SetTable();
            // put kv pairs
            for (int i = 0; i
            // (the rest of this listing is truncated in the source)

The tests above exercise operations on HBase table data. In addition, in the generated Thrift client code, Iface lists all the service interfaces, and you can pick the ones you need. The Client class implements the logic for interacting with Thrift, and an object of that class acts as a proxy for the Thrift service that HBase provides.
References

http://wiki.apache.org/hadoop/hbase/thriftapi
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/hbase.thrift?view=markup
http://www.cnblogs.com/panfeng412/archive/2012/11/11/hbase-thrift-api-common-issues-summary.html
https://github.com/simplegeo/hadoop-hbase/blob/master/src/examples/thrift/democlient.java
http://thrift.apache.org/tutorial/java/

Original article: "HBase Operations in Practice with C# and Thrift" (基于C#+Thrift操作HBase实践). Thanks to the original author for sharing.