HBase Java API

API

  • The main HBase API classes and how they correspond to the data model:

HBaseAdmin

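  • Class: org.apache.hadoop.hbase.client.HBaseAdmin
  • Purpose: provides an interface for managing table metadata in an HBase database.
    • Its methods include creating tables, deleting tables, listing tables, enabling or disabling tables, and adding or removing column families.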

Usage example:

HBaseAdmin admin = new HBaseAdmin(config);
admin.disableTable("tablename");

HBaseConfiguration

  • Class: org.apache.hadoop.hbase.HBaseConfiguration
  • Purpose: configures HBase.

Usage example:

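Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort","2181");

This call sets "hbase.zookeeper.property.clientPort" to 2181.
Typically the configuration object is created first with HBaseConfiguration.create() (the public constructor is deprecated in later HBase releases), and its other methods are called afterward.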


HTableDescriptor

  • Class: org.apache.hadoop.hbase.HTableDescriptor
  • Purpose: holds the table name and the table's column families.

Usage example:

HTableDescriptor htd = new HTableDescriptor(table);
htd.addFamily(new HColumnDescriptor("family"));

In the example above, a column family named family is added to the table through an HTableDescriptor instance.


HColumnDescriptor

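  • Class: org.apache.hadoop.hbase.HColumnDescriptor
  • Purpose:
    • Maintains information about a column family, such as the number of versions and compression settings.
    • It is typically used when creating a table or adding a column family to a table.
    • Once created, a column family cannot be modified directly; it can only be deleted and re-created. (The alter example later in this article nonetheless updates a family's max versions through Admin.modifyTable.)
    • When a column family is deleted, the data in it is deleted as well.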

Usage example:

HTableDescriptor htd = new HTableDescriptor(tablename);
HColumnDescriptor col =  new HColumnDescriptor("content");
htd.addFamily(col); 

This example adds a column family named content.


HTable

  • Class: org.apache.hadoop.hbase.client.HTable
  • Purpose: communicates directly with an HBase table. This class is not thread-safe for update operations.

Usage example:

HTable table = new HTable(conf,Bytes.toBytes(tablename));
ResultScanner scanner = table.getScanner(family);

Put

  • Class: org.apache.hadoop.hbase.client.Put
  • Purpose: performs an add (write) operation on a single row.

Usage example:

HTable table = new HTable(conf,Bytes.toBytes(tablename));
Put put = new Put(brow); // create a Put operation for the given row
put.add(family,qualifier,value);
table.put(put);

Get

  • Class: org.apache.hadoop.hbase.client.Get
  • Purpose: retrieves information about a single row.

Usage example:

HTable table = new HTable(conf,Bytes.toBytes(tablename));
Get get = new Get(Bytes.toBytes(row));

Result

  • Class: org.apache.hadoop.hbase.client.Result
  • Purpose: stores the single-row value returned by a Get or Scan operation. Its methods return values directly or as various Map structures (key-value pairs).

Create an HBase module and add the Maven dependency

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.6</version>
        </dependency>
    </dependencies>

HBase Java API

package com.liangzai.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

import java.io.IOException;

public class Demo01TestAPI {
    public static void main(String[] args) throws IOException {
        // 1. Create the configuration and set the HBase connection address (the ZooKeeper quorum)
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,node1:2181,node2:2181");
        // 2. Open the connection
        Connection conn = ConnectionFactory.createConnection(conf);

        /*
         * 3. Perform operations:
         *    to work on table structure, use getAdmin
         *    to work on table data, use getTable
         */
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(TableName.valueOf("test5"));

        // 4. Release resources when done
        table.close();
        admin.close();
        conn.close();
    }
}

Common HBase Java API operations

Establish a connection

    Connection conn;

    @Before
    // Establish the connection
    public void init() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.3.100:2181,192.168.3.101:2181,192.168.3.102:2181");
        conn = ConnectionFactory.createConnection(conf);
    }

Create a table

    @Test
    // create table
    public void createTable() throws IOException {
        Admin admin = conn.getAdmin();

        HTableDescriptor testAPI = new HTableDescriptor(TableName.valueOf("testAPI"));

        // Create the column family
        HColumnDescriptor cf1 = new HColumnDescriptor("cf1");

        // Configure the column family
        cf1.setMaxVersions(3);

        // Add the column family to the testAPI table
        testAPI.addFamily(cf1);

        // Create the testAPI table
        admin.createTable(testAPI);
    }

List all tables

    @Test
    // List all tables
    public void ListTables() throws IOException {
        Admin admin = conn.getAdmin();
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            System.out.println(tableName.getNameAsString());
        }
    }

Describe a table

    @Test
    // desc: show the table structure
    public void getTableDescriptor() throws IOException {
        Admin admin = conn.getAdmin();
        HTableDescriptor testAPI = admin.getTableDescriptor(TableName.valueOf("testAPI"));
        HColumnDescriptor[] cfs = testAPI.getColumnFamilies();
        for (HColumnDescriptor cf : cfs) {
            System.out.println(cf.getNameAsString());
            System.out.println(cf.getMaxVersions());
            System.out.println(cf.getTimeToLive());
        }
    }

Alter a table

    @Test
    // alter
    // On testAPI, set the max versions of cf1 to 5 and add a new column family cf2
    public void AlterTable() throws IOException {
        Admin admin = conn.getAdmin();
        TableName testAPI = TableName.valueOf("testAPI");
        HTableDescriptor testAPIDesc = admin.getTableDescriptor(testAPI);
        HColumnDescriptor[] cfs = testAPIDesc.getColumnFamilies();
        for (HColumnDescriptor cf : cfs) {
            if ("cf1".equals(cf.getNameAsString())) {
                cf.setMaxVersions(5);
            }
        }
        testAPIDesc.addFamily(new HColumnDescriptor("cf2"));
        admin.modifyTable(testAPI, testAPIDesc);
    }

drop

    @Test
    // drop
    public void DropTable() throws IOException {
        Admin admin = conn.getAdmin();
        TableName tableName = TableName.valueOf("test2");
        if (admin.tableExists(tableName)) {
            // A table must be disabled before it can be deleted
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        } else {
            System.out.println("Table does not exist!");
        }
    }

put

    @Test
    // put
    public void PutData() throws IOException {
        Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
        Put put = new Put("001".getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), "张三".getBytes());
        put.addColumn("cf1".getBytes(), "age".getBytes(), "18".getBytes());
        put.addColumn("cf1".getBytes(), "clazz".getBytes(), "文科一班".getBytes());
        put.addColumn("cf1".getBytes(), "clazz".getBytes(), 1, "文科二班".getBytes());
        testAPI.put(put);
    }
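
The put example above converts strings with String.getBytes(), which uses the JVM's default charset, while Bytes.toString decodes bytes as UTF-8. The following is a minimal sketch, using only the JDK, of why the two should agree for non-ASCII values such as 张三. The class and method names are illustrative assumptions and are not part of HBase.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Illustrative helper (not an HBase class): shows why the charset used when
// writing must match the UTF-8 decoding used when reading.
final class Utf8BytesSketch {

    // Encode explicitly as UTF-8 instead of relying on the platform default.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode as UTF-8, mirroring what a UTF-8 based reader does.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Simulate writing with one charset and reading the bytes back as UTF-8.
    static String writeThenReadAsUtf8(String text, Charset writerCharset) {
        return decode(text.getBytes(writerCharset));
    }

    public static void main(String[] args) {
        String name = "\u5f20\u4e09"; // the value 张三 from the put example
        // Same charset on both sides: the value survives the round trip.
        System.out.println(name.equals(writeThenReadAsUtf8(name, StandardCharsets.UTF_8)));
        // Mismatched charset on the write side: the value does not survive.
        System.out.println(name.equals(writeThenReadAsUtf8(name, StandardCharsets.ISO_8859_1)));
    }
}
```

In HBase client code the usual equivalent is Bytes.toBytes(String), which encodes as UTF-8 and therefore pairs with Bytes.toString on the read side.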

get

    @Test
    // get
    public void GetData() throws IOException {
        Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
        Get get = new Get("001".getBytes());
        get.setMaxVersions(10);
        Result rs = testAPI.get(get);
        // Get the rowkey
        byte[] row = rs.getRow();
        byte[] name = rs.getValue("cf1".getBytes(), "name".getBytes());
        byte[] age = rs.getValue("cf1".getBytes(), "age".getBytes());
        byte[] clazz = rs.getValue("cf1".getBytes(), "clazz".getBytes());
        System.out.println(Bytes.toString(row) + "," + Bytes.toString(name) + "," + Bytes.toString(age) + "," + Bytes.toString(clazz));
    }
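
The put example writes clazz twice, once with the default timestamp and once with an explicit timestamp of 1, and getValue returns only the newest version. The sketch below models one column's versions with a plain TreeMap to show that ordering. It is a simplification: VersionedCellSketch and its methods are hypothetical stand-ins rather than HBase classes, the values and timestamps are made up, and real HBase trims old versions on its own schedule.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical model of a single HBase column: timestamp -> value, newest first.
final class VersionedCellSketch {
    private final int maxVersions;
    private final TreeMap<Long, String> versions =
            new TreeMap<Long, String>(Comparator.<Long>reverseOrder());

    VersionedCellSketch(int maxVersions) {
        if (maxVersions < 1) {
            throw new IllegalArgumentException("maxVersions must be >= 1");
        }
        this.maxVersions = maxVersions;
    }

    // Store a value at a timestamp, then drop the oldest versions beyond maxVersions.
    void put(long timestamp, String value) {
        versions.put(timestamp, value);
        while (versions.size() > maxVersions) {
            versions.pollLastEntry(); // last entry = smallest timestamp
        }
    }

    // Newest value, or null when nothing is stored.
    String latest() {
        return versions.isEmpty() ? null : versions.firstEntry().getValue();
    }

    // Up to 'limit' values, newest first.
    List<String> read(int limit) {
        List<String> out = new ArrayList<>();
        for (String value : versions.values()) {
            if (out.size() == limit) {
                break;
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        VersionedCellSketch clazz = new VersionedCellSketch(3);
        clazz.put(1700000000000L, "class-A"); // stands in for the default (current-time) timestamp
        clazz.put(1L, "class-B");             // explicit timestamp 1, so it is the older version
        System.out.println(clazz.latest());
        System.out.println(clazz.read(10));
    }
}
```

Under this model, the value written with timestamp 1 is always the older one, which is why it only shows up when several versions are requested.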

Another way to extract data

    @Test
    // Another way to extract data
    public void ListCells() throws IOException {
        Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
        Get get = new Get("001".getBytes());
        get.setMaxVersions(10);
        Result rs = testAPI.get(get);
        // Get all the Cells
        List<Cell> cells = rs.listCells();
        for (Cell cell : cells) {
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            System.out.println(value);
        }
    }

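Create the stu table with an info column family and bulk-insert the data from students.txt

    /**
     * Create the stu table with an info column family and insert all 1000 rows from students.txt
     */
    @Test
    public void PutStu() throws IOException {
        TableName stu = TableName.valueOf("stu");
        // Create the table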
        Admin admin = conn.getAdmin();
        if (!admin.tableExists(stu)) {
            admin.createTable(new HTableDescriptor(stu).addFamily(new HColumnDescriptor("info")));
        }
        Table stuTable = conn.getTable(stu);
        ArrayList<Put> puts = new ArrayList<>();
        // Read the file
        BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
        int cnt = 0;
        String line;
        while ((line = br.readLine()) != null) {
            String[] split = line.split(",");
            String id = split[0];
            String name = split[1];
            String age = split[2];
            String gender = split[3];
            String clazz = split[4];

            Put put = new Put(id.getBytes());
            put.addColumn("info".getBytes(),"name".getBytes(),name.getBytes());
            put.addColumn("info".getBytes(),"age".getBytes(),age.getBytes());
            put.addColumn("info".getBytes(),"gender".getBytes(),gender.getBytes());
            put.addColumn("info".getBytes(),"clazz".getBytes(),clazz.getBytes());

            // Batch insert
            puts.add(put);
            cnt += 1;
            if (cnt == 100) {
                stuTable.put(puts);
                puts.clear(); // Empty the buffer
                cnt = 0;
            }
            // Inserting row by row is inefficient
//            stuTable.put(put);
        }
        // Flush any Puts left in the list
        if (!puts.isEmpty()) {
            stuTable.put(puts);
        }
        br.close();
    }
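
The batching logic above (buffer rows, flush every 100, then flush the remainder) can be isolated from HBase. The following is a rough sketch under stated assumptions: BatchWriterSketch is a hypothetical helper, and the Consumer passed to it stands in for a call such as stuTable.put(puts).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers items and hands them to a sink in fixed-size batches.
final class BatchWriterSketch<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchWriterSketch(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    // Buffer one item and flush automatically once the batch is full.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Send whatever is buffered; call once at the end for the final partial batch.
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer)); // copy so the sink keeps its own list
            buffer.clear();
        }
    }

    // Demo helper: returns the size of every batch produced for itemCount items.
    static List<Integer> batchSizesFor(int itemCount, int batchSize) {
        List<Integer> sizes = new ArrayList<>();
        BatchWriterSketch<Integer> writer =
                new BatchWriterSketch<>(batchSize, batch -> sizes.add(batch.size()));
        for (int i = 0; i < itemCount; i++) {
            writer.add(i);
        }
        writer.flush();
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizesFor(250, 100));
    }
}
```

Keeping the final flush() inside the helper makes it harder to forget the leftover rows, which is the job of the puts.isEmpty() check in the test method.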

delete

    @Test
    // delete
    public void DeleteData() throws IOException {
        Table stuTable = conn.getTable(TableName.valueOf("stu"));
        Delete del = new Delete("1500100001".getBytes());
        stuTable.delete(del);
    }

Complete code

package com.liangzai.hbase;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Demo02API {
    Connection conn;

    @Before
    // Establish the connection
    public void init() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.3.100:2181,192.168.3.101:2181,192.168.3.102:2181");
        conn = ConnectionFactory.createConnection(conf);
    }

    @Test
    // create table
    public void createTable() throws IOException {
        Admin admin = conn.getAdmin();

        HTableDescriptor testAPI = new HTableDescriptor(TableName.valueOf("testAPI"));

        // Create the column family
        HColumnDescriptor cf1 = new HColumnDescriptor("cf1");

        // Configure the column family
        cf1.setMaxVersions(3);

        // Add the column family to the testAPI table
        testAPI.addFamily(cf1);

        // Create the testAPI table
        admin.createTable(testAPI);
    }

    @Test
    // List all tables
    public void ListTables() throws IOException {
        Admin admin = conn.getAdmin();
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            System.out.println(tableName.getNameAsString());
        }
    }

    @Test
    // desc: show the table structure
    public void getTableDescriptor() throws IOException {
        Admin admin = conn.getAdmin();
        HTableDescriptor testAPI = admin.getTableDescriptor(TableName.valueOf("testAPI"));
        HColumnDescriptor[] cfs = testAPI.getColumnFamilies();
        for (HColumnDescriptor cf : cfs) {
            System.out.println(cf.getNameAsString());
            System.out.println(cf.getMaxVersions());
            System.out.println(cf.getTimeToLive());
        }
    }

    @Test
    // alter
    // On testAPI, set the max versions of cf1 to 5 and add a new column family cf2
    public void AlterTable() throws IOException {
        Admin admin = conn.getAdmin();
        TableName testAPI = TableName.valueOf("testAPI");
        HTableDescriptor testAPIDesc = admin.getTableDescriptor(testAPI);
        HColumnDescriptor[] cfs = testAPIDesc.getColumnFamilies();
        for (HColumnDescriptor cf : cfs) {
            if ("cf1".equals(cf.getNameAsString())) {
                cf.setMaxVersions(5);
            }
        }
        testAPIDesc.addFamily(new HColumnDescriptor("cf2"));
        admin.modifyTable(testAPI, testAPIDesc);
    }

    @Test
    // drop
    public void DropTable() throws IOException {
        Admin admin = conn.getAdmin();
        TableName tableName = TableName.valueOf("test2");
        if (admin.tableExists(tableName)) {
            // A table must be disabled before it can be deleted
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        } else {
            System.out.println("Table does not exist!");
        }
    }

    @Test
    // put
    public void PutData() throws IOException {
        Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
        Put put = new Put("001".getBytes());
        put.addColumn("cf1".getBytes(), "name".getBytes(), "张三".getBytes());
        put.addColumn("cf1".getBytes(), "age".getBytes(), "18".getBytes());
        put.addColumn("cf1".getBytes(), "clazz".getBytes(), "文科一班".getBytes());
        put.addColumn("cf1".getBytes(), "clazz".getBytes(), 1, "文科二班".getBytes());
        testAPI.put(put);
    }

    @Test
    // get
    public void GetData() throws IOException {
        Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
        Get get = new Get("001".getBytes());
        get.setMaxVersions(10);
        Result rs = testAPI.get(get);
        // Get the rowkey
        byte[] row = rs.getRow();
        byte[] name = rs.getValue("cf1".getBytes(), "name".getBytes());
        byte[] age = rs.getValue("cf1".getBytes(), "age".getBytes());
        byte[] clazz = rs.getValue("cf1".getBytes(), "clazz".getBytes());
        System.out.println(Bytes.toString(row) + "," + Bytes.toString(name) + "," + Bytes.toString(age) + "," + Bytes.toString(clazz));
    }

    @Test
    // Another way to extract data
    public void ListCells() throws IOException {
        Table testAPI = conn.getTable(TableName.valueOf("testAPI"));
        Get get = new Get("001".getBytes());
        get.setMaxVersions(10);
        Result rs = testAPI.get(get);
        // Get all the Cells
        List<Cell> cells = rs.listCells();
        for (Cell cell : cells) {
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            System.out.println(value);
        }
    }

    /**
     * Create the stu table with an info column family and insert all 1000 rows from students.txt
     */
    @Test
    public void PutStu() throws IOException {
        TableName stu = TableName.valueOf("stu");
        // Create the table
        Admin admin = conn.getAdmin();
        if (!admin.tableExists(stu)) {
            admin.createTable(new HTableDescriptor(stu).addFamily(new HColumnDescriptor("info")));
        }
        Table stuTable = conn.getTable(stu);
        ArrayList<Put> puts = new ArrayList<>();
        // Read the file
        BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
        int cnt = 0;
        String line;
        while ((line = br.readLine()) != null) {
            String[] split = line.split(",");
            String id = split[0];
            String name = split[1];
            String age = split[2];
            String gender = split[3];
            String clazz = split[4];

            Put put = new Put(id.getBytes());
            put.addColumn("info".getBytes(),"name".getBytes(),name.getBytes());
            put.addColumn("info".getBytes(),"age".getBytes(),age.getBytes());
            put.addColumn("info".getBytes(),"gender".getBytes(),gender.getBytes());
            put.addColumn("info".getBytes(),"clazz".getBytes(),clazz.getBytes());

            // Batch insert
            puts.add(put);
            cnt += 1;
            if (cnt == 100) {
                stuTable.put(puts);
                puts.clear(); // Empty the buffer
                cnt = 0;
            }
            // Inserting row by row is inefficient
//            stuTable.put(put);
        }
        // Flush any Puts left in the list
        if (!puts.isEmpty()) {
            stuTable.put(puts);
        }
        br.close();
    }

    @Test
    // delete
    public void DeleteData() throws IOException {
        Table stuTable = conn.getTable(TableName.valueOf("stu"));
        Delete del = new Delete("1500100001".getBytes());
        stuTable.delete(del);
    }

    @After
    public void close() throws IOException {
        conn.close();
    }
}

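Telecom case study

Bulk-insert data

    @Test
    // Write the data into HBase
    public void putALL() throws IOException {
        // dianxin is assumed to be a TableName field defined elsewhere in the test class
        Table dx_tb = conn.getTable(dianxin);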
        ArrayList<Put> puts = new ArrayList<>();
        int cnt = 0;
        int batchSize = 1000;

        BufferedReader br = new BufferedReader(new FileReader("data/DIANXIN.csv"));
        String line;
        while ((line = br.readLine()) != null) {
            String[] split = line.split(",");
            String mdn = split[0];
            String start_time = split[1];
            String lg = split[4];
            String lat = split[5];

            Put put = new Put(mdn.getBytes());
            put.addColumn("cf1".getBytes(),"lg".getBytes(),Long.parseLong(start_time), lg.getBytes());
            put.addColumn("cf1".getBytes(),"lat".getBytes(),Long.parseLong(start_time),lat.getBytes());

            puts.add(put);
            cnt += 1;

            if (cnt == batchSize) {
                dx_tb.put(puts);
                puts.clear();
                cnt = 0;
            }
        }
        if (!puts.isEmpty()) {
            dx_tb.put(puts);
        }
        br.close();
    }

Get a user's 3 most recent locations by mdn

    @Test
    // Get the user's 3 most recent locations by mdn
    public void getPositionByMdn() throws IOException {
        Table dx_tb = conn.getTable(dianxin);
        String mdn = "48E938D94E866A716C86642C5E9AF179DA658A07";

        Get get = new Get(mdn.getBytes());
        get.setMaxVersions(3);
        Result rs = dx_tb.get(get);

        ArrayList<String> lgArr = new ArrayList<>();
        ArrayList<String> latArr = new ArrayList<>();

        for (Cell cell : rs.listCells()) {
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            if ("lg".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                lgArr.add(value);
            } else if ("lat".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                latArr.add(value);
            }
        }

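        // Print at most 3 positions; fewer may be stored for this mdn
        int n = Math.min(3, Math.min(lgArr.size(), latArr.size()));
        for (int i = 0; i < n; i++) {
            System.out.println(lgArr.get(i) + "," + latArr.get(i));
        }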
    }
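
The method above pairs lg and lat values by their position in two lists, which relies on both columns having the same set of versions. One possible alternative, sketched here with plain JDK classes, is to pair them by timestamp instead. CellStub and latestPositions are hypothetical names introduced for illustration; in real code the qualifier, timestamp, and value would come from each HBase Cell (for example via CellUtil.cloneQualifier, Cell.getTimestamp, and CellUtil.cloneValue).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

final class LatestPositionsSketch {

    // Hypothetical stand-in for the fields read from an HBase Cell.
    static final class CellStub {
        final String qualifier;
        final long timestamp;
        final String value;

        CellStub(String qualifier, long timestamp, String value) {
            this.qualifier = qualifier;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Returns up to 'limit' "lg,lat" strings, newest first, pairing values by timestamp.
    static List<String> latestPositions(List<CellStub> cells, int limit) {
        // timestamp -> [lg, lat], ordered from newest to oldest
        TreeMap<Long, String[]> byTimestamp =
                new TreeMap<Long, String[]>(Comparator.<Long>reverseOrder());
        for (CellStub cell : cells) {
            String[] pair = byTimestamp.get(cell.timestamp);
            if (pair == null) {
                pair = new String[2];
                byTimestamp.put(cell.timestamp, pair);
            }
            if ("lg".equals(cell.qualifier)) {
                pair[0] = cell.value;
            } else if ("lat".equals(cell.qualifier)) {
                pair[1] = cell.value;
            }
        }
        List<String> positions = new ArrayList<>();
        for (String[] pair : byTimestamp.values()) {
            if (positions.size() == limit) {
                break;
            }
            // Skip timestamps where one of the two coordinates is missing.
            if (pair[0] != null && pair[1] != null) {
                positions.add(pair[0] + "," + pair[1]);
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        // Made-up sample values, not taken from DIANXIN.csv.
        List<CellStub> cells = Arrays.asList(
                new CellStub("lat", 200L, "31.7"),
                new CellStub("lat", 100L, "31.6"),
                new CellStub("lg", 200L, "117.2"),
                new CellStub("lg", 100L, "117.1"));
        System.out.println(latestPositions(cells, 3));
    }
}
```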

That's all! Follow Liangzai for more big-data content! 😊
