mongodb数据迁移到hbase

时间:2022-01-08 08:57:31

mongodb数据迁移到hbase

  • 导入包
# encoding: utf-8
'''
@author: zcc
@license: (C) Copyright 2013-2017, Node Supply Chain Manager Corporation Limited.
@software: pycharm
@file: ggsn_to_hbase.py
@time: 9/1/17 2:43 PM
@desc:
'''
from thrift.transport import TSocket, TTransport
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo
from hbase.ttypes import IOError, AlreadyExists
from hbase import Hbase
from hbase.ttypes import *
  • 操作hbase的类
import struct
def encode(n):
return struct.pack("i", n) class HbaseControl(object):
def __init__(self, table, col_name, host='192.168.1.10', port=9090):
self.table = table
self.host = host
self.port = port # Connect to HBase Thrift server
self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport) # Create and open the client connection
self.client = Hbase.Client(self.protocol)
self.transport.open() # set type and field of column families
self.set_column_families(col_name)
self._build_column_families() def set_column_families(self, col_list=['name', 'sex', 'age']):
'''
设置每列名称和属性
:param self:
:param type_list:
:param col_list:
:return:
'''
self.columnFamilies = col_list def _build_column_families(self):
'''
如果hbase中没有当前表,则建立
:param self:
:return:
'''
tables = self.client.getTableNames()
if self.table not in tables:
self.__create_table(self.table) def __create_table(self, table):
'''
在hbase中建表
:param self:
:param table:
:return:
'''
columnFamilies = []
for columnFamily in self.columnFamilies:
name = Hbase.ColumnDescriptor(name=columnFamily)
columnFamilies.append(name)
self.client.createTable(table, columnFamilies) def del_row(self, row_key):
'''
删除行
:param row_key:
:return:
'''
self.client.deleteAllRow(self.table, row_key, {}) def __del__(self):
'''
销毁对象前关闭hbase链接
:return:
'''
self.transport.close() def _del_table(self, table):
'''
删除hbase中的表
:param table:
:return:
'''
self.client.disableTable(table)
self.client.deleteTable(table) def getColumnDescriptors(self):
'''
获取hbase表的列簇描述
:return:
'''
return self.client.getColumnDescriptors(self.table) def put(self, record, day):
'''
向hbase中插入一条记录
:param record:
:return:
'''
assert isinstance(record, dict)
mutations = []
# tel和日期构成hbase内的行名
row_key = '{0}_{1}'.format(record['tel'], day)
# 插入tel
mutations.append(Hbase.Mutation(column='baseinfo:tel', value=str(record['tel'])))
# 插入day
mutations.append(Hbase.Mutation(column='baseinfo:day', value=str(day)))
# 插入suminfo
mutations.append(Hbase.Mutation(column='suminfo:context', value=str(record['suminfo'])))
self.client.mutateRow(self.table, row_key, mutations, {}) def puts(self, records, day):
'''
hbase批量插入
:param records:
:param day:
:return:
'''
assert isinstance(records, list) mutationsBatch = []
for record in records:
mutations = []
# tel和日期构成hbase内的行名
row_key = '{0}_{1}'.format(record['tel'], day)
# 插入tel
mutations.append(Hbase.Mutation(column='baseinfo:tel', value=str(record['tel'])))
# 插入day
mutations.append(Hbase.Mutation(column='baseinfo:day', value=str(day)))
# 插入suminfo
mutations.append(Hbase.Mutation(column='suminfo:context', value=str(record['suminfo']))) mutationsBatch.append(Hbase.BatchMutation(row=row_key, mutations=mutations))
self.client.mutateRows(self.table, mutationsBatch, {})
  • 操作mongodb且到将数据导入到hbase的类
from pymongo import MongoClient
class MongDBControl(object):
def __init__(self, table_name, host='192.168.1.20', port=27017):
self.client = MongoClient(host, port)
db = self.client.table
self.collect = db[table_name]
self.table = table_name def __del__(self):
self.client.close() def record_to_hbase(self, hc):
assert isinstance(hc, HbaseControl) num = 0 while True:
records = self.collect.find().skip(1000*num).limit(1000)
if not records: break
hc.puts(list(records), self.table)
num += 1
print '已经从mongodb向hbase导入{0}条数据!!'.format(num*1000)
print '数据迁移完毕!!!'
  • 主函数
if __name__ == '__main__':
if 1:
hc = HbaseControl(table='table', col_name=['baseinfo', 'count', 'suminfo', 'nodebll', 'nodebzl'])
mc = MongDBControl('20170806')
# mc.record_to_hbase(hc)
hc._del_table('table')