map
map接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD。

文件内容:

hello world
hello scala
hello spark
java good
python
scala

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;


import java.util.Arrays;
import java.util.List;
public class demoJava1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //todo map1
        List<String> strings = Arrays.asList("hello world", "hadoop spark", "scala java");
        JavaRDD<String> mapRdd = sc.parallelize(strings);
        //Function<String, String> 分别为输入类型和输出类型
        JavaRDD<String> result1 = mapRdd.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                String[] s1 = s.split(" ");
                String key = s1[0];
                String value = s1[1];
                return key + ":" + value;
            }
        });
        List<String> collect = result1.collect();
        for (String s : collect) {
            System.out.println(s);
        }
        //todo map2  \s 表示匹配所有空白符,包括换行。和\S不同,它表示匹配非空白符
        JavaRDD<String> lines = sc.textFile("data/word.txt");
        //  查看Iterable源码:Iterator<T> iterator();
        JavaRDD<Iterable<String>> mapRdd2 = lines.map(new Function<String, Iterable<String>>() {
            @Override
            public Iterable call(String s) throws Exception {
                String[] s2 = s.split("\\s+");
                return Arrays.asList(s2);
            }
        });
        List<Iterable> collect1 = mapRdd2.collect();
        for (Iterable it : collect1) {
            Iterator iterator = it.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
        }
        //当设置Iterable<String>时
        List<Iterable<String>> collect2 = mapRdd2.collect();
        for (Iterable<String> str2 : collect2) {
            System.out.println(str2);
        }
    }
}

flatMap
有时候,我们希望对某个元素生成多个元素,实现该功能的操作叫作 flatMap()
faltMap的函数应用于每一个元素,对于每一个元素返回的是多个元素组成的迭代器.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DemoJava2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo2");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("data/word.txt");
        JavaRDD<String> flatMapRdd = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] splits = s.split("\\s+");
                return Arrays.asList(splits).iterator();
            }
        });
        List<String> collect = flatMapRdd.collect();
        for (String s : collect) {
            System.out.println(s);
        }
    }
}

distinct
用于去重,不过此方法涉及到混洗,操作开销很大。
union
两个RDD合并
intersection
返回两个RDD的交集,并且去重。它同样也需要混洗数据,比较浪费性能。
subtract
RDD1.subtract(RDD2),返回在RDD1中出现,但是不在RDD2中出现的元素,不去重。即RDD1中有而RDD2中没有的元素。
cartesian
返回两个RDD的笛卡尔积,开销非常大。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class DemoJava3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo3");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> strings = Arrays.asList("a", "b", "a", "c", "b");
        //todo distinct
        JavaRDD<String> disRdd = sc.parallelize(strings);
        JavaRDD<String> distinct = disRdd.distinct();
        List<String> collect = distinct.collect();
        for (String s : collect) {
            System.out.println(s);
        }
        //todo union
        List<String> strings1 = Arrays.asList("a","b","e", "f");
        JavaRDD<String> rdd = sc.parallelize(strings1);
        JavaRDD<String> unionRdd = disRdd.union(rdd);
        List<String> collect1 = unionRdd.collect();
        for (String s2 : collect1) {
            System.out.println(s2);
        }

        //todo intersection
        JavaRDD<String> intersectionRdd = disRdd.intersection(rdd);
        List<String> collect2 = intersectionRdd.collect();
        for (String s3 : collect2) {
            System.out.println(s3);
        }

        //todo subtract

        JavaRDD<String> subtractRdd = disRdd.subtract(rdd);
        List<String> collect3 = subtractRdd.collect();
        for (String s4 : collect3) {
            System.out.println(s4);
        }
        //todo cartesian
        JavaPairRDD<String, String> cartesianRdd = disRdd.cartesian(rdd);
        List<Tuple2<String, String>> collect4 = cartesianRdd.collect();
        for (Tuple2<String, String> tuple2 : collect4) {
            System.out.println(tuple2);
            //System.out.println(tuple2._1);
        }
    }
}

mapToPair
flatMapToPair
类似于xxx连接 mapToPair是一对一,一个元素返回一个元素,而flatMapToPair可以一个元素返回多个,相当于先flatMap,在mapToPair.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DemoJava4 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo4");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("data/word.txt");
        JavaPairRDD<String, Integer> mapToPairRdd = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s.split("\\s+")[0], 1);
            }
        });
        List<Tuple2<String, Integer>> collect = mapToPairRdd.collect();
//        for (Tuple2<String, Integer> tuple2 : collect) {
//            System.out.println(tuple2);
//        }
        //todo flatMapToPair
        JavaPairRDD<String, Integer> flatMapToPairRdd = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
                String[] splits = s.split(" ");
                for (int i = 0; i < splits.length; i++) {
                    String key = splits[i];
                    Tuple2<String, Integer> tup2 = new Tuple2<>(key, 1);
                    list.add(tup2);
                }
                return list.iterator();
            }
        });
        List<Tuple2<String, Integer>> collect1 = flatMapToPairRdd.collect();
        for (Tuple2<String, Integer> tup2 : collect1) {
            System.out.println("key:"+tup2._1+" value:"+tup2._2);
        }

    }
}

combineByKey
聚合操作,是各种聚集操作的鼻祖。

  • createCombiner: combineByKey()
    会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作createCombiner() 的函数来创建
    那个键对应的累加器的初始值
  • mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue()
    方法将该键的累加器对应的当前值与这个新的值进行合并
  • mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更
    多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各 个分区的结果进行合并。

