HBase Java API
文章目录HBase Java API创建HBase模块并导入Maven依赖HBase Java APIHBase Java API 常用操作HBase Java API创建HBase模块并导入Maven依赖<dependencies><dependency><groupId>org.apache.hbase</groupId><artifact
·
文章目录
HBase Java API
API
- 几个主要HBase API 类和数据模型之间的对应关系:
HBaseAdmin
- 类: org.apache.hadoop.hbase.client.HBaseAdmin
- 作用:提供了一个接口来管理 HBase 数据库的表信息。
- 它提供的方法包括:创建表,删除表,列出表项,使表有效或无效,以及添加或删除表列族成员等。
用法示例:
// Build an admin handle from the configuration, then disable the table named "tablename".
HBaseAdmin admin = new HBaseAdmin(config);
admin.disableTable("tablename");
HBaseConfiguration
- 类:org.apache.hadoop.hbase.HBaseConfiguration
- 作用:对 HBase 进行配置
用法示例:
// Construct the configuration object, then set the ZooKeeper client-port property to "2181".
HBaseConfiguration hconfig = new HBaseConfiguration();
hconfig.set("hbase.zookeeper.property.clientPort","2181");
该方法将 "hbase.zookeeper.property.clientPort" 的端口号设置为 2181。
一般情况下,HBaseConfiguration 会使用构造函数进行初始化,然后再使用其它方法。
HTableDescriptor
- 类: org.apache.hadoop.hbase.HTableDescriptor
- 作用:包含了表的名字及其对应表的列族
用法示例:
// Describe the table, then register a column family named "family" on the descriptor.
HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor("family"));
在上述例子中,通过一个HTableDescriptor 实例,为HTableDescriptor 添加了一个列簇:family
HColumnDescriptor
- 类: org.apache.hadoop.hbase.HColumnDescriptor
- 作用:
- 维护着关于列族的信息,例如版本号,压缩设置等。
- 它通常在创建表或者为表添加列族的时候使用。
- 列族被创建后不能直接修改,只能通过删除然后重新创建的方式。
- 列族被删除的时候,列族里面的数据也会同时被删除。
用法示例:
// Describe the table, create a column family named "content", and attach it to the descriptor.
HTableDescriptor htd = new HTableDescriptor(tablename);
HColumnDescriptor col = new HColumnDescriptor("content");
htd.addFamily(col);
此例添加了一个content列簇
HTable
- 类: org.apache.hadoop.hbase.client.HTable
- 作用:可以用来和 HBase 表直接通信。此方法对于更新操作来说是非线程安全的。
用法示例:
// Open the table by name, then request a scanner for the given column family.
HTable table = new HTable(conf,Bytes.toBytes(tablename));
ResultScanner scanner = table.getScanner(family);
Put
- 类:org.apache.hadoop.hbase.client.Put
- 作用:用来对单个行执行添加操作
用法示例:
// Open the table, build a Put for row `brow`, add one cell to it, and submit it.
HTable table = new HTable(conf,Bytes.toBytes(tablename));
Put put = new Put(brow); // create a Put operation for the given row
put.add(family,qualifier,value);
table.put(put); // Java method names are case-sensitive: the method is put, not Put
Get
- 类: org.apache.hadoop.hbase.client.Get
- 作用:用来获取单个行的相关信息
用法示例:
// Open the table, then build a Get for the row key.
HTable table = new HTable(conf,Bytes.toBytes(tablename));
Get get = new Get(Bytes.toBytes(row));
Result
- 类: org.apache.hadoop.hbase.client.Result
- 作用:存储 Get 或者 Scan 操作后获取表的单行值。使用此类提供的方法可以直接获取值 或者各种 Map 结构( key-value 对)
创建HBase模块并导入Maven依赖
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.4.6</version>
</dependency>
</dependencies>
HBase Java API
package com.liangzai.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;
public class Demo01TestAPI {
public static void main(String[] args) throws IOException {
// 1、创建配置文件,设置HBase的连接地址(ZK的地址)
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
// 2、建立连接
Connection conn = ConnectionFactory.createConnection(conf);
/**
* 3、执行操作:
* 对表的结构进行操作 则getAdmin
* 对表的数据进行操作 则getTable
*/
Admin admin = conn.getAdmin();
conn.getTable(TableName.valueOf("test5"));
}
}
HBase Java API 常用操作
建立连接
// Connection shared by the tests; assigned in init().
Connection conn;

