Spark RDD Operators: Key-Value Aggregation with combineByKey

combineByKey

Aggregating data is straightforward when it all sits in one place, but how do we aggregate a distributed dataset? This post introduces combineByKey, the primitive underlying most of Spark's per-key aggregation operators, so it is well worth understanding; see the Scala API for details.
Brief overview

def combineByKey[C](createCombiner: V => C,
                    mergeValue: (C, V) => C,
                    mergeCombiners: (C, C) => C): RDD[(K, C)]

createCombiner: combineByKey() walks through every element in a partition, so each element's key is either one it has not seen before or one that matches an earlier element. When it meets a new key, combineByKey() calls the user-supplied createCombiner() function to create the initial value of the accumulator for that key.
mergeValue: If the key has already been seen while processing the current partition, mergeValue() is called to merge the key's current accumulator value with the new value.
mergeCombiners: Because each partition is processed independently, the same key can end up with an accumulator in several partitions. When two or more partitions hold accumulators for the same key, the user-supplied mergeCombiners() function merges the per-partition results.
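To make the three callbacks concrete, the per-partition mechanics can be simulated locally without Spark. The sketch below is plain Java with hypothetical names of my own (CombineByKeySim, processPartition): it runs the createCombiner/mergeValue logic over two simulated partitions and then merges the per-partition accumulators with mergeCombiners, using a (sum, count) accumulator.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineByKeySim {
    // One "partition": the first time a key appears -> createCombiner,
    // every later appearance -> mergeValue. Accumulator = {sum, count}.
    static Map<String, int[]> processPartition(List<Map.Entry<String, Integer>> part) {
        Map<String, int[]> acc = new HashMap<>();
        for (Map.Entry<String, Integer> e : part) {
            int[] c = acc.get(e.getKey());
            if (c == null) {
                acc.put(e.getKey(), new int[]{e.getValue(), 1});               // createCombiner
            } else {
                acc.put(e.getKey(), new int[]{c[0] + e.getValue(), c[1] + 1}); // mergeValue
            }
        }
        return acc;
    }

    // mergeCombiners: merge the accumulators of two partitions key by key.
    static Map<String, int[]> mergeCombiners(Map<String, int[]> a, Map<String, int[]> b) {
        Map<String, int[]> merged = new HashMap<>(a);
        b.forEach((k, v) -> merged.merge(k, v,
                (x, y) -> new int[]{x[0] + y[0], x[1] + y[1]}));
        return merged;
    }

    public static void main(String[] args) {
        // Two simulated partitions of (name, score) pairs
        List<Map.Entry<String, Integer>> p1 = List.of(
                Map.entry("xiaobai", 98), Map.entry("xiaohei", 99));
        List<Map.Entry<String, Integer>> p2 = List.of(
                Map.entry("xiaobai", 88), Map.entry("xiaohei", 67));

        Map<String, int[]> merged = mergeCombiners(processPartition(p1), processPartition(p2));
        merged.forEach((k, v) -> System.out.println(k + " sum=" + v[0] + " count=" + v[1]));
    }
}
```

The (sum, count) pair is the same accumulator shape the examples below use; Spark additionally shuffles the per-partition maps to the right reducers before the mergeCombiners step.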
Example: computing students' average scores
The following example computes each student's average score. It is adapted from https://www.edureka.co/blog/apache-spark-combinebykey-explained (see the GitHub source); the walkthrough below is my own.

Scala version
import org.apache.spark.{SparkConf, SparkContext}

object combineByKeyRDDScala {
  // Case class describing one score record for a student
  case class ScoreDetail(sname: String, subject: String, score: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("stu")
    val sc = new SparkContext(conf)

    // Test data: key = student's name, value = a ScoreDetail instance
    val scores = sc.parallelize(List(
      ScoreDetail("xiaobai", "Math", 98),
      ScoreDetail("xiaobai", "English", 88),
      ScoreDetail("xiaohei", "Math", 99),
      ScoreDetail("xiaohei", "English", 67),
      ScoreDetail("xiaolan", "Math", 58),
      ScoreDetail("xiaolan", "English", 87),
      ScoreDetail("xiaozi", "Math", 78),
      ScoreDetail("xiaozi", "English", 56)
    ))

    // Turn the records into (name, record) key-value pairs,
    // using a for/yield comprehension (which desugars to map)
    val scoresWithKey = for { i <- scores } yield (i.sname, i)

    scoresWithKey.foreachPartition(x => x.foreach(y => println(y._2)))

    // Aggregate per key; the accumulator is (sum of scores, count)
    val scoresum = scoresWithKey.combineByKey(
      (x: ScoreDetail) => (x.score, 1),                                    // createCombiner
      (acc: (Int, Int), x: ScoreDetail) => (acc._1 + x.score, acc._2 + 1), // mergeValue
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)         // mergeCombiners
    )

    // Compute and print each student's average
    // (toDouble avoids truncation by integer division)
    val stuAvg = scoresum.map { case (key, (sum, count)) => (key, sum.toDouble / count) }
    stuAvg.foreach(println)
  }
}
Java version
import java.io.Serializable;

public class ScoreDetailJava implements Serializable {
    public String sname;
    public Integer score;
    public String subject;

    public ScoreDetailJava(String sname, String subject, Integer score) {
        this.sname = sname;
        this.score = score;
        this.subject = subject;
    }
}
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class combineByKeyRDDjava {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("stu").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        ArrayList<ScoreDetailJava> scoreDetail = new ArrayList<>();
        scoreDetail.add(new ScoreDetailJava("xiaobai", "Math", 98));
        scoreDetail.add(new ScoreDetailJava("xiaobai", "English", 88));
        scoreDetail.add(new ScoreDetailJava("xiaohei", "Math", 99));
        scoreDetail.add(new ScoreDetailJava("xiaohei", "English", 67));
        scoreDetail.add(new ScoreDetailJava("xiaolan", "Math", 58));
        scoreDetail.add(new ScoreDetailJava("xiaolan", "English", 87));
        scoreDetail.add(new ScoreDetailJava("xiaozi", "Math", 78));
        scoreDetail.add(new ScoreDetailJava("xiaozi", "English", 56));

        JavaRDD<ScoreDetailJava> scoreDetailsRdd = sc.parallelize(scoreDetail);

        // Key the records by student name
        JavaPairRDD<String, ScoreDetailJava> pairRDD =
            scoreDetailsRdd.mapToPair(new PairFunction<ScoreDetailJava, String, ScoreDetailJava>() {
                @Override
                public Tuple2<String, ScoreDetailJava> call(ScoreDetailJava detail) throws Exception {
                    return new Tuple2<>(detail.sname, detail);
                }
            });

        // createCombiner: the initial accumulator for a key is (score, 1)
        Function<ScoreDetailJava, Tuple2<Integer, Integer>> createCombiner =
            new Function<ScoreDetailJava, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> call(ScoreDetailJava v1) throws Exception {
                    return new Tuple2<>(v1.score, 1);
                }
            };

        // mergeValue: fold another record of the same key into the accumulator
        Function2<Tuple2<Integer, Integer>, ScoreDetailJava, Tuple2<Integer, Integer>> mergeValue =
            new Function2<Tuple2<Integer, Integer>, ScoreDetailJava, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, ScoreDetailJava v2) throws Exception {
                    return new Tuple2<>(acc._1 + v2.score, acc._2 + 1);
                }
            };

        // mergeCombiners: merge per-partition accumulators for the same key
        Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
            new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) throws Exception {
                    return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);
                }
            };

        JavaPairRDD<String, Tuple2<Integer, Integer>> sumCountRDD =
            pairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
        List<Tuple2<String, Tuple2<Integer, Integer>>> collect = sumCountRDD.collect();
        for (Tuple2<String, Tuple2<Integer, Integer>> tp2 : collect) {
            // Cast to double so the average is not truncated by integer division
            System.out.println(tp2._1 + " " + (double) tp2._2._1 / tp2._2._2);
        }
    }
}
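As a quick sanity check of what the Spark jobs compute, the same per-student averages can be reproduced with plain Java streams on the same eight records (class and method names here are my own; watch out for integer vs. floating-point division when comparing output):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AvgCheck {
    // Group the (name, score) pairs by name and average the scores per key.
    static Map<String, Double> averages(List<Map.Entry<String, Integer>> scores) {
        return scores.stream().collect(
                Collectors.groupingBy(Map.Entry::getKey,
                        Collectors.averagingInt(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        // Same data as the Spark examples above
        List<Map.Entry<String, Integer>> scores = List.of(
                Map.entry("xiaobai", 98), Map.entry("xiaobai", 88),
                Map.entry("xiaohei", 99), Map.entry("xiaohei", 67),
                Map.entry("xiaolan", 58), Map.entry("xiaolan", 87),
                Map.entry("xiaozi", 78), Map.entry("xiaozi", 56));

        averages(scores).forEach((k, v) -> System.out.println(k + " " + v));
        // e.g. xiaolan's average is (58 + 87) / 2.0 = 72.5
    }
}
```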