Preface

In the previous chapters we covered how to install and operate HBase. In this chapter we walk through running MapReduce over HBase (HBase MR). In practice, this technique is frequently used to build secondary indexes and thereby speed up user queries.

The resources for this article can be found in the HBase submodule of my GitHub project: https://github.com/SeanYanxml/bigdata/. PS: if you find the project useful, please give it a Star.


Prerequisites

  • JDK
  • Hadoop
  • HBase
  • HBase API

Main Content

  • Maven dependencies
<dependencies>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.2.11</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-common</artifactId>
			<version>1.2.11</version>
		</dependency>

		<!-- for org.apache.hadoop.hbase.mapreduce.TableMapper -->
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-server</artifactId>
			<version>1.2.11</version>
		</dependency>


		<dependency>
			<groupId>com.google.protobuf</groupId>
			<artifactId>protobuf-java</artifactId>
			<version>2.5.0</version>
		</dependency>
	</dependencies>
  • Initialize the tables

	/**
	 * HBase configuration.
	 */
	static Configuration config = null;
	static{
		config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "localhost:2181,localhost:2182,localhost:2183");
	}
	
	/**
	 * Table metadata.
	 */
	public static final String tableName = "word";
	public static final String colFamily = "content";
	public static final String col = "info";
	public static final String resultTableName = "stat";
	
	
	/**
	 * Initialize the table schema and its sample data.
	 */
	@BeforeClass // JUnit requires a static setup method to be annotated with @BeforeClass rather than @Before
	 public static void initTable(){
		 HTable table = null;
		 HBaseAdmin admin = null;
		 try{
			 admin = new HBaseAdmin(config); // administration handle for table management
			 // If the tables already exist, drop them first (demo-only approach).
			 // Each table is checked separately so a missing table does not break the cleanup.
			 if(admin.tableExists(tableName)){
				 System.out.println("Table is exists.");
				 admin.disableTable(tableName);
				 admin.deleteTable(tableName);
			 }
			 if(admin.tableExists(resultTableName)){
				 admin.disableTable(resultTableName);
				 admin.deleteTable(resultTableName);
			 }
			 // Create the source (data) table
			 HTableDescriptor desc = new HTableDescriptor(tableName);
			 HColumnDescriptor columnFamily = new HColumnDescriptor(colFamily);
			 desc.addFamily(columnFamily);
			 admin.createTable(desc);
			 // Create the result table
			 HTableDescriptor resultDesc = new HTableDescriptor(resultTableName);
			 HColumnDescriptor resultColumnFamily = new HColumnDescriptor(colFamily);
			 resultDesc.addFamily(resultColumnFamily);
			 admin.createTable(resultDesc);
			 
			 // Insert sample data
			 table = new HTable(config, tableName);
			 table.setAutoFlush(false);
			 table.setWriteBufferSize(500);
			 // Build the list of Puts to write
			 List<Put> listPut = new ArrayList<Put>();
			 Put put1 = new Put(Bytes.toBytes("1"));
			 put1.add(colFamily.getBytes(), col.getBytes(), Bytes.toBytes("HELlO Kitty123"));
			 listPut.add(put1);
			 Put put2 = new Put(Bytes.toBytes("2"));
			 put2.add(colFamily.getBytes(), col.getBytes(), Bytes.toBytes("Mark 10029."));
			 listPut.add(put2);
			 Put put3 = new Put(Bytes.toBytes("3"));
			 put3.add(colFamily.getBytes(), col.getBytes(), Bytes.toBytes("Demo Kitty123"));
			 listPut.add(put3);
			 
			 table.put(listPut);
			 table.flushCommits();
			 listPut.clear();
			 System.out.println("Init Table MethodOK.");
		 }catch(Exception e){
			 
		 }
	 }
  • Mapper & Reducer
 public static class HBaseMRMapper extends TableMapper<Text, IntWritable>{
		
		// Input: one table row as <rowKey, Result>; output: one <word, 1> pair per word.
		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			// Read the cell value of this row.
			String words = Bytes.toString(value.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col)));
			String[] wordSplit = words.split(" ");
			// Emit each word with a count of 1.
			for(String word:wordSplit){
				context.write(new Text(word), new IntWritable(1));
			}
		}	
	 }
	 
	 public static class HBaseMRReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{

		// Sum the counts for each word.
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			for(IntWritable num:values){
				sum+=num.get();
			}
			Put put = new Put(Bytes.toBytes(key.toString()));
			// Note: a count written via Bytes.toBytes(int) is stored as raw bytes (shown as \x00... in the shell);
			// store it as a string here so the value is human-readable (see the short illustration after this class).
			put.add(Bytes.toBytes(colFamily),Bytes.toBytes(col),Bytes.toBytes(String.valueOf(sum)));
			// Write the result row back to HBase.
			context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
		}
	 }
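The reducer stores the count as a string rather than a raw int so that the stat table is readable in the HBase shell. A tiny illustration of the difference (not part of the job; Bytes.toStringBinary renders bytes the way the shell does):

	// Illustration only: the same count serialized two ways.
	int sum = 2;
	byte[] asInt    = Bytes.toBytes(sum);                 // 4 raw bytes
	byte[] asString = Bytes.toBytes(String.valueOf(sum)); // the readable string "2"
	System.out.println(Bytes.toStringBinary(asInt));      // prints \x00\x00\x00\x02
	System.out.println(Bytes.toStringBinary(asString));   // prints 2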
  • Main program
	 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//		config.set("fs.defaultFS", "hdfs://localhost:9000/");

//		config.set("mapreduce.framework.name", "yarn");
		config.set("mapreduce.framework.name", "local");
		// Initialize the tables (wrap this in a synchronized block if it may run concurrently).
//		synchronized (HBaseMR.class) {
			initTable();
//		}
		
		System.out.println("Init Table OK.");
		
		Job job = Job.getInstance(config,"HBaseMR");
		job.setJarByClass(HBaseMR.class);
		
		// Create the Scan that feeds the Mapper
		Scan scan = new Scan();
		scan.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
		
		// Configure the scan-based Mapper.
		// Note: use the Hadoop 2.x initTableMapperJob, not the Hadoop 1.x initTableMapJob (see Q&A below).
		TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseMRMapper.class, Text.class, IntWritable.class, job);

		// Configure the Reducer that writes the results back into HBase.
		TableMapReduceUtil.initTableReducerJob(resultTableName, HBaseMRReducer.class, job);
		
		System.out.println(job.waitForCompletion(true)?0:1);
		
	}

Run Results

After running the job locally, the tables in HBase look as follows.
Note: you can also run the MR job on YARN by setting config.set("mapreduce.framework.name", "yarn"); in that case the job must be packaged into a jar beforehand (a minimal sketch follows after the scan output).

hbase(main):048:0> scan 'word'
ROW                                            COLUMN+CELL
 1                                             column=content:info, timestamp=1554978176277, value=HELlO Kitty123
 2                                             column=content:info, timestamp=1554978176277, value=Mark 10029.
 3                                             column=content:info, timestamp=1554978176277, value=Demo Kitty123
3 row(s) in 0.1960 seconds

hbase(main):049:0> scan 'stat'
ROW                                            COLUMN+CELL
 10029.                                        column=content:info, timestamp=1554978179119, value=1
 Demo                                          column=content:info, timestamp=1554978179119, value=1
 HELlO                                         column=content:info, timestamp=1554978179119, value=1
 Kitty123                                      column=content:info, timestamp=1554978179119, value=2
 Mark                                          column=content:info, timestamp=1554978179119, value=1
5 row(s) in 0.2310 seconds
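
As noted above, the same job can be submitted to YARN instead of the local runner. A minimal sketch of what changes, assuming the job has already been packaged; the jar path below is a placeholder for your own build output:

	config.set("mapreduce.framework.name", "yarn");
	// fs.defaultFS and yarn.resourcemanager.* are normally picked up from core-site.xml / yarn-site.xml on the classpath.
	Job job = Job.getInstance(config, "HBaseMR");
	job.setJarByClass(HBaseMR.class);
	job.setJar("target/hbase-mr-demo.jar");        // placeholder: the jar built beforehand with mvn package
	TableMapReduceUtil.addDependencyJars(job);     // ship the HBase client jars with the job
	// ... then initTableMapperJob / initTableReducerJob exactly as above, and job.waitForCompletion(true).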



Q&A

  • protobuf jar exception
log4j:WARN No appenders could be found for logger (org.apache.hadoop.security.Groups).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Table is exists.
Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs://localhost:9000/Users/Sean/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1072)
	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1064)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1064)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
	at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
	at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:265)
	at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:389)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
	at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
	at com.yanxml.bigdata.hbase.qucikstart.HBaseMR.main(HBaseMR.java:169)

There are two possible causes for this kind of problem: a wrong filesystem address, or a jar conflict.

  • Wrong address
For a local run, the main class does not need config.set("fs.defaultFS", "hdfs://localhost:9000/"); simply comment that line out.
  • Jar conflict
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase-client</artifactId>
			<version>1.2.11</version>
			<exclusions>
				<exclusion>
					<groupId>com.google.protobuf</groupId>
					<artifactId>protobuf-java</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>com.google.protobuf</groupId>
			<artifactId>protobuf-java</artifactId>
			<version>2.5.0</version>
		</dependency>
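
Tip: before adding the exclusion, you can confirm which dependency pulls in the conflicting protobuf version by inspecting the output of mvn dependency:tree.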

HBase 工程中 protobuf 版本冲突解决
如何解决java工程中多个版本的包冲突问题
hadoop :java.io.FileNotFoundException: File does not exist:

  • initTableMapJob vs. initTableMapperJob

Solution: initTableMapJob is the Hadoop 1.x API, while initTableMapperJob is the Hadoop 2.x one. The root cause is that the TableMapReduceUtil utility class was imported from the 1.x package; switch the import to the 2.x package (see the sketch below).
HBase 1.0.0 API的变化
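
A minimal sketch of the difference, with package names as in the HBase 1.2.x client:

	// Old API (Hadoop 1.x style, avoid):
	// import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;      // provides initTableMapJob(...)
	// New API (Hadoop 2.x style, use this):
	import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;      // provides initTableMapperJob(...)

	TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseMRMapper.class, Text.class, IntWritable.class, job);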

  • HBase & Elasticsearch?

A common pattern is to store only the searchable fields plus the HBase rowkey in Elasticsearch, while HBase keeps the full data: queries hit ES first to find matching rowkeys, then fetch the complete rows from HBase (see the sketch below).
使用elasticsearch提高hbase基于列的查询效率
聊聊MySQL、HBase、ES的特点和区别
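
A minimal sketch of the HBase side of that pattern, assuming a hypothetical queryEsForRowkeys(...) helper that runs the Elasticsearch search and returns the matching rowkeys:

	// queryEsForRowkeys(...) is a hypothetical stand-in for your Elasticsearch query code.
	List<String> rowkeys = queryEsForRowkeys("content.info:Kitty123");

	// Batch-fetch the full rows from HBase by rowkey (HBase 1.2.x client API).
	List<Get> gets = new ArrayList<Get>();
	for (String rowkey : rowkeys) {
		gets.add(new Get(Bytes.toBytes(rowkey)));
	}
	HTable table = new HTable(config, tableName);
	try {
		Result[] results = table.get(gets);
		for (Result result : results) {
			String info = Bytes.toString(result.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col)));
			System.out.println(Bytes.toString(result.getRow()) + " -> " + info);
		}
	} finally {
		table.close();
	}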


