使用 Python 操作阿里云 HBase 增强版
一.安装:步骤1,安装依赖(pip install thrift、pip install hbase-thrift 等);步骤2,将下方链接中的 hbase 文件放到 site-packages 目录下,替换原有的 hbase 文件:https://github.com/aliyun/aliyun-apsaradb-hbase-demo/tree/master/hbase/thrift2/python ……
·
一.安装
# 步骤1:安装依赖
pip install thrift
pip install hbase-thrift
……(以及其他所需依赖)
# 步骤二:将下方链接中的 hbase 目录复制到 Python 的 site-packages 目录下,替换原有的 hbase 包
https://github.com/aliyun/aliyun-apsaradb-hbase-demo/tree/master/hbase/thrift2/python
二.python脚本操作阿里云增强版hbase
# -*- coding: utf-8 -*-
# @Time : 2020/4/26 20:13
# @Author :
from thrift.protocol import TBinaryProtocol
from thrift.transport import THttpClient
from hbase import THBaseService
from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, \
TNamespaceDescriptor, TGet, TPut, TScan
class HbaseHelper(object):
    """Read/write helper around the HBase Thrift2 ``THBaseService`` client.

    Talks to an HBase (enhanced edition) Thrift endpoint over HTTP and
    authenticates with the ``ACCESSKEYID`` / ``ACCESSSIGNATURE`` custom
    headers. Each public method opens the transport, performs its request(s)
    and closes the transport again. The close also happens when a request
    raises.

    Naming note: ``column_family`` is used as the *table* name throughout.
    It is joined with ``namespace`` as ``"namespace:table"``. ``family`` is
    the column family that the cells are written to.
    """

    def __init__(self, url="http://ld-xxxxx-xxxx-xxxxxxs.xxx.com:9190",
                 accesskeyid="root", accesssignature="root"):
        """Create the HTTP transport, binary protocol and service client.

        The transport is only opened by the individual methods below.

        :param url: Thrift HTTP endpoint of the instance (the default is a placeholder).
        :param accesskeyid: user name, sent as the ``ACCESSKEYID`` header.
        :param accesssignature: password, sent as the ``ACCESSSIGNATURE`` header.
        """
        self.transport = THttpClient.THttpClient(url)
        self.transport.setCustomHeaders({
            "ACCESSKEYID": accesskeyid,          # user name
            "ACCESSSIGNATURE": accesssignature,  # password
        })
        self.protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
        self.client = THBaseService.Client(self.protocol)

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _encode(value):
        """Return ``value`` rendered with ``str.format`` and encoded as UTF-8."""
        return "{}".format(value).encode("utf-8")

    @staticmethod
    def _table_name(namespace, column_family):
        """Return the fully qualified table name ``b"namespace:table"``."""
        return "{}:{}".format(namespace, column_family).encode("utf-8")

    @staticmethod
    def _closest_row_after(row):
        """Return the smallest row key greater than ``row``.

        This works by appending a 0x00 byte. Resuming a scan from this key
        neither returns ``row`` again nor skips any key in between.
        """
        array = bytearray(row)
        array.append(0x00)
        return bytes(array)

    @staticmethod
    def _make_scan(start_row, stop_row=None):
        """Build a ``TScan`` starting at ``start_row``, bounded by ``stop_row`` if given."""
        return TScan(startRow=start_row, stopRow=stop_row)

    def _with_transport(self, func, *args, **kwargs):
        """Open the transport, call ``func`` and always close the transport afterwards."""
        self.transport.open()
        try:
            return func(*args, **kwargs)
        finally:
            self.transport.close()

    def _build_put(self, family, row_key, data):
        """Build a ``TPut`` that writes every ``data`` item as ``family:key = value``."""
        family_bytes = self._encode(family)
        return TPut(row=self._encode(row_key), columnValues=[
            TColumnValue(family=family_bytes, qualifier=self._encode(k),
                         value=self._encode(v))
            for k, v in data.items()])

    def _paged_scan(self, table, start_row, stop_row, caching, prefix=None):
        """Collect every row of a range scan, fetching ``caching`` rows per request.

        ``getScannerResults`` opens and closes the server-side scanner by
        itself and returns at most ``caching`` rows. The enhanced edition
        must use it for range scans. The scan is therefore resumed right
        after the last row of each batch until an empty batch comes back.
        When ``prefix`` (bytes) is given, collection stops at the first row
        that does not start with it.
        """
        results = []
        scan = self._make_scan(start_row, stop_row)
        while True:
            batch = self.client.getScannerResults(table, scan, caching)
            if not batch:
                # Nothing left between the current start row and stop_row.
                return results
            for result in batch:
                if prefix is not None and not result.row.startswith(prefix):
                    # Rows come back in key order, so no later row can match.
                    return results
                results.append(result)
            scan = self._make_scan(self._closest_row_after(batch[-1].row), stop_row)

    # ------------------------------------------------------------------- writes

    def single_insert(self, namespace, column_family, family, **kwargs):
        """Insert a single row.

        :param namespace: HBase namespace.
        :param column_family: table name inside ``namespace``.
        :param family: column family that receives every column of ``data``.
        :param kwargs: ``{"row_key": row_key, "data": {"k1": "v1", "k2": "v2"}}``.
            ``data`` keys become column qualifiers. Keys and values are
            written as UTF-8 encoded ``str.format`` output.
        :return: True once the put has been sent.
        :raises ValueError: if ``row_key`` is missing or falsy. ValueError is
            a subclass of Exception. The check runs before the transport is
            opened.
        """
        row_key = kwargs.get("row_key", "")
        if not row_key:
            raise ValueError("row_key not exit exception")
        put = self._build_put(family, row_key, kwargs.get("data", {}))
        self._with_transport(self.client.put,
                             table=self._table_name(namespace, column_family), tput=put)
        return True

    def bulk_insert(self, namespace, column_family, family, kwargs_list):
        """Insert several rows with a single ``putMultiple`` request.

        :param namespace: HBase namespace.
        :param column_family: table name inside ``namespace``.
        :param family: column family that receives every column.
        :param kwargs_list: ``[{"row_key": row_key, "data": {"k1": "v1", ...}}, ...]``.
        :return: True once the puts have been sent. An empty list returns True
            without contacting the server.
        :raises ValueError: if an item has no ``row_key`` (None or ""). The
            check runs before the transport is opened.
        """
        puts = []
        for item in kwargs_list:
            row_key = item.get("row_key")
            # Without this check a missing key would be written as the row "None".
            if row_key is None or row_key == "":
                raise ValueError("row_key not exit exception")
            puts.append(self._build_put(family, row_key, item.get("data", {})))
        if not puts:
            return True
        self._with_transport(self.client.putMultiple,
                             table=self._table_name(namespace, column_family), tputs=puts)
        return True

    # -------------------------------------------------------------------- reads

    def get_by_row_key(self, namespace, column_family, row_key):
        """Fetch a single row by its key.

        :param namespace: HBase namespace.
        :param column_family: table name inside ``namespace``.
        :param row_key: row key, formatted and UTF-8 encoded before the request.
        :return: the result object returned by ``THBaseService.get``.
        """
        return self._with_transport(self.client.get,
                                    self._table_name(namespace, column_family),
                                    TGet(row=self._encode(row_key)))

    def scan(self, namespace, column_family, start_row_key, stop_row_key, caching=2):
        """Return all rows from ``start_row_key`` up to ``stop_row_key``.

        :param namespace: HBase namespace.
        :param column_family: table name inside ``namespace``.
        :param start_row_key: first row key of the range.
        :param stop_row_key: upper bound of the range.
        :param caching: number of rows returned per server round trip. Too
            large makes each call slow, too small causes many round trips.
            10-100 is typical, depending on row size.
        :return: list of result objects in row-key order.
        """
        return self._with_transport(self._paged_scan,
                                    self._table_name(namespace, column_family),
                                    self._encode(start_row_key),
                                    self._encode(stop_row_key),
                                    caching)

    def scan_by_prefix(self, namespace, column_family, prefix_row_key, caching=2):
        """Return all rows whose key starts with ``prefix_row_key``.

        Example: the prefix ``000190c1c5b4da6`` matches the row
        ``000190c1c5b4da6_b48083aaf2bf98801``.

        :param namespace: HBase namespace.
        :param column_family: table name inside ``namespace``.
        :param prefix_row_key: row-key prefix to match.
        :param caching: rows per server round trip (see ``scan``).
        :return: list of matching result objects in row-key order.
        """
        prefix = self._encode(prefix_row_key)
        return self._with_transport(self._paged_scan,
                                    self._table_name(namespace, column_family),
                                    prefix, None, caching, prefix)
