返回 登录
0

码说MapReduce

MapReduce框架作为Hadoop发展初期的核心计算框架,为大数据处理技术飞速演进提供了基石。在Hadoop生态圈中,MapReduce框架由于其成熟稳定的性能,仍然是离线批处理技术的主力。以我们的北京移动大数据集群为例,Hive、SparkSQL是支撑探索性数据查询的主要工具,其简单易懂的SQL语句查询,可以使具备基础数据库管理能力的人员轻松上手,完美地支撑了实时数据查询需求。

在我最初使用Java写MapReduce程序之前,总有一个疑问:既然可以用SQL这么通俗易懂的语句直接操作数据,而且不需要过多了解MapReduce执行过程,为什么还要费力地用Java垒代码,去了解MapReduce的底层执行过程。什么样的应用场景需要我们来开发MapReduce呢?

首先,SQL非常适用于处理结构化数据,对于非结构化数据以及需要特殊函数处理的数据比如文本数据,SQL则会力不从心。举一个小例子,从海量文本数据中提取各种字符编码并翻译为中文,过程中还涉及自动识别是utf-8还是ANSI亦或是其他编码格式,这个需求用MapReduce程序实现起来更为合理;另外,在处理业务逻辑较为复杂的任务时,使用SQL很难实现,其执行效率方面也很难满足业务需求。举例来说,我们需要将业务日志中的域名识别为相应的互联网应用,现实操作中需要分多种情况使用多重判断进行规则匹配,并剔除钓鱼网站和fake url,使用SQL很难实现业务逻辑。再例如,使用SQL进行多表join并叠加复杂的数学运算时,其效率也很难满足业务需求。

在我们的机器学习工具开发过程中,为了使用原有数据建立特征向量,我们需要对原有表结构进行转化,需要迭代原始数据生成具有较多特征值的特征向量。原始数据量为13亿条,共13.2GB,我们尝试使用Hive SQL进行实现,经过测试,任务执行时间过长无法满足需求。而使用MapReduce编写两个Job实现业务逻辑,同时使用哈希算法优化字符串查询效率,最终处理时长为15分钟。应对这些复杂情况,使用MapReduce编程可以使我们获得更多对程序实现的控制和方法选择,通过底层算法优化实现效率提升。

基于不同的业务场景,结合不同工具特点,我们采用SQL脚本和MapReduce开发程序结合的策略,使日常数据处理任务在效率上得到了很好地满足。在我们平台中,MapReduce程序承担了如关键字提取、应用匹配和标签规则运算等近30%的日常数据处理任务。

总之,我们在实际应用中依据灵活性和效率来选择是否自己开发程序。

概览MapReduce

认识MapReduce先从架构入手,在此我们一图以蔽之:

图片描述
图 1

现在广泛使用的MapReduce v2基于YARN架构,其角色包括Resource Manager(RM)、NodeManager(NM)、Application Master(AM)。RM由Master主机承担,主要负责任务调度和资源调配,NM和AM由各工作节点Slave承担,负责任务的处理和资源读写,其计算单位抽象为container。MapReduce的计算流程可以抽象为Splitting、Mapping、Shuffling、Reducing阶段,其中shuffling包括了Grouping、Sorting、Partitioning过程。以WordCount为例,如下图:

图片描述
图 2

在掌握了MapReduce架构和原理的基础上,从代码的角度认识MapReduce才是程序员的正确打开方式。

开发MapReduce

MapReduce程序中,Map和Reduce逻辑功能分别通过扩展Mapper类和Reducer类实现。具体在实现过程中,我们在主类中将Mapper和Reducer类扩展并作为内部类调用,最后通过main函数定义输入输出以及Job配置,从而作为程序主入口。

Map实现

Mapper类扩展需要实现map方法,如下:

private static class MyMapper extends
            Mapper<NullWritable, Writable, IntWritable, Text> {
        @Override
        protected void map(
                NullWritable key, 
                Writable value,
                Mapper<NullWritable, Writable, IntWritable, Text>.Context context)
                throws IOException, InterruptedException {
        }
    }

根据需求可以扩展setup、cleanup和自定义方法等,扩展Mapper类时需要声明键值对类型,如 Mapper< NullWritable,Writable,IntWritable,Text >,依次分别为输入输出< key,value >类型,其中< NullWritable,Writable >是orc文件格式输入< key,value >类型。

需要强调的是,MapReduce中所有输入输出字段类型都必须实现Writable或者WritableComparable类型,这是因为MapReduce中磁盘读写和节点数据传输过程涉及到数据的序列化和反序列化,需要通过这两类来实现。经常用到的IntWritable、LongWritable、Text等都是实现自WritableComparable类,如果需要,我们也可以扩展这两类实现自定义数据类型。例如,在通过MapReduce实现两表和多表Join的过程中,我通过实现WritableComparable类自定义Map输出的key字段类型,来实现对于Grouping和Sorting阶段不同比较字段的控制。