/**
 * Opens the HBase connection before each test, using the ZooKeeper quorum configured below.
 *
 * @throws IOException if the connection cannot be created
 */
@Before
public void init() throws IOException {
    final String zkQuorum = "192.168.3.100:2181,192.168.3.101:2181,192.168.3.102:2181";
    Configuration hbaseConf = HBaseConfiguration.create();
    hbaseConf.set("hbase.zookeeper.quorum", zkQuorum);
    conn = ConnectionFactory.createConnection(hbaseConf);
}
创建表
/**
 * Creates the {@code testAPI} table with a single column family, {@code cf1},
 * whose max-versions setting is 3.
 *
 * @throws IOException if the request to HBase fails
 */
@Test
public void createTable() throws IOException {
    // Admin is Closeable; try-with-resources releases it even when creation fails.
    try (Admin admin = conn.getAdmin()) {
        // Column family and its configuration.
        HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
        cf1.setMaxVersions(3);

        // Table descriptor carrying that family.
        HTableDescriptor testAPI = new HTableDescriptor(TableName.valueOf("testAPI"));
        testAPI.addFamily(cf1);

        admin.createTable(testAPI);
    }
}
查看所有表
/**
 * Prints the name of every table returned by {@code Admin.listTableNames()}, one per line.
 *
 * @throws IOException if the request to HBase fails
 */
@Test
public void ListTbales() throws IOException {
    // Close the Admin handle when done instead of leaking it.
    try (Admin admin = conn.getAdmin()) {
        for (TableName tableName : admin.listTableNames()) {
            System.out.println(tableName.getNameAsString());
        }
    }
}
查看表结构
/**
 * Prints, for each column family of {@code testAPI}, its name, max versions
 * and time-to-live (the equivalent of the shell's {@code desc}).
 *
 * @throws IOException if the request to HBase fails
 */
@Test
public void getTableDescriptor() throws IOException {
    // Close the Admin handle when done instead of leaking it.
    try (Admin admin = conn.getAdmin()) {
        HTableDescriptor testAPI = admin.getTableDescriptor(TableName.valueOf("testAPI"));
        for (HColumnDescriptor cf : testAPI.getColumnFamilies()) {
            System.out.println(cf.getNameAsString());
            System.out.println(cf.getMaxVersions());
            System.out.println(cf.getTimeToLive());
        }
    }
}
修改表
/**
 * Alters {@code testAPI}: sets the max versions of {@code cf1} to 5 and adds
 * a new column family {@code cf2}.
 *
 * @throws IOException if the request to HBase fails
 */
@Test
public void AlterTbale() throws IOException {
    // Close the Admin handle when done instead of leaking it.
    try (Admin admin = conn.getAdmin()) {
        TableName testAPI = TableName.valueOf("testAPI");
        HTableDescriptor testAPIDesc = admin.getTableDescriptor(testAPI);

        for (HColumnDescriptor cf : testAPIDesc.getColumnFamilies()) {
            if ("cf1".equals(cf.getNameAsString())) {
                cf.setMaxVersions(5);
            }
        }

        // addFamily rejects a family that is already on the descriptor, so
        // only add cf2 when it is missing; this keeps the test re-runnable.
        if (!testAPIDesc.hasFamily(Bytes.toBytes("cf2"))) {
            testAPIDesc.addFamily(new HColumnDescriptor("cf2"));
        }

        admin.modifyTable(testAPI, testAPIDesc);
    }
}
删除表
/**
 * Drops the {@code test2} table if it exists; otherwise prints a notice.
 *
 * @throws IOException if the request to HBase fails
 */