if __name__ == '__main__':
    # Demo: connect with the default (placeholder) endpoint and credentials.
    hb = HbaseHelper()

    # Namespace, roughly comparable to a database.
    # NOTE(review): spelled "faceboook" although the original comment said
    # "facebook"; confirm which namespace actually exists on the instance.
    namespace = "faceboook"
    # Table inside the namespace (HbaseHelper calls this argument column_family).
    column_family = "jkt_client"
    # Column family that every column of the sample rows is written to.
    family = "facebook_main"

    # Sample row for single_insert.
    kwargs = {
        "row_key": "2",
        "data": {
            "type": 2,
            "page_id": "-Elías-PCB-161219993935151",
            "home_id": "161219993935151",
            "obj_name": "(<-) Elías - PCB",
            "img_url": "http://spider-silicon.3935151.jpg",
            "c_url": "https://www.facebook.com/-Elías-PCB-161219993935151/?ref=br_rs",
            "company_url": "http://www.eliaspcb.com",
        },
    }

    # Sample rows for bulk_insert.
    row_3 = {
        "row_key": "3",
        "data": {
            "type": 2,
            "page_id": "-Hair-Accessories--416385828419351",
            "home_id": "416385828419351",
            "obj_name": "~{ Hair Accessories }~",
            "img_url": "http://spider-ccessories--416385828419351.jpg",
            "c_url": "https://www.facebook.com/-Hair-Accessories--416385828419351/?ref=br_rs",
            "company_url": "",
        },
    }
    row_4 = {
        "row_key": "4",
        "data": {
            "type": 2,
            "page_id": "-Joc-Lyn-Bicycle-Parts--386837101395047",
            "home_id": "386837101395047",
            "obj_name": "-Joc-Lyn-Bicycle-Parts",
            "img_url": "",
            "c_url": "https://www.facebook.com/-Joc-Lyn-Bicycle-Parts--386837101395047/?ref=br_rs",
            "company_url": "",
        },
    }
    kwargs_list = [row_3, row_4]

    # Other usage examples (uncomment to run):
    #   hb.single_insert(namespace, column_family, family, **kwargs)
    #   hb.bulk_insert(namespace, column_family, family, kwargs_list)
    #   print(hb.get_by_row_key(namespace, column_family, "4"))

    # Range scan over row keys "2" up to "3", two rows per round trip.
    result = hb.scan(namespace, column_family,
                     start_row_key="2", stop_row_key="3", caching=2)
    print(result)
三.控制台查看结果
更多推荐
已为社区贡献3条内容
所有评论(0)