setup方法在类调用起始阶段运行,可以实现初始阶段对于参数读取和变量赋值的操作。在app应用识别案例中,我们在setup阶段实现对于平台DPI文件的读取操作,以在之后的map阶段实现MapJoin操作,代码如下:

protected void setup(Context context) throws IOException,
            InterruptedException {
Configuration conf = context.getConfiguration();
//读取DPI文件HDFS存放路径
    String ini=DefaultStringifier.load(conf, "ini", Text.class).toString();
    FileSystem fs = FileSystem.get(conf);
    FSDataInputStream in = fs.open(new Path(ini));
    BufferedReader bf = new BufferedReader(new InputStreamReader(in));
    String str = null;
    while ((str = bf.readLine()) != null) {
        String[] rules = str.split("\\|\\|");
        String host = rules[5];
        String appname = rules[1]
        DPIMap.put(host,appname);
    }
    if (bf != null) {
        bf.close();
    }
}

其中DPIMap是需要在主类中定义的HashMap变量,在map阶段将使用HashMap实现快速查找。

map方法是实现Mapper类的核心方法,map阶段主要逻辑都需要在map方法中实现。map方法参数定义包括输入< key,value >和上下文对象context声明。Context对象负责在MapReduce执行过程中平台配置和Job配置的传递。Job执行过程中,写入的业务逻辑会对每一条数据进行操作,并将中间结果< key,value >值通过context对象写入后台进行之后的shuffle和reduce操作。

例如我需要将业务数据中的host字段与DPI数据的host字段进行等值连接,统计出使用app的次数。我们可以在map方法中实现如下:

protected void map(
        NullWritable key, 
        Writable value,
        Mapper<NullWritable, Writable, Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
    String appname = new String();
    //读入orc格式数据;
    OrcStruct struct = (OrcStruct)value;
    TypeInfo typeInfo =
            TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA);
    StructObjectInspector inspector = (StructObjectInspector)
            OrcStruct.createObjectInspector(typeInfo);
    //读入host字段;
    try{
        host = inspector.getStructFieldData(struct, inspector.getStructFieldRef("host")).toString();
    }catch(Exception e){
        host = "";
    } 
    if ((appname = DpiList.get(host)) != null){
        context.write(new Text(appname),new IntWritable(1));
    }
}

在此默认输入数据为ORC格式,代码中涉及对ORC文件读取方法。

Reduce实现

同Mapper类类似,扩展Reducer类需要实现reduce方法。继续以统计app次数为例,Reducer类扩展实现为:

private static class MyReducer extends
        Reducer<Text, IntWritable, NullWritable, Writable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        OrcSerde orcSerde = new OrcSerde();
   //写orc格式文件操作;
        Writable row;
        int sum = 0;
        StructObjectInspector inspector = 
                (StructObjectInspector) ObjectInspectorFactory
                .getReflectionObjectInspector(MyRow.class,
                  ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        for(IntWritable val:values){
            sum = sum + val.get();
        }
        String[] result = {key.toString, Integer.toString(sum)};
        row = orcSerde.serialize(new MyRow(result), inspector);
        context.write(NullWritable.get(), row);
    }
}

其中reduce方法实现的逻辑为对依据key值group之后的value值集合进行加和,并写入HDFS。

在reduce方法中,接收到的value集合通过Iterable接口实现,我们可以通过iterator对象提供的API实现对value值集合的遍历。Reduce的输出我们最终写为ORC格式。

程序主入口main()方法

通过在主类中定义main()方法作为程序的入口,我们需要在此完成对程序参数传递、输入输出配置和HDFS平台配置声明等工作,以app应用识别为例,代码如下:

public static void main(String[] args) throws IOException,
            URISyntaxException, InterruptedException, ClassNotFoundException {
    String inputPath = args[0];
    String outputPath = args[1];
    String ini = args[2];
    Configuration conf = new Configuration();
    //向Mapper传递DPI文件位置;
    DefaultStringifier.store(conf,ini,"ini");
    Job job = new Job(conf);
    //设置任务队列;
    conf.set("mapreduce.job.queuename", "background");
    job.setJarByClass(StrMatching_dpi_orc.class);
    //设置reduce数量;
    job.setNumReduceTasks(40);
    //定义输入输出文件类型;
    job.setInputFormatClass(OrcNewInputFormat.class);
    job.setOutputFormatClass(OrcNewOutputFormat.class);
    //配置输入输出文件路径;
    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(new Path(outputPath))) {
    fs.delete(new Path(outputPath), true);
    }
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    //设置Map输出的<key,value>类型;
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    //设置最终输出结果<key,value>类型;
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Writable.class);
    //声明Mapper类和Reducer类;
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    //执行任务,结束后自动退出;
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