用Spark进行的操作:

import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object Combine {
case class ScoreInfo(name:String,course:String,score:Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("combineByKey").getOrCreate()
    val sc = spark.sparkContext

    val scores = sc.makeRDD(List(
      ScoreInfo("sam", "Math", 88),
      ScoreInfo("sam", "English", 90),
      ScoreInfo("sally", "Math", 85),
      ScoreInfo("sally", "English", 95),
      ScoreInfo("john", "Math", 78),
      ScoreInfo("john", "English", 60),
      ScoreInfo("mary", "Math", 93),
      ScoreInfo("mary", "English", 81),
      ScoreInfo("zs", "Math", 76),
      ScoreInfo("zs", "English", 81),
      ScoreInfo("ls", "Math", 94),
      ScoreInfo("ls", "English", 90)

    ))
    //将集合转换成二元组
    val scoreWithKey = for {i <- scores} yield (i.name, i)
    //分区数
    val result = scoreWithKey.partitionBy(new HashPartitioner(3)).cache()
    //result.collect.foreach(println) //name为key,ScoreInfo()为value
    //result.foreachPartition(x=>println(x.length)) //输出的各分区处理的信息数量
    //遍历每个分区中,各二元组中的数据。
    result.foreachPartition(partContext=>
      //partContext.foreach(x=>(println(x._1,x._2)))
      partContext.foreach(x=>(println(x._1,x._2.course,x._2.score)))
    )
    //combineByKey聚合
   val scoreInfo = scoreWithKey.combineByKey(
      (x: ScoreInfo) => (x.score, 1),
      (acc1: (Int, Int), x: ScoreInfo) => (acc1._1 + x.score, acc1._2 + 1),
      (acc2: (Int, Int), acc3: (Int, Int)) => (acc2._1 + acc3._1, acc2._2 + acc3._2)
    )
    //求平均值
    val stuAvg1 = scoreInfo.map({case(key,value)=>(key,value._1/value._2)})
    val stuAvg2 = scoreInfo.map(x=>(x._1,x._2._1/x._2._2))
    stuAvg2.foreach(println)
  }
}

java版本
先新建一个类

import java.io.Serializable;
public class ScoreInfo implements Serializable {
    public String name;
    public String course;
    public Integer score;

    public ScoreInfo(String name, String course, Integer score) {
        this.name = name;
        this.course = course;
        this.score = score;
    }
}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class CombineJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("combineByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<ScoreInfo> scores = new ArrayList<>();
        scores.add(new ScoreInfo("zs", "Math", 99));
        scores.add(new ScoreInfo("zs", "English", 97));
        scores.add(new ScoreInfo("li", "Math", 85));
        scores.add(new ScoreInfo("li", "English", 80));
        scores.add(new ScoreInfo("ww", "Math", 90));
        scores.add(new ScoreInfo("ww", "English", 70));
        scores.add(new ScoreInfo("ml", "Math", 83));
        scores.add(new ScoreInfo("ml", "English", 79));

        JavaRDD<ScoreInfo> stuRdd = sc.parallelize(scores);
        JavaPairRDD<String, ScoreInfo> stuRddInfo = stuRdd.mapToPair(new PairFunction<ScoreInfo, String, ScoreInfo>() {
            @Override
            public Tuple2<String, ScoreInfo> call(ScoreInfo s) throws Exception {
                return new Tuple2<>(s.name, s);
            }
        });
//        List<Tuple2<String, ScoreInfo>> collect = stuRddInfo.collect();
//        for (Tuple2<String, ScoreInfo> tup2 : collect) {
//            System.out.println(tup2._1+":"+tup2._2);
//        }
        //todo createCombine
       //function见下方源码
        Function<ScoreInfo, Tuple2<Integer, Integer>> createCombine = new Function<ScoreInfo, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(ScoreInfo s) throws Exception {
                return new Tuple2<>(s.score, 1);
            }
        };
        //todo mergeValue
		//function2见下方源码
        Function2<Tuple2<Integer, Integer>, ScoreInfo, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, ScoreInfo, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t, ScoreInfo s) throws Exception {
                return new Tuple2<>(t._1 + s.score, t._2 + 1);
            }
        };
        //todo mergeCombine
        Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombine = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
                return new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2);
            }
        };
        JavaPairRDD<String, Tuple2<Integer, Integer>> CombineByKey = stuRddInfo.combineByKey(createCombine, mergeValue, mergeCombine);
JavaPairRDD<String, Integer> avg = CombineByKey.mapValues(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> t) throws Exception {
                return t._1 / t._2;
            }
        });
        for (Tuple2<String, Integer> tuple2 : avg.collect()) {
            System.out.println(tuple2);
        }
        List<Tuple2<String, Tuple2<Integer, Integer>>> collect = CombineByKey.collect();
        for (Tuple2<String, Tuple2<Integer, Integer>> tup2 : collect) {
            System.out.println(tup2);
        }
    }
}

Function的源码:
输入为T1类型,输出为R类型

public interface Function <T1, R> extends java.io.Serializable {
    R call(T1 t1) throws java.lang.Exception;
}

Function2源码:
输入类型为T1,T2,有两个输入,输出类型为R

public interface Function2 <T1, T2, R> extends java.io.Serializable {
    R call(T1 t1, T2 t2) throws java.lang.Exception;
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