python使用thrift访问操作hbase - 孤独的皮蛋

时间:2024-04-15 17:51:23

python使用thrift访问操作hbase

1. 不同语言通过thrift访问hbase的demo,可以参考hbase源码中的examples/thrift/DemoClient.*,有c++、java、php、pl、py、rb等多种语言版本。
2. 使用python访问hbase需要借助thrift,首先通过 thrift --gen 生成python版本的hbase thrift接口代码。
使用命令:

cd $HBASE_HOME    
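thrift --gen py src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift  
(注意路径是相对于$HBASE_HOME的,前面不要加"/";较新版本的hbase中该文件位于 hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift)  
这样在当前目录下就生成了gen-py目录,其中 gen-py/hbase/ 下包含:  
Hbase.py 中定义了HbaseClient可以调用的方法  
ttypes.py 中定义了HbaseClient传输所用的数据类型  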
  
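将生成的 gen-py/hbase 目录复制到python的site-packages下(路径请按实际的python版本调整),同时确保已安装thrift的python运行库(示例代码中的 from thrift import ... 依赖它):  
cp -r gen-py/hbase /usr/lib/python2.4/site-packages/  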
3. 启动hbase和thrift服务  
./bin/start-hbase.sh  
./bin/hbase-daemon.sh start thrift

  现在我们就可以用python来和hbase通信了 

#-*-coding:utf-8 -*-  
#!/usr/bin/python  
from thrift import Thrift  
from thrift.transport import TSocket  
from thrift.transport import TTransport  
from thrift.protocol import TBinaryProtocol  
from hbase import Hbase  
from hbase.ttypes import ColumnDescriptor,Mutation,BatchMutation  
  
class HbaseWriter(object):
    """Small demo wrapper around the HBase Thrift (v1) client.

    Constructor arguments:
        address -- host name or IP address of the HBase Thrift server
        port    -- Thrift server port (an int, or a numeric string like "9090")
        table   -- name of the table to use; it is created if it is missing

    Constructing an instance opens the connection, creates the table when it
    is not listed by getTableNames(), writes one demo row and scans the table.
    Call close() (or use the instance in a ``with`` statement) to release the
    connection deterministically instead of relying on __del__.
    """

    def __init__(self, address, port, table='user'):
        self.tableName = table
        # Set before anything that can fail, so close()/__del__ never hit an
        # AttributeError on a half-constructed instance.
        self.transport = None

        # Connection stack: socket -> buffered transport -> binary protocol.
        sock = TSocket.TSocket(address, int(port))
        self.transport = TTransport.TBufferedTransport(sock)
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

        if self.tableName not in self.client.getTableNames():
            print("not in tables")
            self.__createTable()

        # Demo round trip kept from the original example.
        self.write("hell,babay!!!")
        self.read()

    def close(self):
        """Close the Thrift transport if it is open; safe to call repeatedly."""
        transport = getattr(self, "transport", None)
        if transport is not None and transport.isOpen():
            transport.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False  # never suppress exceptions from the with-body

    def __del__(self):
        # Last-resort cleanup only; prefer an explicit close().
        self.close()

    def __createTable(self):
        """Create the table with single-version families person:, contents:, info:."""
        families = [
            ColumnDescriptor(name="person:", maxVersions=1),
            ColumnDescriptor(name="contents:", maxVersions=1),
            ColumnDescriptor(name="info:", maxVersions=1),
        ]
        self.client.createTable(self.tableName, families)

    def write(self, content, row="abc"):
        """Store ``content`` in the ``person:`` and ``info:`` columns of ``row``.

        ``row`` defaults to "abc", the row key the original demo hard-coded.
        """
        mutations = [
            Mutation(column="person:", value=content),
            Mutation(column="info:", value=content),
        ]
        self.client.mutateRow(self.tableName, row, mutations)

    def read(self, columns=None, start_row=""):
        """Scan the table and return the cells found.

        columns   -- column names/families to scan; defaults to ["contents:"]
        start_row -- row key to start scanning from ("" = start of table)

        Returns a list of ``(row_key, {column_name: value})`` tuples, one per
        row returned by the scanner. The scanner is always closed, even when a
        Thrift call fails; such errors propagate to the caller instead of being
        silently swallowed.
        """
        if columns is None:
            columns = ["contents:"]
        results = []
        scanner_id = self.client.scannerOpen(self.tableName, start_row, columns)
        try:
            while True:
                # scannerGet returns a list of TRowResult; an empty list
                # signals that the scan is exhausted.
                batch = self.client.scannerGet(scanner_id)
                if not batch:
                    break
                for row_result in batch:
                    values = dict(
                        (name, cell.value)
                        for name, cell in row_result.columns.items()
                    )
                    results.append((row_result.row, values))
        finally:
            self.client.scannerClose(scanner_id)
        return results
  
if __name__ == "__main__":
    # Demo entry point: connect to the Thrift server, make sure the "person"
    # table exists, then write and scan a row (all done by the constructor).
    # TSocket takes the port as an integer.
    writer = HbaseWriter("192.168.239.135", 9090, "person")
    # Release the connection explicitly rather than relying on __del__, which
    # is not guaranteed to run promptly (or at all) at interpreter exit.
    writer.transport.close()

  我们来看一下thrift生成的代码中都提供了哪些方法: 

 

提供的方法有:  
void enableTable(Bytes tableName)  
enable表  
void disableTable(Bytes tableName)  
disable表  
bool isTableEnabled(Bytes tableName)  
查看表状态  
void compact(Bytes tableNameOrRegionName)  
void majorCompact(Bytes tableNameOrRegionName)  
list<Text> getTableNames()  
map<Text, ColumnDescriptor> getColumnDescriptors(Text tableName)  
list<TRegionInfo> getTableRegions(Text tableName)  
void createTable(Text tableName, list<ColumnDescriptor> columnFamilies)  
void deleteTable(Text tableName)  
list<TCell> get(Text tableName, Text row, Text column)  
list<TCell> getVer(Text tableName, Text row, Text column, i32 numVersions)  
list<TCell> getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions)  
list<TRowResult> getRow(Text tableName, Text row)  
list<TRowResult> getRowWithColumns(Text tableName, Text row, list<Text> columns)  
list<TRowResult> getRowTs(Text tableName, Text row, i64 timestamp)  
list<TRowResult> getRowWithColumnsTs(Text tableName, Text row, list<Text> columns, i64 timestamp)  
list<TRowResult> getRows(Text tableName, list<Text> rows)  
list<TRowResult> getRowsWithColumns(Text tableName, list<Text> rows, list<Text> columns)  
list<TRowResult> getRowsTs(Text tableName, list<Text> rows, i64 timestamp)  
list<TRowResult> getRowsWithColumnsTs(Text tableName, list<Text> rows, list<Text> columns, i64 timestamp)  
void mutateRow(Text tableName, Text row, list<Mutation> mutations)  
void mutateRowTs(Text tableName, Text row, list<Mutation> mutations, i64 timestamp)  
void mutateRows(Text tableName, list<BatchMutation> rowBatches)  
void mutateRowsTs(Text tableName, list<BatchMutation> rowBatches, i64 timestamp)  
i64 atomicIncrement(Text tableName, Text row, Text column, i64 value)  
void deleteAll(Text tableName, Text row, Text column)  
void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp)  
void deleteAllRow(Text tableName, Text row)  
void deleteAllRowTs(Text tableName, Text row, i64 timestamp)  
ScannerID scannerOpenWithScan(Text tableName, TScan scan)  
ScannerID scannerOpen(Text tableName, Text startRow, list<Text> columns)  
ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow, list<Text> columns)  
ScannerID scannerOpenWithPrefix(Text tableName, Text startAndPrefix, list<Text> columns)  
ScannerID scannerOpenTs(Text tableName, Text startRow, list<Text> columns, i64 timestamp)  
ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow, list<Text> columns, i64 timestamp)  
list<TRowResult> scannerGet(ScannerID id)  (扫描结束时返回空列表)  
list<TRowResult> scannerGetList(ScannerID id, i32 nbRows)  
void scannerClose(ScannerID id)