返回 登录
1

spark优化

Spark调优

一,分配更多的资源

1,在哪里分配
在提交任务时,在这三个参数上分配(–total-executor-cores –executor-memory –driver-memory)
2,分配那些资源
CPU core 和 memory
3,怎么分配

4,分配之后有什么效果
A, 给executor分配更多的内存,能够减少executor频繁的GC,因为发生频繁的GC,会导致spark性能立马下降
B, 给executor分配更多的内存,那么就会将尽量多的RDD的数据保存在内存中,可以避免磁盘IO(频繁的IO会使spark性能下降)
C, 给executor分配更多的内存,可以减少任务在拉取上一个stage数据时,将数据存入磁盘(即,减少shuffle阶段数据落地)
D, 分配更多的CPU core,意味着同一时间执行任务的数量就会提高(即,任务的并行度提升)

二,调节任务的并行度
我们在给任务分配更多资源的时候,就意味着任务能够具备更多的资源来执行,但是我们需要考虑一点,例如:给任务分配了5个executor,每个executor有10 个core,那么整个任务就会有5*10=50 个核,也就意味着同一时间能够并行执行的任务是50个,当我们的任务只有20个时,那么还有30个core是空闲的,浪费了,即,任务的数量不能满足并行度,针对这种情况,我们应该调节任务的数量,提高任务的并行度

怎么提高任务的并行度?
1,调节shuffle阶段任务的并行度,一般那些shuffle算子,有两个参数,后面一个参数就是用来调节任务的并行度的,例如:reduceByKey(+,500)
2,调节任务的分区数,可以使用这个算子,coalesce(500,true)
3,可以通过这个参数来调节任务的并行度 spark.default.parallelism ==> sparkConf.set(“spark.default.parallelism”,5)

三,对公用的RDD进行持久化
持久化场景:对于一个RDD被多次引用到,并且这个RDD计算过程复杂,计算时间特别耗 时,那么就可以对这个RDD进行持久化

如何进行持久化:调用RDD.persist(), 或者RDD.cache()
【注意】cache方法的底层就是调用persist方法

如果对RDD做持久化,默认持久化级别是StorageLevel.MEMEORY_ONLY ,也就是持久化到内存中去,这种持久化级别效率是最快的,但是由于是纯Java对象保存到内存中,那么内存可能保存的数据就会较少
如果当我们集群资源有限时,那么我们可以采用MEMORY_ONLY_SER,也就是将Java对象进行序列化之后再持久化到内存中去,这种持久化的好处是能够持久化更多的数据到内存中,但是持久化时需要序列化,取出来又需要反序列化这一过程,性能相对于MEMORY_ONLY这种持久化要稍微弱点,但是还是比较高效的

如何选择RDD持久化策略
Spark提供的多种持久化级别,主要是在CPU和内存之间进行取舍,下面是一些通用的持久化级别的选择建议:
1、有限使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略,因为春内村速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作
2、如果MEMORY_ONLY策略,无法存储所有的数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快的,只是要消耗CPU进行反序列化
3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了
4、能不适用DISK相关的策略,就不要使用,有的时候,从磁盘读取数据还不如重新计算一次

四,广播大变量
使用背景:当RDD 引用到了一个外部变量,并且这个外部变量数据量还不小,同时这个RDD对应的task数量特别多,那么此时使用广播变量就在合适不过了

使用原理:因为每个task都要拷贝一个副本到executor去执行,那么我们可以想象一下,如果有1000个task在某个worker上执行,而这个副本有100M,那么就意味着我们要拷贝100G的数据到某个worker上去执行,这样的话会大大消耗我们的网络流量,同时会加大executor的内存消耗,从而增加我们spark作业的运行时间,大大降低了spark作业的运行效率,增加了作业失败的概率,如何解决以上的问题?也就是什么时候该使用广播变量,我们可以将这种大的外部变量做成广播变量,那么每个executor的内存中只会有一个外部变量,这样的一个副本,针对所有的task都是共享的,这样的话就减少了网络流量的消耗,降低了executor的内存消耗,提高了spark 作业运行效率和 缩短了运行时间,同时降低了作业失败的概率