@Test
public void DropTable() throws IOException {
    // Close the Admin handle when done instead of leaking it.
    try (Admin admin = conn.getAdmin()) {
        TableName tableName = TableName.valueOf("test2");
        if (!admin.tableExists(tableName)) {
            System.out.println("表不存在!");
            return;
        }
        // A table must be disabled before it can be deleted. Skip the disable
        // call when it is already disabled, since disabling twice fails.
        if (admin.isTableEnabled(tableName)) {
            admin.disableTable(tableName);
        }
        admin.deleteTable(tableName);
    }
}
put
/**
 * Writes row {@code 001} into {@code testAPI}: name, age, and two values for
 * {@code clazz} (one with the default timestamp, one with explicit timestamp 1).
 *
 * @throws IOException if the write fails
 */
@Test
public void PutData() throws IOException {
    // Table is Closeable; release it when the write is done.
    try (Table testAPI = conn.getTable(TableName.valueOf("testAPI"))) {
        // Bytes.toBytes always encodes as UTF-8. String.getBytes() uses the
        // platform charset, which corrupts the Chinese values on non-UTF-8
        // platforms when they are read back with Bytes.toString.
        byte[] cf1 = Bytes.toBytes("cf1");
        byte[] clazz = Bytes.toBytes("clazz");

        Put put = new Put(Bytes.toBytes("001"));
        put.addColumn(cf1, Bytes.toBytes("name"), Bytes.toBytes("张三"));
        put.addColumn(cf1, Bytes.toBytes("age"), Bytes.toBytes("18"));
        put.addColumn(cf1, clazz, Bytes.toBytes("文科一班"));
        put.addColumn(cf1, clazz, 1, Bytes.toBytes("文科二班"));
        testAPI.put(put);
    }
}
get
/**
 * Reads row {@code 001} from {@code testAPI} and prints
 * {@code rowkey,name,age,clazz} on one line.
 *
 * @throws IOException if the read fails
 */
@Test
public void GetData() throws IOException {
    // Table is Closeable; release it when the read is done.
    try (Table testAPI = conn.getTable(TableName.valueOf("testAPI"))) {
        // Use Bytes.toBytes (UTF-8) so keys are encoded the same way on every platform.
        byte[] cf1 = Bytes.toBytes("cf1");

        Get get = new Get(Bytes.toBytes("001"));
        get.setMaxVersions(10);
        Result rs = testAPI.get(get);

        // Row key, then the individual column values.
        byte[] row = rs.getRow();
        byte[] name = rs.getValue(cf1, Bytes.toBytes("name"));
        byte[] age = rs.getValue(cf1, Bytes.toBytes("age"));
        byte[] clazz = rs.getValue(cf1, Bytes.toBytes("clazz"));
        System.out.println(Bytes.toString(row) + "," + Bytes.toString(name) + "," + Bytes.toString(age) + "," + Bytes.toString(clazz));
    }
}
提取数据的另一种方式
/**
 * Alternative way to extract data: fetches row {@code 001} (up to 10 versions)
 * and prints the value of every returned cell.
 *
 * @throws IOException if the read fails
 */
