Study Notes - Hadoop HDFS API - A Comprehensive Example
A comprehensive HDFS exercise: use the HDFS Java API to implement a word-frequency count over files stored on HDFS.
Example: /path/1.txt contains "hello word hello" and /path/2.txt contains "hello word word" ==> (hello,3) (word,3)
The final counts are written back to HDFS.
Breaking the task down:
- Read the files on HDFS ==> HDFS API
- Business logic (word counting): process every line of every file (split on a delimiter) ==> Mapper
- Cache the intermediate results ==> Context
- Write the results back to HDFS ==> HDFS API
Code implementation:
- Create a WordCount class as the main program:
```java
public static void main(String[] args) throws Exception {
    // 1. Read the files on HDFS ===> HDFS API
    // Load the configuration resource
    Properties properties = ParamsUtils.getProperties();
    // Build the input Path
    Path input = new Path(properties.getProperty(Constants.INPUT_PATH));
    // HDFS client configuration
    Configuration configuration = new Configuration();
    // Needed when the client reaches DataNodes by hostname (e.g. an Aliyun-hosted cluster)
    configuration.set("dfs.client.use.datanode.hostname", "true");
    // Set the replication factor
    configuration.set("dfs.replication", properties.getProperty(Constants.HDFS_REPLICATION));
    // Get the HDFS FileSystem handle
    FileSystem fileSystem = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)), configuration, "root");
    // listFiles returns a RemoteIterator over the input path
    RemoteIterator<LocatedFileStatus> iterators = fileSystem.listFiles(input, false);

    // Load the mapper implementation via reflection so it is pluggable
    Class<?> clazz = Class.forName(properties.getProperty(Constants.MAPPER_CLASS));
    WordMapper wordMapper = (WordMapper) clazz.newInstance();
    // Build the context (cache)
    WordContext wordContext = new WordContext();

    // Iterate over the files
    while (iterators.hasNext()) {
        LocatedFileStatus fileStatus = iterators.next();
        // Open the file
        FSDataInputStream open = fileSystem.open(fileStatus.getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader(open));
        String line = "";
        // Read the file line by line
        while ((line = reader.readLine()) != null) {
            // 2. Business logic (word counting)
            // The mapper writes its result into the cache
            wordMapper.map(line, wordContext);
        }
        reader.close();
        open.close();
    }

    // 3. The processed results are cached in the context
    Map<Object, Object> contextMap = wordContext.getCacheMap();

    // 4. Write the results back to HDFS ===> HDFS API
    Path output = new Path(properties.getProperty(Constants.OUTPUT_PATH));
    FSDataOutputStream out = fileSystem.create(new Path(output, new Path(properties.getProperty(Constants.OUTPUT_FILE))));
    // Write the cached entries from step 3 to the output stream
    Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
    for (Map.Entry<Object, Object> entry : entries) {
        out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
    }
    out.close();
    fileSystem.close();
    System.out.println("HDFS API word count finished successfully");
}
```
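A small side note on the main program: the per-file streams are closed manually, so an exception thrown while reading would leave them open. A minimal sketch of the same read loop using try-with-resources (an alternative, not part of the original post):

```java
// Sketch only: same per-file read loop, but the streams are closed
// automatically even if map() or readLine() throws.
while (iterators.hasNext()) {
    LocatedFileStatus fileStatus = iterators.next();
    try (FSDataInputStream open = fileSystem.open(fileStatus.getPath());
         BufferedReader reader = new BufferedReader(new InputStreamReader(open))) {
        String line;
        while ((line = reader.readLine()) != null) {
            wordMapper.map(line, wordContext);
        }
    }
}
```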
Noteworthy points in the main program:
- Configuration keys are extracted into a Constants class, which reduces hard-coding in the main program
- A Properties file makes the job configurable and decouples configuration from code
- Interface + configuration + reflection make the word-count implementation pluggable (see the sketch below)
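To make the pluggable part concrete, the reflection step boils down to the sketch below. It behaves the same as the code in the main program, except that it uses getDeclaredConstructor(), since Class.newInstance() is deprecated on newer JDKs:

```java
// wc.properties decides which WordMapper implementation is instantiated, so
// switching between WordCountMapper and CaseIgnoreWordCountMapper requires no
// code change in the main program.
String mapperClassName = ParamsUtils.getProperties().getProperty(Constants.MAPPER_CLASS);
WordMapper wordMapper = (WordMapper) Class.forName(mapperClassName)
        .getDeclaredConstructor()
        .newInstance();
```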
Related classes:
- The constants class: Constants. It is recommended to define constants in a final class or an enum; interface constants and plain classes are not recommended as constant holders.
```java
/**
 * @ClassName Constants
 * @Description Configuration constants
 * @Author eastern
 * @Date 2020/4/27 11:20 PM
 * @Version 1.0
 **/
public final class Constants {

    public static final String INPUT_PATH = "INPUT_PATH";

    public static final String OUTPUT_PATH = "OUTPUT_PATH";

    public static final String OUTPUT_FILE = "OUTPUT_FILE";

    public static final String HDFS_URI = "HDFS_URI";

    public static final String HDFS_REPLICATION = "HDFS_REPLICATION";

    public static final String MAPPER_CLASS = "MAPPER_CLASS";
}
```
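Since the recommendation above also mentions enums, this is roughly what an enum-based variant could look like (a hypothetical alternative; the rest of the code keeps using the final class):

```java
// Hypothetical enum alternative: each constant carries its property key.
public enum ConfigKey {
    INPUT_PATH("INPUT_PATH"),
    OUTPUT_PATH("OUTPUT_PATH"),
    OUTPUT_FILE("OUTPUT_FILE"),
    HDFS_URI("HDFS_URI"),
    HDFS_REPLICATION("HDFS_REPLICATION"),
    MAPPER_CLASS("MAPPER_CLASS");

    private final String key;

    ConfigKey(String key) {
        this.key = key;
    }

    public String key() {
        return key;
    }
}
```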
- Reading the configuration: ParamsUtils
```java
import java.io.IOException;
import java.util.Properties;

/**
 * @ClassName ParamsUtils
 * @Description Reads the properties configuration file
 * @Author eastern
 * @Date 2020/4/27 11:17 PM
 * @Version 1.0
 **/
public class ParamsUtils {

    private static Properties properties = new Properties();

    static {
        try {
            properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Properties getProperties() {
        return properties;
    }
}
```
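Usage is then a one-liner anywhere in the project, for example:

```java
// The properties file is loaded once in the static initializer and reused.
Properties properties = ParamsUtils.getProperties();
String inputPath = properties.getProperty(Constants.INPUT_PATH);
```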
- The context: WordContext
```java
import java.util.HashMap;
import java.util.Map;

/**
 * @ClassName WordContext
 * @Description Custom context
 * @Author eastern
 * @Date 2020/4/27 10:50 PM
 * @Version 1.0
 **/
public class WordContext {

    private Map<Object, Object> cacheMap = new HashMap<>();

    public Map<Object, Object> getCacheMap() {
        return cacheMap;
    }

    /**
     * Write an entry into the cache
     * @param key   the word
     * @param value the count
     */
    public void write(Object key, Object value) {
        cacheMap.put(key, value);
    }

    /**
     * Read a value from the cache
     * @param key
     * @return
     */
    public Object get(Object key) {
        return cacheMap.get(key);
    }
}
```
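From a mapper's point of view the context is just a small in-memory key-value store, for example:

```java
WordContext context = new WordContext();
context.write("hello", 1);                                    // first occurrence
context.write("hello", (Integer) context.get("hello") + 1);   // later occurrence
System.out.println(context.getCacheMap());                    // prints {hello=2}
```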
- The word-processing interface: WordMapper
```java
public interface WordMapper {

    /**
     * Process one line of input for word counting
     * @param line        a line read from the file
     * @param wordContext the context / cache
     */
    void map(String line, WordContext wordContext);
}
```
- Word-count implementation 1: WordCountMapper
```java
/**
 * @ClassName WordCountMapper
 * @Description Counts word frequencies
 * @Author eastern
 * @Date 2020/4/27 10:57 PM
 * @Version 1.0
 **/
public class WordCountMapper implements WordMapper {

    @Override
    public void map(String line, WordContext wordContext) {
        String[] words = line.split("\t");
        for (String word : words) {
            Object value = wordContext.get(word);
            if (value == null) {
                wordContext.write(word, 1);
            } else {
                int v = Integer.parseInt(value.toString());
                wordContext.write(word, v + 1);
            }
        }
    }
}
```
- Word-count implementation 2: CaseIgnoreWordCountMapper
```java
/**
 * @ClassName CaseIgnoreWordCountMapper
 * @Description Case-insensitive word count
 * @Author eastern
 * @Date 2020/4/27 10:57 PM
 * @Version 1.0
 **/
public class CaseIgnoreWordCountMapper implements WordMapper {

    @Override
    public void map(String line, WordContext wordContext) {
        String[] words = line.toLowerCase().split("\t");
        for (String word : words) {
            Object value = wordContext.get(word);
            if (value == null) {
                wordContext.write(word, 1);
            } else {
                int v = Integer.parseInt(value.toString());
                wordContext.write(word, v + 1);
            }
        }
    }
}
```
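Both implementations repeat the same get-then-write pattern. One possible refinement (a hypothetical addition, not part of the original WordContext) would be an increment helper on the context so each mapper shrinks to a single call per word:

```java
// Hypothetical addition to WordContext: bump the count for a word in one call.
public void increment(Object key) {
    Object value = cacheMap.get(key);
    if (value == null) {
        cacheMap.put(key, 1);
    } else {
        cacheMap.put(key, Integer.parseInt(value.toString()) + 1);
    }
}
```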
- The configuration file: wc.properties
```properties
INPUT_PATH=/hdfsapi/test/second/words.txt
OUTPUT_PATH=/hdfsapi/output
OUTPUT_FILE=wc.out2
HDFS_URI=hdfs://139.129.240.xxx:8020
HDFS_REPLICATION=1
MAPPER_CLASS=com.laogoubi.hadoop.hdfs.wordcount.CaseIgnoreWordCountMapper
```
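With this configuration the result ends up in /hdfsapi/output/wc.out2, one word per line followed by a tab and its count, so the (hello,3) from the example at the top is written as "hello", a tab, and "3".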