使用过程:
1、某个executor的第一个task先执行的时候,首先从自己的blockManager中查找外部变量,如果没有就从邻居的executor的blockManager的内存中获取这个外部变量,如果还是获取不到,就从driver端获取,拷贝这个外部变量到本地的executor的blockManager中
2、当这个executor的其他task执行时,就不需要再从外面获取这个外部变量的副本了,直接从本地的blockManager中获取即可

【注意】广播变量可读,不可写

五,使用Kryo序列化
默认情况下,spark内部是使用Java的序列化机制,objectOutputStream/ObjectInputStream对象输入输出机制,来进行序列化
这种默认的序列化机制的好处在于,处理起来比较方便,也不需要手动去做什么事情,只是,你在算子里面使用的变量,必须实现Serializable接口的,可序列化即可
但是缺点是,默认的序列化的效率不高,序列化速度比较慢,序列化以后的数据,占用的内存空间相对还是比较大的
Spark支持使用Kryo序列化机制。
Kyro序列化机制,比默认的Java序列化机制,速度要快,序列化后的数据要更小,大概是Java序列化机制的1/10。所以Kryo序列化优化以后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少

Kryo序列化机制,一旦启用以后,会生效的几个地方:
1、算子函数中使用到的外部变量
2、持久化RDD时进行序列化,StorageLevel.MEMORY_ONLY_SER
3、Shuffle

1、算子函数中使用到的外部变量,使用Kryo以后:优化网络传输的性能,可以优化集群中的内存的占用和消耗
2、持久化RDD,优化内存的占用和消耗,持久化RDD占用的内存越少,task执行的时候,创建的对象,就不至于频繁的占满内存,频繁的发生GC
3、Shuffle:可以优化网络传输的性能
Kryo之所以没有被作为默认的序列化类库的原因:
主要是因为Kryo要求,如果要达到他的最佳性能的话,那么就一定要注册你自定义的类(比如,你的算子函数中使用到了外部自定义的对象变量,这时,就要求必须注册你的类,否则Kryo达不到最佳的性能)。

首先,第一步:在sparkConf中设置一个属性:
spark.serializer,org.apache.spark.serializer.KryoSerializer类
例如:sparkConf.set(“spark.serializer”,”org.apahe.spark.serializer.KryoSerializer”)

其次,第二步:注册你使用到的。需要通过Kryo序列化的,一些自定义类:
sparkConf.registerKryoClasses()
例如:sparkConf.registerKryoClasses(Array(classOf[CategorySecondSort],classOf[…]))

六,减少shuffle的发生
1,尽量避免使用会发生shuffle的算子
2,在数据源头,对我们的数据提前进行聚合

七,优化数据结构
1、避免对象套对象
2、减少数据集合的使用,尽量使用数组
3、因为对象需要序列化,能不用对象就尽量不用,建议使用字符串拼接
4、可以使用第三方提供的占用内存小,序列化速度快的数据结构类库,例如 fastUtil类库

八,避免数据倾斜
1,数据倾斜的概念
有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时spark作业性能会比期望查很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,一保证spark作业的性能

2,数据倾斜发生的现象:
A、绝大多数task执行的都非常快,但个别task执行极慢。比如,总共有1000个task,997个task都在一分钟内执行完了,但是剩余两三个task却要一两个小时,这种情况很常见
B、原本能够正常执行的spark作业,某天突然爆出 OOM(内存溢出)异常,官场异常栈,是我们写的业务代码造成的,这种情况比较少见

3,数据倾斜发生的原理
数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或者join等操作,此时,如果某个key对应的数据量特别大的话,就会发生数据倾斜,比如大部分key对应10 条数据,但是个别的key却对应了100万调数据,那么大部分task可能就只会分配到10-条数据,然后1秒就运行完了,但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个spark作业的运行进度是由运行时间最长的那个task决定的

4,然后定位导致数据倾斜的代码
数据倾斜只会发生在shuffle过程中,这里给大家罗列了一些常用的并且可能会触发shuffle操作的算子:groupByKey, reduceByKey, aggregateByKey, join, cogroup, repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子的某一个导致的

查看数据倾斜的数据:eventLogRDD.sample(false,0.1).countByKey().foreach(println(_))

5,数据倾斜的解决方案

解决方案一:使用HIve ETL预处理数据