@Test
public void ListCells() throws IOException {
    // Table is Closeable; release it when the read is done.
    try (Table testAPI = conn.getTable(TableName.valueOf("testAPI"))) {
        Get get = new Get(Bytes.toBytes("001"));
        get.setMaxVersions(10);
        Result rs = testAPI.get(get);

        // listCells() returns null when the row has no cells; guard against
        // that instead of hitting a NullPointerException in the loop.
        List<Cell> cells = rs.listCells();
        if (cells == null) {
            return;
        }
        for (Cell cell : cells) {
            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
}
创建stu表,增加一个info列簇,将students.txt的数据批量插入
/**
 * Creates the {@code stu} table with an {@code info} column family (if it does
 * not exist yet) and bulk-loads every line of {@code data/students.txt} into it.
 *
 * <p>Each line is expected to be {@code id,name,age,gender,clazz}; the id becomes
 * the row key. Blank lines are skipped.
 *
 * @throws IOException if the file cannot be read or an HBase request fails
 */
@Test
public void PutStu() throws IOException {
    TableName stu = TableName.valueOf("stu");

    // Create the table on first use; close the Admin handle right after.
    try (Admin admin = conn.getAdmin()) {
        if (!admin.tableExists(stu)) {
            admin.createTable(new HTableDescriptor(stu).addFamily(new HColumnDescriptor("info")));
        }
    }

    final int batchSize = 100;
    byte[] info = Bytes.toBytes("info");
    List<Put> puts = new ArrayList<>(batchSize);

    // try-with-resources closes the reader and the table even if a line fails
    // to parse or a write throws.
    // NOTE(review): the file is read as UTF-8 — confirm the data file's encoding.
    try (Table stuTable = conn.getTable(stu);
         BufferedReader br = Files.newBufferedReader(Paths.get("data/students.txt"), StandardCharsets.UTF_8)) {
        String line;
        while ((line = br.readLine()) != null) {
            if (line.trim().isEmpty()) {
                continue; // a blank line would otherwise fail on split[1]
            }
            String[] split = line.split(",");
            String id = split[0];
            String name = split[1];
            String age = split[2];
            String gender = split[3];
            String clazz = split[4];

            Put put = new Put(Bytes.toBytes(id));
            put.addColumn(info, Bytes.toBytes("name"), Bytes.toBytes(name));
            put.addColumn(info, Bytes.toBytes("age"), Bytes.toBytes(age));
            put.addColumn(info, Bytes.toBytes("gender"), Bytes.toBytes(gender));
            put.addColumn(info, Bytes.toBytes("clazz"), Bytes.toBytes(clazz));

            // Batch the writes: one put per row is much slower.
            puts.add(put);
            if (puts.size() == batchSize) {
                stuTable.put(puts);
                puts.clear();
            }
        }
        // Flush the final, partially filled batch.
        if (!puts.isEmpty()) {
            stuTable.put(puts);
        }
    }
}
delete
/**
 * Deletes the row with key {@code 1500100001} from the {@code stu} table.
 *
 * @throws IOException if the delete fails
 */
@Test
public void DeleteData() throws IOException {
    // Table is Closeable; release it when the delete is done.
    try (Table stuTable = conn.getTable(TableName.valueOf("stu"))) {
        Delete del = new Delete(Bytes.toBytes("1500100001"));
        stuTable.delete(del);
    }
}
完整代码
package com.liangzai.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
public class Demo02API {
Connection conn;
@Before
// 建立连接
public void init() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "192.168.3.100:2181,192.168.3.101:2181,192.168.3.102:2181");
conn = ConnectionFactory.createConnection(conf);
}
@Test
// cerate table
public void createTable() throws IOException {
Admin admin = conn.getAdmin();
HTableDescriptor testAPI = new HTableDescriptor(TableName.valueOf("testAPI"));
// 创建列簇
HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
// 对列簇进行配置
cf1.setMaxVersions(3);
// 给testAPI表添加一个列簇
testAPI.addFamily(cf1);
// 创建testAPI表
admin.createTable(testAPI);
}
@Test
// 查看所有表
public void ListTbales() throws IOException {
Admin admin = conn.getAdmin();
TableName[] tableNames = admin.listTableNames();
for (TableName tableName : tableNames) {
System.out.println(tableName.getNameAsString());
}
}
@Test
// desc查看表结构
public void getTableDescriptor() throws IOException {
Admin admin = conn.getAdmin();
HTableDescriptor testAPI = admin.getTableDescriptor(TableName.valueOf("testAPI"));
HColumnDescriptor[] cfs = testAPI.getColumnFamilies();
for (HColumnDescriptor cf : cfs) {
System.out.println(cf.getNameAsString());
System.out.println(cf.getMaxVersions());
System.out.println(cf.getTimeToLive());
}
}
@Test
// alter
// 对testAPI 将cf1的版本设置为5,并且新加一个列簇cf2
public void AlterTbale() throws IOException {
Admin admin = conn.getAdmin();
TableName testAPI = TableName.valueOf("testAPI");
HTableDescriptor testAPIDesc = admin.getTableDescriptor(testAPI);
HColumnDescriptor[] cfs = testAPIDesc.getColumnFamilies();
for (HColumnDescriptor cf : cfs) {
if ("cf1".equals(cf.getNameAsString())) {
cf.setMaxVersions(5);
}
}
testAPIDesc.addFamily(new HColumnDescriptor("cf2"));
admin.modifyTable(testAPI, testAPIDesc);
}
@Test
// drop
public void DropTable() throws IOException {
Admin admin = conn.getAdmin();
TableName tableName = TableName.valueOf("test2");
if (admin.tableExists(tableName)) {
// 表在删除之前需要先disable
admin.disableTable(tableName);
admin.deleteTable(tableName);
} else {
System.out.println("表不存在!");
}
}
@Test
// put
public void PutData() throws IOException {
Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
Put put = new Put("001".getBytes());
put.addColumn("cf1".getBytes(), "name".getBytes(), "张三".getBytes());
put.addColumn("cf1".getBytes(), "age".getBytes(), "18".getBytes());
put.addColumn("cf1".getBytes(), "clazz".getBytes(), "文科一班".getBytes());
put.addColumn("cf1".getBytes(), "clazz".getBytes(), 1, "文科二班".getBytes());
testAPI.put(put);
}
@Test
// get
public void GetData() throws IOException {
Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
Get get = new Get("001".getBytes());
get.setMaxVersions(10);
Result rs = testAPI.get(get);
// 获取rowkey
byte[] row = rs.getRow();
byte[] name = rs.getValue("cf1".getBytes(), "name".getBytes());
byte[] age = rs.getValue("cf1".getBytes(), "age".getBytes());
byte[] clazz = rs.getValue("cf1".getBytes(), "clazz".getBytes());
System.out.println(Bytes.toString(row) + "," + Bytes.toString(name) + "," + Bytes.toString(age) + "," + Bytes.toString(clazz));
}
@Test
// 提取数据的另一种方式
public void ListCells() throws IOException {
Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
Get get = new Get("001".getBytes());
get.setMaxVersions(10);
Result rs = testAPI.get(get);
// 获取所有的Cell
List<Cell> cells = rs.listCells();
for (Cell cell : cells) {
String value = Bytes.toString(CellUtil.cloneValue(cell));
System.out.println(value);
}
}
@Test
/**
* 创建stu表,增加一个info列簇,将students.txt的1000条数据全部插入
*/ public void PutStu() throws IOException {
TableName stu = TableName.valueOf("stu");
// 创建表
Admin admin = conn.getAdmin();
if (!admin.tableExists(stu)) {
admin.createTable(new HTableDescriptor(stu).addFamily(new HColumnDescriptor("info")));
}
Table stuTable = conn.getTable(stu);
ArrayList<Put> puts = new ArrayList<>();
// 读取文件
BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
int cnt = 0;
String line;
while ((line = br.readLine()) != null) {
String[] split = line.split(",");
String id = split[0];
String name = split[1];
String age = split[2];
String gender = split[3];
String clazz = split[4];
Put put = new Put(id.getBytes());
put.addColumn("info".getBytes(),"name".getBytes(),name.getBytes());
put.addColumn("info".getBytes(),"age".getBytes(),age.getBytes());
put.addColumn("info".getBytes(),"gender".getBytes(),gender.getBytes());
put.addColumn("info".getBytes(),"clazz".getBytes(),calzz.getBytes());
// 批量插入
puts.add(put);
cnt += 1;
if (cnt == 100) {
stuTable.put(puts);
puts.clear(); // 清空
cnt = 0;
}
// 逐条插入,效率低
// stuTable.put(put);
}
// 判断Put的List是否为空
if (!puts.isEmpty()) {
stuTable.put(puts);
}
br.close();
}
@Test
// delete
public void DeleteData() throws IOException {
Table stuTable = conn.getTable(TableName.valueOf("stu"));
Delete del = new Delete("1500100001".getBytes());
stuTable.delete(del);
}
@After
public void close() throws IOException {
conn.close();
}
}
电信案例
批量插入数据
/**
 * Bulk-loads {@code data/DIANXIN.csv} into the {@code dianxin} table.
 *
 * <p>For every line, column 0 (mdn) is the row key, and columns 4 and 5 are
 * written to {@code cf1:lg} and {@code cf1:lat}, using column 1 (start_time,
 * parsed as a long) as the cell timestamp. Writes are sent in batches of 1000.
 *
 * @throws IOException if the file cannot be read or a write fails
 */