此例main()方法主要完成了对输入输出类型和路径的配置、任务执行队列和资源配置的定义。main()方法主要完成对程序接口的定义和资源调配,以上代码展示了一个最基本main()方法的定义。如果任务需要,我们还可以完成诸如自定义Group Comparator、Sort Comparator、Partitoner等对象的定义,并在main()方法中声明,作为MapReduce程序的comparator。

在我们平台的日常任务中,我们放弃使用占用空间较大的Text和Sequence文件格式,完全使用ORC文件格式作为数据存储格式。这样可以实现自定义MapReduce程序与Hive平台的无缝结合,更重要的是,可以为平台节省十倍的存储空间。

ORC存储方法

ORC File是Optimized Row Columnar (ORC) file的简称,它基于RCFile格式进行了优化。ORC文件格式的设计初衷是为了提高Hive数据读写以及数据处理能力,由于其实现了一定的数据压缩,可以占用更小的数据存储。

我们使用ORC格式作为MapReduce和Hive工具的统一存储格式,可以节省平台大量的存储空间,同时也实现了MapReduce程序与Hive的更好结合。

经过我们平台日常任务的实测积累,ORC文件格式可以为Hive提供稳定快速的数据读写,并且与Text文件存储相比,可以节省十倍的存储空间,可以大幅提升平台数据存储和处理能力。对于MapReduce程序读写ORC文件,无法像未压缩的Text文件一样直接读写,还需要做关于表数据结构声明等工作。

读ORC文件

仍然以app应用识别为例,主类中需要定义变量SCHEMA,声明读入表结构:

private static final String SCHEMA = "struct<ID:string,Name:string, time:string,fst_uri:string,host:string>";

读取ORC文件格式的代码如下:

OrcStruct struct = (OrcStruct)value;
TypeInfo typeInfo =
        TypeInfoUtils.getTypeInfoFromTypeString(SCHEMA);
StructObjectInspector inspector = (StructObjectInspector)
        OrcStruct.createObjectInspector(typeInfo);
//读入host字段;
try{
    host = inspector.getStructFieldData(struct, inspector.getStructFieldRef("host")).toString();
}catch(Exception e){
   host = "";
} 

首先,需要将读入的value值强制类型转换为OrcStruct,然后根据表结构实例化StructObjectInspector对象为inspector,最后使用StructObjectInspector类提供的API对字段进行读取。

写ORC文件

与读入过程相对应,写ORC文件代码如下:

OrcSerde orcSerde = new OrcSerde();
Writable row;
int sum = 0;
StructObjectInspector inspector = 
        (StructObjectInspector) ObjectInspectorFactory
        .getReflectionObjectInspector(MyRow.class,
          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
for(IntWritable val:values){
    sum = sum + val.get();
}
String[] result = {key.toString, Integer.toString(sum)};
row = orcSerde.serialize(new MyRow(result), inspector);
context.write(NullWritable.get(), row);

我们需要根据自定义的数据类型MyRow类实例化StructObjectInspector为inspector,然后使用OrcSerde对象将最终计算结果进行序列化并写入HDFS。其中MyRow类是通过扩展Writable类,对输出数据类型进行了定义,在类中完成了对输出表结构字段的定义和赋值,代码如下:

public class MyRow implements Writable {
    String appname;
    int cnt;
    MyRow(String[] val){
        this.appname = val[0];
        this.cnt = Integer.parseInt(val[1]);
    }
    @Override
    public void readFields(DataInput arg0) throws IOException {
        throw new UnsupportedOperationException("no write");
    }
    @Override
    public void write(DataOutput arg0) throws IOException {
        throw new UnsupportedOperationException("no read");
    }
}

在上面的章节中,我们介绍了MapReduce开发在北京移动大数据平台上的应用背景和部分应用案例。尽管MapReduce由于处理机制中大量的磁盘读写带来了数据处理效率的瓶颈,但在日常离线数据处理任务中由于其成熟稳定的性能,MapReduce仍然扮演着十分重要的角色。

随着技术的不断发展,诸如Spark等更加快速的计算引擎也将逐步取代MapReduce的地位,我们也在一步步尝试替换和优化我们的应用场景,也欢迎大家多提意见和建议,谢谢!

作者简介:孙昊,毕业于Auburn Univeristy,获得EE工程硕士学位,专攻信息论与信息安全,现就职于北京移动网运中心。熟悉信息论与数理统计,善用Java、Scala语言编程,熟悉C、C++、Python语言以及H5、Javascript等网络编程语言,精通MapReduce编程框架,熟悉Storm、Spark Streaming大数据实时处理技术。2014年底加入北京移动eBDA大数据分析团队,担任团队大数据软件开发工程师,参与完成了搜索关键字产品、互联网流量视图、应用匹配产品、用户标签体系产品等大数据项目。eBDA大数据分析团队是一支扎根于北京移动网运中心,致力于提升数据价值、改善用户体验,取之于民用之于民的有理想有能力的团队。

图片描述

责编:郭芮,CSDN编辑,关注大数据领域,寻求报道或者投稿请发邮件guorui@csdn.net。

评论