方案使用场景:导致数据倾斜的是Hive表,如果该Hive表中的数据本身很不均匀(比如某个key对应了100万数据,其他的key才对应了10条数据),而且业务场景需要频繁使用spark对Hive表执行某个分析操作,那么比较适合使用这种方案

方案实现思路:此时可以评估一下,是否可以通过Hive来进行数据预处理(即通过Hive ETL预先对数据按照key进行聚合,或者是预先和其他表进行join),然后在spark作业中针对的数据源就不是原来的Hive表了,而是预处理后Hive表。此时由于数据已经预先进行了聚合或join操作,那么spark作业中也就不需要原先的shuffle类算子执行的这类操作了

方案实现原理:这种方案从根本上解决了数据倾斜,因为彻底避免了在spark中执行shuffle类算子,那么就不会有数据倾斜的问题了,但是这里也要提醒一下大家,这种方式属于治标不治本,因为毕竟数据本身就存在不均匀的问题,所以Hive ETL中进行groupBy 或者join等shuffle操作时,还是会出现数据倾斜,导致Hive ETL的速度很慢,我们只是把数据倾斜的发生提前到了Hive ETL中,避免了spark程序发生数据倾斜而已

方案优点:实现起来简单便捷,效果还非常好,完全规避了数据倾斜,spark作业的性能会大幅提升

方案缺点:治标不治本,Hive ETL 中还是会发生数据倾斜

解决方案二:过滤少数导致倾斜的key

方案使用的场景:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不是很大的话,那么很适合使用这种方案,比如99% 的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜

方案实现思路:如果我们判断那少数的几个数据量特别多的key,对作业的执行和就算结果不是特别重要的话,那么干脆就直接过滤掉那少数的几个key,比如,在spark SQL中可以使用where 子句过滤掉这些key或者在sparkRDD执行filter算子过滤掉这些key,如果每次作业执行时,动态判定那些key的数据量最多然后再进行过滤,那么可以直接使用sample算子对RDD进行采样,然后计算出每个key的数量,取数据量最多的key过滤掉即可

方案实现原理:将导致数据倾斜的key过滤掉之后,这些key就不参与计算了,自然不可能产生数据倾斜

方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

方案缺点:使用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个

解决方案三:提高shuffle操作的并行度

方案使用场景:如果我们必须要对数据倾斜迎难而上,那么就建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案

方案实现思路:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如:reduceByKey(+,1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量,对于sparkSQL中的shuffle类语句,比如GroupByKeyDemo,join,等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task 的并行度,该值默认是200,对于很多场景来说都有点过小

方案实现原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果有5个key,每个key对应10条数据,这5个key都是分配给一个task的。那么这个task就要处理50条数据,而增加了shuffle read task 以后。每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。

方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响

方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据时间来看,其效果有限

解决方案四:两阶段聚合(局部集合+全局聚合)

方案使用场景:对RDD执行reduceByKey等聚合类shuffle算子或者在sparkSQL中使用GroupBy 语句进行分组聚合时,比较适用这种场景

方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先的key就变得不一样了,比如(hello,1),(hello,1),(hello,1),(hello,1)… 就会变成(1_hello,1),(1_hello,1),(2_hello,1),(2_hello,1)… 接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么聚合结果,就会变成(1_hello,2),(2_hello,2)… 然后将各个key的前缀给去掉,就会变成(hello,2),(hello,2)… ,再次进行全局聚合操作,就可以得到最终结果了,比如(hello,4)

方案实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不听的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据过多的问题,接着取出掉随机前缀,再次进行全局聚合,就可以得到最终的结果

方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的,通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将spark作业的性能提升数倍以上

方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案

九,开启推测机制

推测机制后,如果集群中,某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后spark会选取最快的作为最终结果

在spark-default.conf 中添加:spark.speculation true

推测机制与一下几个参数有关:
1、spark.speculation.interval 100: 检测周期,单位毫秒
2、Spark.speculation.quantile 0.75: 完成task的百分比时启动推测
3、Spark.speculation.multiplier 1.5: 比其他的慢多少倍时启动推测

十,算子使用技巧

1、非必要的情况下,不要使用collect
2、尽量使用,例如:foreachPartition 就不要使用foreach,在创建连接对象(数据连接对象等),尽量在分区上创建

评论