HBase MR
Preface
In the previous chapters we covered how to install and operate HBase. In this chapter we walk through running MapReduce (MR) jobs against HBase. In practice this technique is frequently used to build indexes, so that user searches become more efficient.
The resources for this article can be found in the HBase submodule of my GitHub project https://github.com/SeanYanxml/bigdata/. PS: if you find the project useful, please give it a Star.
Prerequisites
- JDK
- Hadoop
- HBase
- HBase API
Main Content
- Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-common</artifactId>
        <version>1.2.11</version>
    </dependency>
    <!-- for org.apache.hadoop.hbase.mapreduce.TableMapper -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.11</version>
    </dependency>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>2.5.0</version>
    </dependency>
</dependencies>
- Initialize the tables
/**
 * HBase configuration.
 * */
static Configuration config = null;
static{
    config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum", "localhost:2181,localhost:2182,localhost:2183");
}

/**
 * Table information.
 * */
public static final String tableName = "word";
public static final String colFamily = "content";
public static final String col = "info";
public static final String resultTableName = "stat";

/**
 * Initialize the table schemas and their data.
 * */
public static void initTable(){
    HTable table = null;
    HBaseAdmin admin = null;
    try{
        admin = new HBaseAdmin(config); // table administration handle
        // If the tables already exist, delete them (demo-only approach).
        if(admin.tableExists(tableName)||admin.tableExists(resultTableName)){
            System.out.println("Table is exists.");
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
            admin.disableTable(resultTableName);
            admin.deleteTable(resultTableName);
        }
        // Create the source table.
        HTableDescriptor desc = new HTableDescriptor(tableName);
        HColumnDescriptor columnFamily = new HColumnDescriptor(colFamily);
        desc.addFamily(columnFamily);
        admin.createTable(desc);
        // Create the result table.
        HTableDescriptor resultDesc = new HTableDescriptor(resultTableName);
        HColumnDescriptor resultColumnFamily = new HColumnDescriptor(colFamily);
        resultDesc.addFamily(resultColumnFamily);
        admin.createTable(resultDesc);
        // Insert sample data.
        table = new HTable(config, tableName);
        table.setAutoFlush(false);
        table.setWriteBufferSize(500);
        // Build the rows to write.
        List<Put> listPut = new ArrayList<Put>();
        Put put1 = new Put(Bytes.toBytes("1"));
        put1.add(colFamily.getBytes(), col.getBytes(), Bytes.toBytes("HELlO Kitty123"));
        listPut.add(put1);
        Put put2 = new Put(Bytes.toBytes("2"));
        put2.add(colFamily.getBytes(), col.getBytes(), Bytes.toBytes("Mark 10029."));
        listPut.add(put2);
        Put put3 = new Put(Bytes.toBytes("3"));
        put3.add(colFamily.getBytes(), col.getBytes(), Bytes.toBytes("Demo Kitty123"));
        listPut.add(put3);
        table.put(listPut);
        table.flushCommits();
        listPut.clear();
        System.out.println("Init Table Method OK.");
    }catch(Exception e){
        e.printStackTrace();
    }
}
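Side note: the HBaseAdmin and HTable constructors used above are deprecated since HBase 1.0. A minimal sketch of the same initialization with the newer Connection-based client API (reusing the config, tableName, colFamily and col fields above; the method name is just illustrative) would look roughly like this:

// Sketch only: the non-deprecated, Connection-based client API (HBase 1.x+).
// Requires org.apache.hadoop.hbase.TableName and the
// org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Admin, Table} imports.
public static void initTableWithConnectionApi() throws IOException {
    try (Connection connection = ConnectionFactory.createConnection(config);
         Admin admin = connection.getAdmin()) {
        TableName tn = TableName.valueOf(tableName);
        // Drop and recreate, mirroring the demo above.
        if (admin.tableExists(tn)) {
            admin.disableTable(tn);
            admin.deleteTable(tn);
        }
        HTableDescriptor desc = new HTableDescriptor(tn);
        desc.addFamily(new HColumnDescriptor(colFamily));
        admin.createTable(desc);

        // Write one sample row.
        try (Table table = connection.getTable(tn)) {
            Put put = new Put(Bytes.toBytes("1"));
            put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes("HELlO Kitty123"));
            table.put(put);
        }
    }
}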
- Mapper & Reducer
public static class HBaseMRMapper extends TableMapper<Text, IntWritable>{
    // Input: one row as <rowKey, Result>; output: <word, 1> pairs.
    @Override
    protected void map(ImmutableBytesWritable key, Result value,
            org.apache.hadoop.mapreduce.Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        // Read the text stored in this row.
        String words = Bytes.toString(value.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col)));
        String[] wordSplit = words.split(" ");
        // Emit each word with a count of 1.
        for(String word : wordSplit){
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
public static class HBaseMRReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
    // Sum up the occurrences of each word.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable num : values){
            sum += num.get();
        }
        Put put = new Put(Bytes.toBytes(key.toString()));
        // Note: an int is stored as raw bytes (shown as \x.. in the shell); store it as a
        // String here for easier viewing (see the small demo after this block).
        put.add(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(String.valueOf(sum)));
        // Write the result row to HBase.
        context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
    }
}
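To illustrate the comment in the Reducer about storing the count as a String, here is a tiny standalone demo (not part of the job) showing how a raw int renders versus its String form:

import org.apache.hadoop.hbase.util.Bytes;

public class BytesFormatDemo {
    public static void main(String[] args) {
        // Raw int: 4 binary bytes, shown as \x00\x00\x00\x02 when scanned in the shell.
        byte[] asInt = Bytes.toBytes(2);
        // String form: human-readable "2" when scanned.
        byte[] asString = Bytes.toBytes(String.valueOf(2));
        System.out.println(Bytes.toStringBinary(asInt));  // \x00\x00\x00\x02
        System.out.println(Bytes.toString(asString));     // 2
    }
}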
- Main program
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // config.set("fs.defaultFS", "hdfs://localhost:9000/");
    // config.set("mapreduce.framework.name", "yarn");
    config.set("mapreduce.framework.name", "local");
    // Initialize the tables (mind thread safety if this is ever called concurrently).
    initTable();
    System.out.println("Init Table OK.");
    Job job = Job.getInstance(config, "HBaseMR");
    job.setJarByClass(HBaseMR.class);
    // Create the Scan that feeds the Mapper.
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
    // Set up the Mapper that reads from the source table.
    // Note: use initTableMapperJob, not the Hadoop 1.x-style initTableMapJob.
    TableMapReduceUtil.initTableMapperJob(tableName, scan, HBaseMRMapper.class, Text.class, IntWritable.class, job);
    // Set up the Reducer that writes into the result table.
    TableMapReduceUtil.initTableReducerJob(resultTableName, HBaseMRReducer.class, job);
    System.out.println(job.waitForCompletion(true) ? 0 : 1);
}
Run Results
After running the job locally, scan the tables in HBase; the output is shown below.
Note: you can also set config.set("mapreduce.framework.name", "yarn"); to run the MR job on YARN, but in that case you need to package the jar in advance.
hbase(main):048:0> scan 'word'
ROW COLUMN+CELL
1 column=content:info, timestamp=1554978176277, value=HELlO Kitty123
2 column=content:info, timestamp=1554978176277, value=Mark 10029.
3 column=content:info, timestamp=1554978176277, value=Demo Kitty123
3 row(s) in 0.1960 seconds
hbase(main):049:0> scan 'stat'
ROW COLUMN+CELL
10029. column=content:info, timestamp=1554978179119, value=1
Demo column=content:info, timestamp=1554978179119, value=1
HELlO column=content:info, timestamp=1554978179119, value=1
Kitty123 column=content:info, timestamp=1554978179119, value=2
Mark column=content:info, timestamp=1554978179119, value=1
5 row(s) in 0.2310 seconds
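If you prefer to verify the result from code instead of the shell, a minimal scan of the stat table (a sketch reusing the config and constants defined above) could look like this:

// Sketch: scan the result table and print the word counts.
try (Connection conn = ConnectionFactory.createConnection(config);
     Table statTable = conn.getTable(TableName.valueOf(resultTableName));
     ResultScanner scanner = statTable.getScanner(new Scan())) {
    for (Result r : scanner) {
        String word = Bytes.toString(r.getRow());
        String count = Bytes.toString(r.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col)));
        System.out.println(word + " = " + count);
    }
}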
Q&A
protobuf jar dependency exception.
log4j:WARN No appenders could be found for logger (org.apache.hadoop.security.Groups).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Table is exists.
Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs://localhost:9000/Users/Sean/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1072)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1064)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1064)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:288)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus(ClientDistributedCacheManager.java:224)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestamps(ClientDistributedCacheManager.java:93)
at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(ClientDistributedCacheManager.java:57)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:265)
at org.apache.hadoop.mapreduce.JobSubmitter.copyAndConfigureFiles(JobSubmitter.java:301)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:389)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at com.yanxml.bigdata.hbase.qucikstart.HBaseMR.main(HBaseMR.java:169)
There are two possible causes of this kind of problem: a jar conflict, or a wrong filesystem address.
- Wrong address: the main class does not need to set config.set("fs.defaultFS", "hdfs://localhost:9000/"); simply comment that line out.
- Jar conflict: exclude the protobuf-java pulled in transitively by hbase-client and declare the version explicitly:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.11</version>
    <exclusions>
        <exclusion>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
</dependency>
Related reading:
- HBase 工程中 protobuf 版本冲突解决
- 如何解决java工程中多个版本的包冲突问题
- hadoop: java.io.FileNotFoundException: File does not exist:
initTableMapJob vs. initTableMapperJob.
Solution: initTableMapJob is the Hadoop 1.x-style API (in the org.apache.hadoop.hbase.mapred package), while initTableMapperJob belongs to the Hadoop 2.x-style org.apache.hadoop.hbase.mapreduce package. The problem is caused by importing the 1.x TableMapReduceUtil utility class; switch the import to the 2.x one.
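As a concrete check, the job class should import the utility from the mapreduce package rather than the mapred package:

// Hadoop 1.x-style API: only provides initTableMapJob, not what we want here.
// import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;

// New-style API used with Job: provides initTableMapperJob / initTableReducerJob.
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;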
Related reading: HBase 1.0.0 API的变化
HBase & Elasticsearch?
ES can store only the fields that need to be queried plus the HBase rowkey, while HBase stores the full data; a query first hits ES to find the matching rowkeys and then fetches the full rows from HBase (a sketch follows the links below).
Related reading:
- 使用elasticsearch提高hbase基于列的查询效率
- 聊聊MySQL、HBase、ES的特点和区别
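A minimal sketch of that read path against the word table above; queryRowKeysFromEs() is a hypothetical stand-in for whatever Elasticsearch client call returns the matching rowkeys, while the HBase part uses the standard client API:

// Hypothetical helper: stands in for an Elasticsearch query that returns only rowkeys.
List<String> rowKeys = queryRowKeysFromEs("info:Kitty123"); // e.g. ["1", "3"]

// Fetch the full rows from HBase by rowkey.
try (Connection conn = ConnectionFactory.createConnection(config);
     Table wordTable = conn.getTable(TableName.valueOf(tableName))) {
    List<Get> gets = new ArrayList<Get>();
    for (String rowKey : rowKeys) {
        gets.add(new Get(Bytes.toBytes(rowKey)));
    }
    Result[] results = wordTable.get(gets);
    for (Result r : results) {
        System.out.println(Bytes.toString(r.getRow()) + " -> "
                + Bytes.toString(r.getValue(Bytes.toBytes(colFamily), Bytes.toBytes(col))));
    }
}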
Reference
[1]. [How to] MapReduce on HBase ----- 简单二级索引的实现
[2]. Elasticsearch+hbase 实现hbase中数据的快速查询(一)
[3]. HBase数据同步到ElasticSearch的方案
[4]. HBase整合MapReduce之建立HBase索引
[5]. MapReduce来构建索引
[6]. 大数据MapReduce入门之倒排索引
[7]. 用MR(MapReduce)查询hbase数据-用到TableMapper和Scan