A comprehensive HDFS exercise: use the HDFS Java API to perform a word-frequency count over files stored on HDFS.

Example: /path/1.txt contains "hello word hello" and /path/2.txt contains "hello word word" ==> (hello,3) (word,3)

The final counts are written back to HDFS.

Functional breakdown:

  • Read the files from HDFS ==> HDFS API
  • Business processing (word counting): every line of every file goes through the same processing (split on a delimiter) ==> Mapper
  • Cache the processed results ==> Context
  • Write the results back to HDFS ==> HDFS API

Implementation:

  • Create a WordCount class (the main program)

    public static void main(String[] args) throws Exception {
        // 1. Read the files from HDFS ===> HDFS API
        // Load the configuration
        Properties properties = ParamsUtils.getProperties();
        // Build the Path object for the input
        Path input = new Path(properties.getProperty(Constants.INPUT_PATH));
        // HDFS client configuration
        Configuration configuration = new Configuration();
        // Needed when DataNodes must be reached by hostname (e.g. an Aliyun-hosted HDFS)
        configuration.set("dfs.client.use.datanode.hostname", "true");
        // Replication factor
        configuration.set("dfs.replication", properties.getProperty(Constants.HDFS_REPLICATION));
        // Get a handle to the HDFS file system
        FileSystem fileSystem = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)), configuration, "root");
        // listFiles returns a RemoteIterator over the files under the input path
        RemoteIterator<LocatedFileStatus> iterators = fileSystem.listFiles(input, false);
        // Instantiate the mapper via reflection, making the implementation pluggable
        Class<?> clazz = Class.forName(properties.getProperty(Constants.MAPPER_CLASS));
        WordMapper wordMapper = (WordMapper) clazz.getDeclaredConstructor().newInstance();
        // Build the context (the cache)
        WordContext wordContext = new WordContext();
        // Iterate over the files
        while (iterators.hasNext()) {
            LocatedFileStatus fileStatus = iterators.next();
            // Open the file
            FSDataInputStream open = fileSystem.open(fileStatus.getPath());
            BufferedReader reader = new BufferedReader(new InputStreamReader(open));
            String line;
            // Read the file line by line
            while ((line = reader.readLine()) != null) {
                // 2. Business processing (word counting);
                // the mapper writes its result into the context when done
                wordMapper.map(line, wordContext);
            }
            reader.close();
            open.close();
        }
        // 3. Fetch the cached results
        Map<Object, Object> contextMap = wordContext.getCacheMap();

        // 4. Write the results back to HDFS ===> HDFS API
        Path output = new Path(properties.getProperty(Constants.OUTPUT_PATH));
        FSDataOutputStream out = fileSystem.create(new Path(output, new Path(properties.getProperty(Constants.OUTPUT_FILE))));

        // Dump the cache built in step 3 into the output stream
        Set<Map.Entry<Object, Object>> entries = contextMap.entrySet();
        for (Map.Entry<Object, Object> entry : entries) {
            out.write((entry.getKey().toString() + "\t" + entry.getValue() + "\n").getBytes());
        }
        out.close();
        fileSystem.close();

        System.out.println("HDFS API word count finished successfully");
    }
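
    • A side note on resource handling: the streams above are closed manually, so an exception thrown mid-loop would leak them. A try-with-resources variant of the read loop (a sketch, not the original code) closes them automatically:

      try (FSDataInputStream in = fileSystem.open(fileStatus.getPath());
           BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
          String line;
          while ((line = reader.readLine()) != null) {
              wordMapper.map(line, wordContext);
          }
      }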
    
    • Noteworthy points in the main program

      • Constants are extracted into a Constants class, cutting down on hard-coded strings in the main program
      • A Properties file makes the program configurable, decoupling configuration from code
      • Interface + configuration + reflection make the word-count implementation pluggable (see the sketch right after this list)
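
      Taken in isolation, the pluggable-mapper pattern is only a few lines (a minimal sketch wired together from the classes shown below):

        // Constants supplies the property key, ParamsUtils the configuration, and
        // reflection turns the configured class name into a WordMapper instance.
        Properties properties = ParamsUtils.getProperties();
        Class<?> clazz = Class.forName(properties.getProperty(Constants.MAPPER_CLASS));
        WordMapper wordMapper = (WordMapper) clazz.getDeclaredConstructor().newInstance();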
    • Related classes

      • Constants: Constants. A final class (as here) or an enum is the recommended way to define constants; interface constants and plain classes are discouraged.

        /**
         * @ClassName Constants
         * @Description Configuration constants
         * @Author eastern
         * @Date 2020/4/27 11:20 PM
         * @Version 1.0
         **/
        public final class Constants {

            public static final String INPUT_PATH = "INPUT_PATH";

            public static final String OUTPUT_PATH = "OUTPUT_PATH";

            public static final String OUTPUT_FILE = "OUTPUT_FILE";

            public static final String HDFS_URI = "HDFS_URI";

            public static final String HDFS_REPLICATION = "HDFS_REPLICATION";

            public static final String MAPPER_CLASS = "MAPPER_CLASS";

        }
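
        As noted above, an enum works just as well as the final class; a hypothetical equivalent (not part of the original project) could look like this:

        public enum ConfigKey {

            INPUT_PATH, OUTPUT_PATH, OUTPUT_FILE, HDFS_URI, HDFS_REPLICATION, MAPPER_CLASS;

            /** The enum constant's name doubles as the property key. */
            public String key() {
                return name();
            }
        }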
        
      • Reading the configuration: ParamsUtils

        /**
         * @ClassName ParamsUtils
         * @Description Loads the properties configuration file
         * @Author eastern
         * @Date 2020/4/27 11:17 PM
         * @Version 1.0
         **/
        public class ParamsUtils {

            private static Properties properties = new Properties();

            static {
                try {
                    properties.load(ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties"));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            public static Properties getProperties() {
                return properties;
            }
        }
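
        One caveat: getResourceAsStream returns null when wc.properties is not on the classpath (in a Maven layout it belongs under src/main/resources), and properties.load(null) then fails with a NullPointerException. A slightly more defensive static block (a sketch):

        static {
            try (InputStream in = ParamsUtils.class.getClassLoader().getResourceAsStream("wc.properties")) {
                if (in == null) {
                    throw new IllegalStateException("wc.properties not found on the classpath");
                }
                properties.load(in);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }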
        
      • The context: WordContext

        /**
         * @ClassName WordContext
         * @Description A custom context
         * @Author eastern
         * @Date 2020/4/27 10:50 PM
         * @Version 1.0
         **/
        public class WordContext {

            private Map<Object, Object> cacheMap = new HashMap<>();

            public Map<Object, Object> getCacheMap() {
                return cacheMap;
            }

            /**
             * Write a key/value pair into the cache
             * @param key	the word
             * @param value	its count
             */
            public void write(Object key, Object value) {
                cacheMap.put(key, value);
            }

            /**
             * Read a value from the cache
             * @param key	the word
             * @return	the cached count, or null if absent
             */
            public Object get(Object key) {
                return cacheMap.get(key);
            }
        }
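
        Because the cache is a HashMap, words come out in arbitrary order. If sorted output were wanted, swapping in a TreeMap would suffice (a hypothetical one-line change; the String keys are Comparable, so it works at runtime):

        private Map<Object, Object> cacheMap = new TreeMap<>();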
        
      • The word-count interface: WordMapper

        public interface WordMapper {

            /**
             * Process one line of input
             * @param line	a line read from the file
             * @param wordContext	the context / cache
             */
            void map(String line, WordContext wordContext);
        }
        
      • Word-count implementation one: WordCountMapper

        /**
         * @ClassName WordCountMapper
         * @Description Counts word frequencies
         * @Author eastern
         * @Date 2020/4/27 10:57 PM
         * @Version 1.0
         **/
        public class WordCountMapper implements WordMapper {

            @Override
            public void map(String line, WordContext wordContext) {
                String[] words = line.split("\t");
                for (String word : words) {
                    Object value = wordContext.get(word);
                    if (value == null) {
                        wordContext.write(word, 1);
                    } else {
                        int v = Integer.parseInt(value.toString());
                        wordContext.write(word, v + 1);
                    }
                }
            }
        }
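
        Note that split("\t") assumes the words on a line are tab-separated. For space-separated input, such as the example at the top, a whitespace pattern would be needed instead (a hypothetical variant):

        String[] words = line.split("\\s+");    // split on any run of whitespace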
        
      • Word-count implementation two: CaseIgnoreWordCountMapper

        /**
         * @ClassName CaseIgnoreWordCountMapper
         * @Description Case-insensitive word count
         * @Author eastern
         * @Date 2020/4/27 10:57 PM
         * @Version 1.0
         **/
        public class CaseIgnoreWordCountMapper implements WordMapper {

            @Override
            public void map(String line, WordContext wordContext) {
                String[] words = line.toLowerCase().split("\t");
                for (String word : words) {
                    Object value = wordContext.get(word);
                    if (value == null) {
                        wordContext.write(word, 1);
                    } else {
                        int v = Integer.parseInt(value.toString());
                        wordContext.write(word, v + 1);
                    }
                }
            }
        }
        
      • The configuration file: wc.properties

        INPUT_PATH=/hdfsapi/test/second/words.txt
        OUTPUT_PATH=/hdfsapi/output
        OUTPUT_FILE=wc.out2
        HDFS_URI=hdfs://139.129.240.xxx:8020
        HDFS_REPLICATION=1
        MAPPER_CLASS=com.laogoubi.hadoop.hdfs.wordcount.CaseIgnoreWordCountMapper
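
        With this setup, switching the word-count implementation is purely a configuration change; for instance, reverting to the case-sensitive mapper only requires:

        MAPPER_CLASS=com.laogoubi.hadoop.hdfs.wordcount.WordCountMapper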
        