@Test
public void putALL() throws IOException {
    final int batchSize = 1000;
    byte[] cf1 = Bytes.toBytes("cf1");
    ArrayList<Put> puts = new ArrayList<>(batchSize);

    // try-with-resources closes the reader and the table even if a line fails
    // to parse or a write throws; the original leaked both on any exception.
    try (Table dxTable = conn.getTable(dianxin);
         BufferedReader br = new BufferedReader(new FileReader("data/DIANXIN.csv"))) {
        String line;
        while ((line = br.readLine()) != null) {
            String[] split = line.split(",");
            String mdn = split[0];
            long startTime = Long.parseLong(split[1]);
            String lg = split[4];
            String lat = split[5];

            Put put = new Put(Bytes.toBytes(mdn));
            put.addColumn(cf1, Bytes.toBytes("lg"), startTime, Bytes.toBytes(lg));
            put.addColumn(cf1, Bytes.toBytes("lat"), startTime, Bytes.toBytes(lat));

            puts.add(put);
            // The list size already tracks the batch fill level; no separate counter needed.
            if (puts.size() == batchSize) {
                dxTable.put(puts);
                puts.clear();
            }
        }
        // Flush the final, partially filled batch.
        if (!puts.isEmpty()) {
            dxTable.put(puts);
        }
    }
}
根据mdn获取用户最新的3个地理位置(经纬度)
/**
 * Looks up one mdn in the {@code dianxin} table and prints up to 3 positions
 * as {@code lg,lat} lines, requesting at most 3 versions per column.
 *
 * <p>Prints nothing when the row does not exist, and fewer than 3 lines when
 * fewer versions are returned.
 *
 * @throws IOException if the read fails
 */
