Spark Operators in Java (map, flatMap, distinct, subtract, combineByKey, etc.)
map
map applies a function to every element of the RDD and uses the function's return values to build the result RDD.
Contents of the input file (data/word.txt):
hello world
hello scala
hello spark
java good
python
scala
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class demoJava1 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //todo map1
        List<String> strings = Arrays.asList("hello world", "hadoop spark", "scala java");
        JavaRDD<String> mapRdd = sc.parallelize(strings);
        // Function<String, String>: the input type and the output type, respectively
        JavaRDD<String> result1 = mapRdd.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                String[] s1 = s.split(" ");
                String key = s1[0];
                String value = s1[1];
                return key + ":" + value;
            }
        });
        List<String> collect = result1.collect();
        for (String s : collect) {
            System.out.println(s);
        }

        //todo map2 -- \s matches any whitespace character, including newlines; \S is the opposite and matches non-whitespace
        JavaRDD<String> lines = sc.textFile("data/word.txt");
        // From the Iterable source: Iterator<T> iterator();
        JavaRDD<Iterable<String>> mapRdd2 = lines.map(new Function<String, Iterable<String>>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                String[] s2 = s.split("\\s+");
                return Arrays.asList(s2);
            }
        });
        List<Iterable<String>> collect1 = mapRdd2.collect();
        for (Iterable<String> it : collect1) {
            Iterator<String> iterator = it.iterator();
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
        }
        // Alternatively, print each Iterable<String> directly:
        List<Iterable<String>> collect2 = mapRdd2.collect();
        for (Iterable<String> str2 : collect2) {
            System.out.println(str2);
        }
    }
}
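Since Function is a single-method interface, the same transformations can also be written with Java 8 lambdas. A minimal sketch, assuming the sc and strings from the example above:

        // Lambda form of the two map calls (reuses sc and strings from demoJava1)
        JavaRDD<String> result1Lambda = sc.parallelize(strings)
                .map(s -> s.split(" ")[0] + ":" + s.split(" ")[1]);
        JavaRDD<Iterable<String>> mapRdd2Lambda = sc.textFile("data/word.txt")
                .map(s -> Arrays.asList(s.split("\\s+")));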
flatMap
Sometimes we want a single input element to produce multiple output elements; the operation that does this is flatMap().
flatMap applies its function to every element and, for each element, returns an iterator over the resulting output elements.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DemoJava2 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo2");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("data/word.txt");
        JavaRDD<String> flatMapRdd = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                String[] splits = s.split("\\s+");
                return Arrays.asList(splits).iterator();
            }
        });
        List<String> collect = flatMapRdd.collect();
        for (String s : collect) {
            System.out.println(s);
        }
    }
}
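The same flatMap collapses to a one-line lambda. A minimal sketch, assuming the sc from DemoJava2; for the sample file, collect() returns the words in file order, so it prints hello, world, hello, scala, and so on, one word per line:

        // Lambda form of the flatMap above (reuses sc from DemoJava2)
        JavaRDD<String> words = sc.textFile("data/word.txt")
                .flatMap(s -> Arrays.asList(s.split("\\s+")).iterator());
        words.collect().forEach(System.out::println);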
distinct
Removes duplicate elements. Note that it shuffles the data, so it is an expensive operation.
union
Merges two RDDs into one; duplicates are kept.
intersection
Returns the intersection of two RDDs, with duplicates removed. It likewise shuffles the data, which is costly.
subtract
RDD1.subtract(RDD2) returns the elements that appear in RDD1 but not in RDD2, without deduplication; that is, the elements RDD1 has and RDD2 lacks.
cartesian
Returns the Cartesian product of two RDDs. The cost is very high.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;

public class DemoJava3 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo3");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<String> strings = Arrays.asList("a", "b", "a", "c", "b");

        //todo distinct
        JavaRDD<String> disRdd = sc.parallelize(strings);
        JavaRDD<String> distinct = disRdd.distinct();
        List<String> collect = distinct.collect();
        for (String s : collect) {
            System.out.println(s);
        }

        //todo union
        List<String> strings1 = Arrays.asList("a", "b", "e", "f");
        JavaRDD<String> rdd = sc.parallelize(strings1);
        JavaRDD<String> unionRdd = disRdd.union(rdd);
        List<String> collect1 = unionRdd.collect();
        for (String s2 : collect1) {
            System.out.println(s2);
        }

        //todo intersection
        JavaRDD<String> intersectionRdd = disRdd.intersection(rdd);
        List<String> collect2 = intersectionRdd.collect();
        for (String s3 : collect2) {
            System.out.println(s3);
        }

        //todo subtract
        JavaRDD<String> subtractRdd = disRdd.subtract(rdd);
        List<String> collect3 = subtractRdd.collect();
        for (String s4 : collect3) {
            System.out.println(s4);
        }

        //todo cartesian
        JavaPairRDD<String, String> cartesianRdd = disRdd.cartesian(rdd);
        List<Tuple2<String, String>> collect4 = cartesianRdd.collect();
        for (Tuple2<String, String> tuple2 : collect4) {
            System.out.println(tuple2);
            //System.out.println(tuple2._1);
        }
    }
}
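For these inputs (disRdd holds a, b, a, c, b and rdd holds a, b, e, f), the expected results are: distinct yields a, b, c in no guaranteed order; union yields all nine elements with duplicates kept; intersection yields a, b; subtract yields just c, the only element of disRdd absent from rdd; and cartesian yields 5 x 4 = 20 pairs.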
mapToPair
flatMapToPair
The relationship is analogous to map versus flatMap: mapToPair is one-to-one, each element returning exactly one pair, while flatMapToPair can return multiple pairs per element, equivalent to a flatMap followed by a mapToPair.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class DemoJava4 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("demo4");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("data/word.txt");
        JavaPairRDD<String, Integer> mapToPairRdd = lines.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s.split("\\s+")[0], 1);
            }
        });
        List<Tuple2<String, Integer>> collect = mapToPairRdd.collect();
        // for (Tuple2<String, Integer> tuple2 : collect) {
        //     System.out.println(tuple2);
        // }

        //todo flatMapToPair
        JavaPairRDD<String, Integer> flatMapToPairRdd = lines.flatMapToPair(new PairFlatMapFunction<String, String, Integer>() {
            @Override
            public Iterator<Tuple2<String, Integer>> call(String s) throws Exception {
                ArrayList<Tuple2<String, Integer>> list = new ArrayList<>();
                String[] splits = s.split(" ");
                for (int i = 0; i < splits.length; i++) {
                    String key = splits[i];
                    Tuple2<String, Integer> tup2 = new Tuple2<>(key, 1);
                    list.add(tup2);
                }
                return list.iterator();
            }
        });
        List<Tuple2<String, Integer>> collect1 = flatMapToPairRdd.collect();
        for (Tuple2<String, Integer> tup2 : collect1) {
            System.out.println("key:" + tup2._1 + " value:" + tup2._2);
        }
    }
}
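The (word, 1) pairs produced by flatMapToPair are one reduceByKey away from a full word count. A minimal sketch of that follow-up step, reusing flatMapToPairRdd from the example above:

        // Sum the 1s per word to finish the word count
        JavaPairRDD<String, Integer> wordCounts = flatMapToPairRdd.reduceByKey((a, b) -> a + b);
        for (Tuple2<String, Integer> t : wordCounts.collect()) {
            System.out.println(t._1 + ": " + t._2);
        }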
combineByKey
An aggregation operator, and the one the other per-key aggregations are built on. It takes three functions:
- createCombiner: combineByKey() walks through every element in a partition, so each element's key has either not been seen yet or matches the key of an earlier element. For a new key, combineByKey() calls a function named createCombiner() to create the initial value of that key's accumulator.
- mergeValue: if the key has already been seen while processing the current partition, mergeValue() merges the key's current accumulator with the new value.
- mergeCombiners: since each partition is processed independently, a single key can end up with several accumulators. If two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() merges the per-partition results, as the worked trace below shows.
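Concretely, trace the key "sam" from the example below (Math 88, English 90) with a (sum, count) accumulator. If both records land in the same partition: createCombiner(88) produces (88, 1), then mergeValue((88, 1), 90) produces (178, 2). If they land in different partitions, each partition builds its own accumulator, (88, 1) and (90, 1), and mergeCombiners merges them into (178, 2). Either way the average is 178 / 2 = 89.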
First, the operation in Scala:
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object Combine {
  case class ScoreInfo(name: String, course: String, score: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("combineByKey").getOrCreate()
    val sc = spark.sparkContext
    val scores = sc.makeRDD(List(
      ScoreInfo("sam", "Math", 88),
      ScoreInfo("sam", "English", 90),
      ScoreInfo("sally", "Math", 85),
      ScoreInfo("sally", "English", 95),
      ScoreInfo("john", "Math", 78),
      ScoreInfo("john", "English", 60),
      ScoreInfo("mary", "Math", 93),
      ScoreInfo("mary", "English", 81),
      ScoreInfo("zs", "Math", 76),
      ScoreInfo("zs", "English", 81),
      ScoreInfo("ls", "Math", 94),
      ScoreInfo("ls", "English", 90)
    ))
    // Turn the collection into tuples of (name, ScoreInfo)
    val scoreWithKey = for {i <- scores} yield (i.name, i)
    // Repartition into 3 partitions and cache
    val result = scoreWithKey.partitionBy(new HashPartitioner(3)).cache()
    //result.collect.foreach(println) // name is the key, ScoreInfo(...) the value
    //result.foreachPartition(x => println(x.length)) // prints how many records each partition holds
    // Walk every partition and print the fields of each tuple
    result.foreachPartition(partContext =>
      //partContext.foreach(x => println(x._1, x._2))
      partContext.foreach(x => println(x._1, x._2.course, x._2.score))
    )
    // Aggregate with combineByKey into (sum, count) accumulators
    val scoreInfo = scoreWithKey.combineByKey(
      (x: ScoreInfo) => (x.score, 1),
      (acc1: (Int, Int), x: ScoreInfo) => (acc1._1 + x.score, acc1._2 + 1),
      (acc2: (Int, Int), acc3: (Int, Int)) => (acc2._1 + acc3._1, acc2._2 + acc3._2)
    )
    // Compute the average score per student (two equivalent spellings)
    val stuAvg1 = scoreInfo.map({ case (key, value) => (key, value._1 / value._2) })
    val stuAvg2 = scoreInfo.map(x => (x._1, x._2._1 / x._2._2))
    stuAvg2.foreach(println)
  }
}
Java version
First, create a class for the records; it implements Serializable so that Spark can ship instances across the cluster.
import java.io.Serializable;

public class ScoreInfo implements Serializable {
    public String name;
    public String course;
    public Integer score;

    public ScoreInfo(String name, String course, Integer score) {
        this.name = name;
        this.course = course;
        this.score = score;
    }
}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class CombineJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[1]").setAppName("combineByKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        List<ScoreInfo> scores = new ArrayList<>();
        scores.add(new ScoreInfo("zs", "Math", 99));
        scores.add(new ScoreInfo("zs", "English", 97));
        scores.add(new ScoreInfo("li", "Math", 85));
        scores.add(new ScoreInfo("li", "English", 80));
        scores.add(new ScoreInfo("ww", "Math", 90));
        scores.add(new ScoreInfo("ww", "English", 70));
        scores.add(new ScoreInfo("ml", "Math", 83));
        scores.add(new ScoreInfo("ml", "English", 79));
        JavaRDD<ScoreInfo> stuRdd = sc.parallelize(scores);
        // Key each record by student name
        JavaPairRDD<String, ScoreInfo> stuRddInfo = stuRdd.mapToPair(new PairFunction<ScoreInfo, String, ScoreInfo>() {
            @Override
            public Tuple2<String, ScoreInfo> call(ScoreInfo s) throws Exception {
                return new Tuple2<>(s.name, s);
            }
        });
        // List<Tuple2<String, ScoreInfo>> collect = stuRddInfo.collect();
        // for (Tuple2<String, ScoreInfo> tup2 : collect) {
        //     System.out.println(tup2._1 + ":" + tup2._2);
        // }

        //todo createCombiner (the Function interface's source is shown below)
        Function<ScoreInfo, Tuple2<Integer, Integer>> createCombine = new Function<ScoreInfo, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(ScoreInfo s) throws Exception {
                // Initial accumulator for a new key: (score, count of 1)
                return new Tuple2<>(s.score, 1);
            }
        };

        //todo mergeValue (the Function2 interface's source is shown below)
        Function2<Tuple2<Integer, Integer>, ScoreInfo, Tuple2<Integer, Integer>> mergeValue = new Function2<Tuple2<Integer, Integer>, ScoreInfo, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t, ScoreInfo s) throws Exception {
                // Fold another record with the same key into the accumulator
                return new Tuple2<>(t._1 + s.score, t._2 + 1);
            }
        };

        //todo mergeCombiners
        Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombine = new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) throws Exception {
                // Merge two per-partition accumulators for the same key
                return new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2);
            }
        };

        JavaPairRDD<String, Tuple2<Integer, Integer>> combinedRdd = stuRddInfo.combineByKey(createCombine, mergeValue, mergeCombine);
        // Average = total score / number of courses
        JavaPairRDD<String, Integer> avg = combinedRdd.mapValues(new Function<Tuple2<Integer, Integer>, Integer>() {
            @Override
            public Integer call(Tuple2<Integer, Integer> t) throws Exception {
                return t._1 / t._2;
            }
        });
        for (Tuple2<String, Integer> tuple2 : avg.collect()) {
            System.out.println(tuple2);
        }
        // The raw (sum, count) accumulators per key
        List<Tuple2<String, Tuple2<Integer, Integer>>> collect = combinedRdd.collect();
        for (Tuple2<String, Tuple2<Integer, Integer>> tup2 : collect) {
            System.out.println(tup2);
        }
    }
}
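Because Function and Function2 (sources below) each declare a single abstract method, the three anonymous classes can be collapsed into lambdas. A minimal sketch, reusing stuRddInfo from the example above:

        // Lambda form of the combineByKey call above
        JavaPairRDD<String, Tuple2<Integer, Integer>> combinedLambda = stuRddInfo.combineByKey(
                s -> new Tuple2<>(s.score, 1),                          // createCombiner
                (t, s) -> new Tuple2<>(t._1 + s.score, t._2 + 1),       // mergeValue
                (t1, t2) -> new Tuple2<>(t1._1 + t2._1, t1._2 + t2._2)  // mergeCombiners
        );
        JavaPairRDD<String, Integer> avgLambda = combinedLambda.mapValues(t -> t._1 / t._2);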
Source of the Function interface:
T1 is the input type and R is the output type.
public interface Function<T1, R> extends java.io.Serializable {
    R call(T1 t1) throws java.lang.Exception;
}
Source of the Function2 interface:
There are two inputs, of types T1 and T2, and the output type is R.
public interface Function2<T1, T2, R> extends java.io.Serializable {
    R call(T1 t1, T2 t2) throws java.lang.Exception;
}
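Since both are single-method interfaces, anywhere Spark's Java API asks for one, a lambda will do. A small illustration using Function2 directly with reduce, assuming a JavaSparkContext sc like the ones above and illustrative values:

        // A Function2<Integer, Integer, Integer> written as a lambda and passed to reduce
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        Integer sum = nums.reduce((a, b) -> a + b); // prints 10
        System.out.println(sum);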