大数据—— Spark Core 知识点整理
1. Spark 和 Hadoop 相比有什么优势运行速度快:Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。适用场景广泛:大数据分析统计,实时数据处理,图计算及机器学习易用性:编写简单,支持80种以上的高级算子,支持多种语言,数据源丰富,可部署在
1. Spark 和 Hadoop 相比有什么优势
运行速度快:
Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。
适用场景广泛:
大数据分析统计,实时数据处理,图计算及机器学习
易用性:
编写简单,支持80种以上的高级算子,支持多种语言,数据源丰富,可部署在多种集群中
容错性高:
Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即充许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错,而CheckPoint有两种方式:CheckPoint Data,和Logging The Updates,用户可以控制采用哪种方式来实现容错。
2. Spark的适用场景
目前大数据处理场景有以下几个类型:
- 复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;
- 基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间
- 基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间
3. Spark 中有几种部署模式
本地模式(方便本地调试):
Spark 程序运行于本地,通过local[]指定线程的数量,本地模式分为三类:
(1)local:只启动一个 executor
(2)local[k]:启动 k 个 executor
(3)local[*]:启动跟 cpu 数量相同的 executor
StandAlone 模式:
分布式部署集群,自带完整的服务,资源管理和任务监控是 Spark 自己监控,也是其他模式的基础
Spark on yarn 模式:
分布式部署集群,资源和任务监控交给 yarn 管理,Spark 客户端直接连接 Yarn,不需要额外构建 Spark 集群。有 yarn-client 和 yarn-cluster 两种模式,主要区别在于:Driver 程序的运行节点。
(1)cluster 适合生产,Driver 运行在集群子节点,具有容错功能
(2)client 适合调试,Driver 运行于客户端
4. Spark 的架构及架构中的基本组件
Spark的架构
采用了分布式计算中的Master-Slave模型,Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。Master作为整个集群的控制器,负责整个集群的正常运行;Worker相当于是计算节点,接收主节点命令与进行状态汇报;Executor负责任务的执行;Client作为用户的客户端负责提交应用,Driver负责控制一个应用的执行,组成图如下:
Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver和Worker是两个重要角色。Driver程序是应用逻辑执行的起点,负责作业的调度,即Task任务的分发,而多个Worker用来管理计算节点和创建Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的file和jar序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进行处理。
Spark的架构中的基本组件:
- Cluster Manager:在standalone模式中即为Master主节点,控制整个集群,监控worker。在YARN模式中为资源管理器
- Worker:从节点,负责控制计算节点,启动Executor或者Driver。在YARN模式中为NodeManager,负责计算节点的控制。
- Driver:运行Application的main()函数并创建SparkContext。
- Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executor。
- SparkContext:整个应用的上下文,控制应用的生命周期。
- RDD:Spark的基础计算单元,一组RDD可形成执行的有向无环图RDD Graph。
- DAG Scheduler:根据作业(task)构建基于Stage的DAG,并提交Stage给TaskScheduler。
- TaskScheduler:将任务(task)分发给Executor执行。
- SparkEnv:线程级别的上下文, 存储运行时的重要组件的引用。
5. Spark 的运行流程
- 构建Spark Application的运行环境,启动SparkContext
- SparkContext向资源管理器(可以是Standalone,Mesos,Yarn)申请运行Executor资源,并启动StandaloneExecutorbackend
- Executor向SparkContext申请Task
- SparkContext将应用程序分发给Executor
- SparkContext构建成DAG图,将DAG图分解成Stage、将Taskset发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行
- Task在Executor上运行,运行完释放所有资源
6. Spark 中RDD的概述和特性
概念:
RDD(Resilient Distributed Dataset),弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合
RDD 五大特性:
- A list of partitions 一个分区列表,RDD 中的数据都存在一个分区列表里面
- A function for computing each split 作用在每一个分区中的函数
- A list of dependencies on other RDDs 一个 RDD 依赖与其他多个 RDD,这个点很重要,RDD 的容错机制就是依据这个特性而来的
- Optionally, a Partitioner for key-value RDDs (e.g to say that the RDD is hash-partitioned) 可选的,针对kv类型的RDD才具有的特性,作用四决定了数据的来源以及数据处理后的去向
- Optionally, a list of preferred locations to compute each split on (e.g block locations for an HDFS file) 可选项,数据本地性,数据位置最优
7. Spark 中 RDD 的运行流程
- 创建RDD对象
- DAGScheduler模块介入运算,计算RDD之间的依赖关系,RDD之间的依赖关系就形成了DAG
- 每一个Job被分为多个Stage。划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销
8. Driver 的功能
- 一个 Spark 作业运行时包括一个 Driver 进程,也是作业的主进程,具有 main 函数,并且有 SparkContext 的实例,是程序的入口点
- 负责向集群申请资源,向 master 注册信息,负责了作业的调度,负责作业的解析、解析 Stage 并调度 Task 到 Executor 上。包括 DAGScheduler,TaskScheduler
9. Spark 中如何区分宽、窄依赖
宽依赖:
指子 RDD 的分区依赖于RDD 的所有分区,这是因为 shuffle 类操作
窄依赖:
指父 RDD 的每一个分区最多被一个子 RDD 的分区所用,表现为一个父 RDD 的分区对应一个子 RDD 分区,和多个父 RDD 对应一个子 RDD 分区,map/filter 和 union 属于第一类,对输入进行协同划分(co-partitioned)的 join 属于第二类
10. spark 如何防止内存溢出
driver端的内存溢出
可以增大driver的内存参数:spark.driver.memory (default 1g)这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
map过程产生大量对象导致内存溢出
这种溢出的原因是在单个 map 中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在 rdd 中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个 Task 的大小,以便达到每个 Task 即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用 repartition 方法,分区成更小的块传入 map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有 shuffle 的过程。
数据不平衡导致内存溢出
数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用 repartition 重新分区。
shuffle后内存溢出
shuffle 内存溢出的情况可以说都是 shuffle 后,单个文件过大导致的。在 Spark 中,join,reduceByKey 这一类型的过程,都会有 shuffle 的过程,在 shuffle 的使用,需要传入一个partitioner,大部分 Spark 中的 shuffle 操作,默认的 partitioner 都是 HashPatitioner,默认值是父RDD 中最大的分区数,这个参数通过 spark.default.parallelism 控制(在 spark-sql 中用spark.sql.shuffle.partitions) , spark.default.parallelism 参数只对 HashPartitioner 有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle 的并发量了。如果是别的 partitioner 导致的 shuffle 内存溢出,就需要从 partitioner 的代码增加 partitions 的数量。
standalone 模式下资源分配不均匀导致内存溢出
在 standalone 的模式下如果配置了 --total-executor-cores 和 --executor-memory 这两个参数,但是没有配置 --executor-cores 这个参数的话,就有可能导致,每个 Executor 的 memory 是一样的,但是 cores 的数量不同,那么在 cores 数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置 --executor-cores 或者spark.executor.cores 参数,确保 Executor 资源分配均匀。
使用 rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 代替 rdd.cache()
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY) 是等价的,在内存不足的时候rdd.cache() 的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) 在内存不足的时候会存储在磁盘,避免重算,只是消耗点 IO 时间。
11. Spark 中groupByKey、reduceByKey 和 combineKey 的区别
groupByKey:
对每个 key 对应的多个 value 进行操作,但只能汇总成一个 sequence,本身不能自定义函数,只能额外通过map/mapValues来实现
reduceByKey:
用于对每个 key 对应的多个 value 进行 merge(合并)操作,最重要的是现在分区内进行 merge 操作,并且 merge操作可以自定义
combineKey:
reduceByKey 底层使用的就是 combineKey
总结:优先使用 reduceByKey,因为 reduceByKey 会在 shuffle 之前对数据合并,大大减少了 IO 消耗
12. foreach 和 map 的区别
相同点:
都是用于遍历集合对象,并对每一项执行指定的方法
不同点:
- foreach 无返回值,map 返回集合对象。foreach 用于遍历集合,map 用于集合的转换
- foreach 是 action 算子,map 是 transformation 算子
- foreach 中的处理逻辑是串行的,map 中的处理逻辑是并行的,根本原因是由于foreach 是 action 算子,而 map 是 transformation 算子
13. map 与 mapPartitions 的区别
区别:
- map 是对 rdd 中的每一个元素进行操作
- mapPartitions 则是对 rdd 中的每一个分区的迭代器进行操作
RDD 中每个分区数据量不大的情形:
- map 操作性能低下,如果一个 partition 中有1万条数据,那么在分析每个分区时,function 要执行和计算1万次
- mapPartition 性能较高,使用 mapPartition 操作之后,一个 task 仅仅会执行一次 function,function 一次接收所有的 partition 数据。只要执行一次就可以了,性能比较高
RDD 中的每个分区数据量超大的情形,比如一个 Partition 有 100万条数据:
- map 能正常执行完
- mapPartition 一次传入一个 function 后,可能一下子内存不够用,造成内存溢出(OOM)
14. foreach 和 foreachPartition 的区别
相同:
foreach 和 foreachPartition 都属于 action 算子
不同点:
- foreach 每次处理 RDD 中的一条数据
- foreachPartition 每次处理 RDD 中每个分区迭代器中的数据
15. Spark 中 repartition、coalesce、partitionBy 这三种分区有什么区别,哪种效率更高,什么情况下使用重分区
区别:
repartition 的底层调用了 coalesce,并且默认发生 shuffle,如果分区数由多变少,建议使用 coalesce,不走 shuffle
针对键值类型的 RDD 可以使用 partitionBy,将相同的key分配到一个分区内,后面再执行类型相同 key 聚合操作的时候就可以避免发生 shuffle
效率比较:
如果后面有对 可以 进行聚合的操作,建议使用partitionBy,可以避免 shuffle,效率会更高(重点在避免 shuffle)
使用场景:
一般就是在数据初始加载,或者进行 filter 之后数据分配不均匀,需要对其进行重分区操作
16. Spark 中血统(Lineage)的概念
什么是血统
利用内存加快数据加载,在其它的In-Memory类数据库或Cache类系统中也有实现。Spark的主要区别在于它采用血统(Lineage)来时实现分布式运算环境下的数据容错性(节点失效、数据丢失)问题。RDD Lineage被称为RDD运算图或RDD依赖关系图,是RDD所有父RDD的图。它是在RDD上执行transformations函数并创建逻辑执行计划(logical execution plan)的结果,是RDD的逻辑执行计划。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
依赖的类型
依赖关系决定Lineage的复杂程度,同时也是的RDD具有了容错性。因为当某一个分区里的数据丢失了,Spark程序会根据依赖关系进行局部计算来恢复丢失的数据。依赖的关系主要分为2种,分别是 宽依赖(Wide Dependencies)和窄依赖(Narrow Dependencies)。
容错原理
在Spark的容错机制中,当一个节点宕机了,进行容错恢复时,对于窄依赖来讲,进行重计算时只要把丢失的父RDD分区重算即可,不依赖于其他节点。而对于Shuffle Dependency来说,进行重计算时需要父RDD的分区都存在,这样计算量就太大了比较耗费性能。
同时在RDD计算,也通过checkpoint进行容错,做checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错,默认是logging the updates方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个RDD的lineage(血统)来重新计算生成丢失的分区数据。但是,在使用checkpoint算子来做检查点,不仅需要考虑Lineage长度,还也要考虑Lineage的复杂度(是否有宽依赖),对于Shuffle Dependency加Checkpoint是一个值得提倡的做法。
17. Spark 中的持久化机制
cache() 和 persist()
针对一个 RDD 反复执行多个操作的场景,可以进行 cache() 或 persist() 进行持久化,在该 RDD 第一次被计算出来时,就会直接缓存在每个节点中。而且 Spark 的持久化机制还是自动容错的,如果持久化的 RDD 的任何时候 partition 丢失了,那么 Spark 会自动通过其源 RDD,使用 transformation 操作重新计算该 partition
cache() 和 persist() 区别在于:cache() 是 persist() 的一种简化方式,cache() 底层调用的就是 persist() 的无参版本,同时就是调用 persist(MEMORY_ONLY),将数据持久化到内存中,如果需要从内存中去除缓存,那么可以使用 unpersist() 方法
checkpoint
持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等
Checkpoint 的产生就是为了更加可靠的数据持久化,在 Checkpoint 的时候一般把数据放在在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用
18. 持久化和 Checkpoint 的区别:
- 位置:Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存--实验中) Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
- 生命周期:Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。
19. DAG 的生成和划分 Stage
DAG 介绍
DAG 是什么
DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);
原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
DAG 的边界
开始:通过 SparkContext 创建的 RDD;
结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
DAG 划分 Stage
原因:为了并行计算
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
如何划分 Stage
Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中
19. RDD 累加器和广播变量
在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
(1)累加器 accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
(2)广播变量 broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。
累加器:val xx: Accumulator[Int] = sc.accumulator(0)
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//使用scala集合完成累加
var counter1: Int = 0;
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
println(counter1)//6
println("+++++++++++++++++++++++++")
//使用RDD进行累加
var counter2: Int = 0;
val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]
dataRDD.foreach(x => counter2 += x)
println(counter2)//0
//注意:上面的RDD操作运行结果是0
//因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量
//而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2
//最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系
//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!
//如果解决?---使用累加器
val counter3: Accumulator[Int] = sc.accumulator(0)
dataRDD.foreach(x => counter3 += x)
println(counter3)//6
}
}
广播变量:sc.broadcast()
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastVariablesTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//不使用广播变量
val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))
val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap
//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)
val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))
//根据水果编号取水果名称
val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))
fruitNames.foreach(println)
//注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,
//那么会导致,被各个Task共用到的fruitMap会被多次传输
//应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可
//如何做到?---使用广播变量
//注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redis
println("=====================")
val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)
val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))
fruitNames2.foreach(println)
}
}
20. Spark 中 Yarn-Cluster 方式提交任务的整个流程
- 客户端提交 Application 应用程序,发送请求给 ResourceManager,请求启动 ApplicationMaster
- ResourceManager 收到请求后,随机在一台 NodeManager 上启动 ApplicationMaster(相当于 Driver 端)
- ApplicationMaster 启动,ApplicationMaster 发送请求到 ResourceManager,请求一批 Container 用于启动 Executor
- ResourceManager 返回一批 NodeManager 给 ApplicationMaster
- ApplicationMaster 连接到 NodeManager,发送请求到 NodeManager 启动 Executor
- Executor 反向注册到 ApplicationMaster 所在的节点的 Driver,Driver 发送 task 到 Executor
21. Spark join的优化经验
- 参数优化
- RDD优化
- 算子优化
- shuffle优化
- 数据倾斜优化
具体可参考先前文章《大数据—— Spark 优化》
22.Spark的shuffle
Spark2.1.1 版本之后只有 SortShuffle,之前版本还有 HashShuffle
- 会对数据进行排序
- 在写入缓存之前,如果是 reduceByKey 之类的算子,则会先写入到一个 Map 内存数据结构中,而如果是 join 之类的算子,则先写入到 Array 内存数据结构中。在每条数据写入前先判断是否到达一定阈值,到达则写入缓冲区
- 复用一个 core 的 Task 会写到同一个文件里,并生成一个索引文件。其中记录了下一个 ShuffleMapStage 中每一个 task 所要拉取数据的 start offset 和 end offset
23. Spark 中数据倾斜解决方案
(1)聚合源数据
在生成源数据时按照 key 进行分组聚合,就不需要在 Spark 中使用 reduceByKey 和 groupByKey
(2)过滤导致倾斜的 key
如果业务允许或与客户沟通同意后,可以把这些导致倾斜的 key进行过滤
(3)提高 shuffle 操作 reduce 的并行度
增加 task 的并行度,如果运行时间仍然很长,则需要考虑其他方案
(4)双重聚合
将 key 通过增加随机数前缀的方式进行打散,这样重复多的 key 会被分入多个组,然后进行局部聚合(第一次聚合),接着移除 前缀,然后进行全局聚合
(5)将 reduce join 转换成 map join(小表 和 大表 join 时出现数据倾斜,优先考虑广播)
如果两个 RDD 进行 join,其中有一个 RDD 比较小的话,可以通过将小的 RDD broadcast 广播出去,这样每个节点的 blockManager 中都有一份,这样就不存在发生 shuffle,也就不会有数据倾斜的问题了
(6)sample 抽样分解聚合
将导致数据倾斜的 key 可以单拉出来,然后用一个 RDD 进行打乱 join
(7)使用随机数和扩容表进行 join
通过 flatMap 进行扩容,然后将随机数打入进去,再进行 join,这样的话就不能根本的解决数据倾斜,但可以有效的缓解数据倾斜的问题,也能提高性能
24. Spark 读取数据生成 RDD 分区默认多少
默认分区数取 CPU 核数和 2 的最小值
往期面试题整理:
更多推荐
所有评论(0)