@Test
public void getPositionByMdn() throws IOException {
    final int maxPositions = 3;
    String mdn = "48E938D94E866A716C86642C5E9AF179DA658A07";

    // Table is Closeable; release it when the read is done.
    try (Table dxTable = conn.getTable(dianxin)) {
        Get get = new Get(Bytes.toBytes(mdn));
        get.setMaxVersions(maxPositions);
        Result rs = dxTable.get(get);

        // listCells() returns null for a missing row; the original then
        // failed with a NullPointerException in the loop below.
        if (rs.isEmpty()) {
            return;
        }

        ArrayList<String> lgArr = new ArrayList<>();
        ArrayList<String> latArr = new ArrayList<>();
        for (Cell cell : rs.listCells()) {
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            if ("lg".equals(qualifier)) {
                lgArr.add(value);
            } else if ("lat".equals(qualifier)) {
                latArr.add(value);
            }
        }

        // Only pair up as many entries as both lists actually hold; a fixed
        // bound of 3 throws IndexOutOfBoundsException when fewer versions exist.
        int count = Math.min(maxPositions, Math.min(lgArr.size(), latArr.size()));
        for (int i = 0; i < count; i++) {
            System.out.println(lgArr.get(i) + "," + latArr.get(i));
        }
    }
}
到底啦!关注靓仔学习更多的大数据知识!😊
更多推荐
已为社区贡献1条内容
所有评论(0)