3 ,SparkCore: RDD ,lineage ,设置内存大小,三类算子,日志打印级别,持久化算子
一 ,RDD0 ,RDD APIhttp://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD1 ,rdd 是什么 :RDD ( Resilient Distributed Dataset ),弹性分布式数据集。2 ,rdd 分区 :分区数 : 读过几个 block ,rdd 就产生几...
·
一 ,RDD
0 ,RDD API
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
1 ,rdd 是什么 :
RDD ( Resilient Distributed Dataset ),弹性分布式数据集。
2 ,rdd 分区 :
- 分区数 : 读过几个 block ,rdd 就产生几个分区
- 图解 :
3 ,rdd 的五大特性 :
- RDD 是由一系列的 partition 组成的。
rdd 是逻辑概念,分区才是物理存在的数据 - 函数是作用在每一个 partition(split)上的。
算子 : spark 方法
算子是作用在分区上的 - RDD 之间有一系列的依赖关系。
不丢数据, - 分区器是作用在 K,V 格式的 RDD 上。
1 ,分区器 : 决定了数据去往哪个分区
2 ,k-v 格式 rdd :rdd 中的数据是,二元组数据 - RDD 提供一系列最佳的计算位置。
最佳位置 : 数据所在节点,进行计算 ( 移动计算,不移动数据 )
4 ,弹性 : 不丢数据
不管哪个 RDD 的数据丢了,都会利用算子逻辑,从他的上一层,从新计算出丢掉的数据。
5 ,小节 :
- hdfs - rdd : 关系 ?
每个 fileSplit 对应一个 block ,也就对应一个 rdd 分区 - k-v 形式 rdd ?
rdd 的数据是 二元组 - 弹性 : 体现在哪里
1 ,容错 : rdd 之间有依赖关系,下游数据丢了,还可以从上层数据重新生成
2 ,分区 : 可多可少,分区数量是可以控制的。分区越多,处理数据的时候,并行度就越高。
3 ,什么时候,适合多分区 : 大量数据,需要并行处理的时候
4 ,什么时候,适合少分区 : 少量数据,但是分区却很多,我们需要进行减少分区的操作。 - 哪里体现分布式 :
rdd 数据,分布在不同节点上。
6 ,运行模式 :哪里给我提供的资源
- Local
多用于本地测试,如在eclipse,idea中写程序测试等。 - Standalone
Standalone是Spark自带的一个资源调度框架,它支持完全分布式。 - Yarn
Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。 - Mesos
资源调度框架。
7 ,spark 执行原理图 : 以 4 个节点为例
- Driver 和 Worker 是启动在节点上的进程,运行在 JVM 中的进程。
- Driver 与集群节点之间有频繁的通信。
- Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。因为会造成 oom ( 内存溢出 )。
- Worker 是 Standalone 资源调度框架里面资源管理的从节点。也是 JVM 进程。
- Master 是 Standalone 资源调度框架里面资源管理的主节点。也是 JVM 进程。
8 ,配置项 :
http://spark.apache.org/docs/latest/configuration.html
9 ,设置内存大小
conf.set("spark.driver.memory","1g")
10 ,lineage :spark 的血统 ( dag 有向无环图 )
rdd 链
二 ,算子 ( 三类 ) :
1 ,分类 :
- Transformations转换算子
- Action行动算子
- 控制算子
2 ,Transformations 转换算子 ( rdd --> rdd )
- 特点 : 懒执行,当他下边有 action 算子的时候,才执行。
- 例如 : flatMap 算子
map , flatMap , filter , reduceByKey , sample , groupByKey
3 ,Action 行动算子 ( rdd --> 值 )
- 例子 :
foreach , count , collect , reduce , save
4 ,application - job
- application : 1 个整体的任务
- job : 每个 action 算子对应一个 job
- application - job :1 个application 由多个 job 组成
5 ,一共有几个单词 :
package day01.demo01.sz
import org.apache.spark.{SparkConf, SparkContext}
object CountTest {
def main(args: Array[String]): Unit = {
// 配置对象
var conf = new SparkConf()
// 任务名字
conf.setAppName("wc")
// 运行模式
conf.setMaster("local")
// spark 上下文 :是通往 spark 集群的唯一通道
var sc = new SparkContext(conf)
// 统计一共有几个单词
val c = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).count()
println(c)
// 释放资源
sc.stop()
}
}
6 ,用 key 排序 :
val res = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).sortByKey()
res.foreach(println)
7 ,用 value 排序 : 多的在前 ( sortBy )
package day01.demo01.sz
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CountTest {
def main(args: Array[String]): Unit = {
// 配置对象
var conf = new SparkConf()
// 任务名字
conf.setAppName("wc")
// 运行模式
conf.setMaster("local")
// spark 上下文 :是通往 spark 集群的唯一通道
var sc = new SparkContext(conf)
// 统计一共有几个单词
val value: RDD[(String, Int)] = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false)
value.foreach(println)
// 释放资源
sc.stop()
}
}
8 ,设置日志打印级别
sc.setLogLevel("error")
9 ,元组反转再反转( swap ),sortByKey ,排序 value :
- 思路 :
先反转,在排序,排序后,再反转,就达到了用 value 排序的效果 - 精华代码 : swap
val value = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(_.swap).sortByKey().map(_.swap)
- 全部代码 :
package day01.demo01.sz
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CountTest {
def main(args: Array[String]): Unit = {
// 配置对象
var conf = new SparkConf()
// 任务名字
conf.setAppName("wc")
// 运行模式
conf.setMaster("local")
// spark 上下文 :是通往 spark 集群的唯一通道
var sc = new SparkContext(conf)
sc.setLogLevel("error")
// 统计一共有几个单词
val value = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(_.swap).sortByKey().map(_.swap)
value.foreach(println)
// 释放资源
sc.stop()
}
}
10 ,随机抽样
- 目的 : 通过样本看整体。
- 随机抽样,同一个元素可以被重复抽取,我要抽取总量的 33.3%
val res = value.sample(true,0.333)
- 参数解释 :
1 ,true - 可放回抽样 ; false - 不可放回抽样
2 ,出去多少条数据,总体的比例 ( 得到的不是绝对的个数,是大概多少条,有可能多抽取几条,或者少了几条 ) 。
3 ,种子 : long 类型,随机数生成器的种子,有了它,每次生成的随机数就是一定的
如果使用了种子,我们每次运行程序,抽取的数据都是一样的
不然,每次都变。 - 带有种子的随机抽样 : 每次抽取的数据都一样
package day01.demo01.sz
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CountTest {
def main(args: Array[String]): Unit = {
// 配置对象
var conf = new SparkConf()
// 任务名字
conf.setAppName("wc")
// 运行模式
conf.setMaster("local")
// spark 上下文 :是通往 spark 集群的唯一通道
var sc = new SparkContext(conf)
val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
// 抽样
val res = rdd1.sample(true,0.2,1000l)
res.foreach(println)
// 释放资源
sc.stop()
}
}
11 ,action 算子 :
- 共几条数据 :count
val c = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).count()
- 前 n 个元素 : take
rdd.take(2).foreach(println)
- 遍历 :foreach
rdd.foreach(println)
- 将计算结果回收到 Driver 端。当数据量很大时就不要回收了,会造成 oom : collect
val arr: Array[Int] = rdd.collect()
arr.foreach(println)
- 根据 key ,累加 value :countByKey
package day01.demo01.sz
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CountTest {
def main(args: Array[String]): Unit = {
// 配置对象
var conf = new SparkConf()
// 任务名字
conf.setAppName("wc")
// 运行模式
conf.setMaster("local")
// spark 上下文 :是通往 spark 集群的唯一通道
var sc = new SparkContext(conf)
val rdd = sc.parallelize(List(("a",1),("a",1),("b",1),("b",1)))
val res: collection.Map[String, Long] = rdd.countByKey()
res.foreach(println)
// 释放资源
sc.stop()
}
}
12 ,持久化算子 : 需求
- 原始代码 :
val rdd1: RDD[String] = sc.textFile("./data/words")
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
val i: Long = rdd2.count()
val arr: Array[String] = rdd2.collect()
- 原始代码的缺陷 :
1 ,rdd 不存数据
2 ,每次的 action 代码触发执行逻辑,从头读数据
3 ,我们上面的代码,实际上,读取了两次数据
12 ,持久化算子 : 都是懒执行的
- cache : 缓存算子,默认将数据存在内存中
- perist : 序列化算子,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。也可以有副本数
- checkpoint :硬盘算子,checkpoint 将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
13 ,缓存算子 : cache ( 内存存储,懒执行 )
- 思路 : 第二次求总条数的时候,缓存和不缓存,有巨大的时间差距
- 目的 : 看看缓存与不缓存的时间差距是多少
- 不缓存,每次遇到 action 算子,都从头读一次数据
- 不缓存,代码 :
package day01.demo01.hs
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Hs {
def main(args: Array[String]): Unit = {
// 准备工作
val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
// 一共多少条数据
var rdd1: RDD[String] = sc.textFile("./data/persistData.txt")
// rdd1 = rdd1.cache()
val i:Long = rdd1.count()
println("数据条数:"+i)
// 第二次求总条数
val start: Long = System.currentTimeMillis()
val j:Long = rdd1.count()
println("数据条数:"+j)
val end: Long = System.currentTimeMillis()
println("耗时:"+(end-start))
sc.stop()
}
}
- 耗时 : 1918 ,1770 ,2034
- 缓存,代码 : rdd1 = rdd1.cache()
package day01.demo01.hs
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Hs {
def main(args: Array[String]): Unit = {
// 准备工作
val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
// 一共多少条数据
var rdd1: RDD[String] = sc.textFile("./data/persistData.txt")
rdd1 = rdd1.cache()
val i:Long = rdd1.count()
println("数据条数:"+i)
// 第二次求总条数
val start: Long = System.currentTimeMillis()
val j:Long = rdd1.count()
println("数据条数:"+j)
val end: Long = System.currentTimeMillis()
println("耗时:"+(end-start))
sc.stop()
}
}
- 耗时 : 168 ,314 ,162
- 结论 :
使用数据缓存技术,大大地提高了效率
14 ,缓存算子 : persist ( 手动指定缓存级别 )
- 实际上 ,cache 就是 persist 的仅使用内存形式
- persist 呢人存储级别 : 内存存储
- 一共有多少存储级别 : 源码 :( 硬盘,内存,堆外-外部存储,不序列化,副本数 )
def useDisk: Boolean = _useDisk
def useMemory: Boolean = _useMemory
def useOffHeap: Boolean = _useOffHeap
def deserialized: Boolean = _deserialized
def replication: Int = _replication
- 组合级别 : 我们开发者能够使用的 :
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
- 注意 :
MEMORY_AND_DISK : 内存放不下,才把数据放到磁盘 - 使用 : 内存放不下的时候,磁盘帮忙
1 ,精华代码 :
rdd1.persist(StorageLevel.MEMORY_AND_DISK)
2 ,全部代码 :
package day01.demo01.hs
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
object Hs {
def main(args: Array[String]): Unit = {
// 准备工作
val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
// 一共多少条数据
var rdd1: RDD[String] = sc.textFile("./data/persistData.txt")
// rdd1.cache()
rdd1.persist(StorageLevel.MEMORY_AND_DISK)
val i:Long = rdd1.count()
println("数据条数:"+i)
// 第二次求总条数
val start: Long = System.currentTimeMillis()
val j:Long = rdd1.count()
println("数据条数:"+j)
val end: Long = System.currentTimeMillis()
println("耗时:"+(end-start))
sc.stop()
}
}
- persist 的常用级别 :
MEMORY_ONLY
MEMORY_AND_DISK
15 ,cache 和 persist 的注意事项
- cache 和 persist 都是懒执行,必须有一个 action 类算子触发执行。
- cache 和 persist 算子的返回值可以赋值给一个变量,在其他 job 中直接使用这个变量就是使用持久化的数据了。持久化的单位是 partition。
- cache 和 persist 算子后不能立即紧跟 action 算子。
错误示范 : rdd.cache().count() - cache 和 persist 算子持久化的数据当 applilcation 执行完成之后会被清除。
16 ,checkPoint : 检查点
- 效果 :
1 ,checkpoint 将 RDD 持久化到磁盘。
2 ,还可以切断 RDD 之间的依赖关系。
3 ,checkpoint 目录数据当 application 执行完之后不会被清除。 - 目的 :
当一个计算很复杂的时候,我们把中间结果保存到磁盘,下次使用就直接从磁盘中拿数据。 - 图解 :
- 执行原理 :
1 ,当 RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。
2 ,当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。
3 ,Spark 框架会自动启动一个新的 job,重新计算这个 RDD 的数据,将数据持久化到 HDFS 上。 - 优化 :
对 RDD 执行 checkpoint 之前,最好对这个 RDD 先执行 cache,这样新启动的 job 只需要将内存中的数据拷贝到 HDFS 上就可以,省去了重新计算这一步。 - 注意 :
1 ,不要太多的使用 checkPoint ,磁盘受不了
2 ,对 rdd 的持久化,用的不多
3 ,通常是特俗场景下使用的,以后再说
4 ,checkPoint 前,先 cache 一下
5 ,checkPoint 真正的用处,不再 rdd 上,而是在别处 - 代码 : 指定外存目录
object Ck {
def main(args: Array[String]): Unit = {
// 准备工作
val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
val sc: SparkContext = new SparkContext(conf)
sc.setCheckpointDir("./data")
val rdd1: RDD[String] = sc.textFile("./data/words")
rdd1.checkpoint()
val arr: Array[String] = rdd1.collect()
arr.foreach(println)
sc.stop()
}
}
- 效果 : 出现缓存目录
17 ,从内存中移除 :
rdd.unpersist
更多推荐
已为社区贡献5条内容
所有评论(0)