一 ,RDD

0 ,RDD API

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

1 ,rdd 是什么 :

RDD ( Resilient Distributed Dataset ),弹性分布式数据集。

2 ,rdd 分区 :

  1. 分区数 : 读过几个 block ,rdd 就产生几个分区
  2. 图解 :
    在这里插入图片描述

3 ,rdd 的五大特性 :

  1. RDD 是由一系列的 partition 组成的。
    rdd 是逻辑概念,分区才是物理存在的数据
  2. 函数是作用在每一个 partition(split)上的。
    算子 : spark 方法
    算子是作用在分区上的
  3. RDD 之间有一系列的依赖关系。
    不丢数据,
  4. 分区器是作用在 K,V 格式的 RDD 上。
    1 ,分区器 : 决定了数据去往哪个分区
    2 ,k-v 格式 rdd :rdd 中的数据是,二元组数据
  5. RDD 提供一系列最佳的计算位置。
    最佳位置 : 数据所在节点,进行计算 ( 移动计算,不移动数据 )

4 ,弹性 : 不丢数据

不管哪个 RDD 的数据丢了,都会利用算子逻辑,从他的上一层,从新计算出丢掉的数据。

5 ,小节 :

  1. hdfs - rdd : 关系 ?
    每个 fileSplit 对应一个 block ,也就对应一个 rdd 分区
  2. k-v 形式 rdd ?
    rdd 的数据是 二元组
  3. 弹性 : 体现在哪里
    1 ,容错 : rdd 之间有依赖关系,下游数据丢了,还可以从上层数据重新生成
    2 ,分区 : 可多可少,分区数量是可以控制的。分区越多,处理数据的时候,并行度就越高。
    3 ,什么时候,适合多分区 : 大量数据,需要并行处理的时候
    4 ,什么时候,适合少分区 : 少量数据,但是分区却很多,我们需要进行减少分区的操作。
  4. 哪里体现分布式 :
    rdd 数据,分布在不同节点上。

6 ,运行模式 :哪里给我提供的资源

  1. Local
    多用于本地测试,如在eclipse,idea中写程序测试等。
  2. Standalone
    Standalone是Spark自带的一个资源调度框架,它支持完全分布式。
  3. Yarn
    Hadoop生态圈里面的一个资源调度框架,Spark也是可以基于Yarn来计算的。
  4. Mesos
    资源调度框架。

7 ,spark 执行原理图 : 以 4 个节点为例

  1. Driver 和 Worker 是启动在节点上的进程,运行在 JVM 中的进程。
  2. Driver 与集群节点之间有频繁的通信。
  3. Driver负责任务(tasks)的分发和结果的回收。任务的调度。如果task的计算结果非常大就不要回收了。因为会造成 oom ( 内存溢出 )。
  4. Worker 是 Standalone 资源调度框架里面资源管理的从节点。也是 JVM 进程。
  5. Master 是 Standalone 资源调度框架里面资源管理的主节点。也是 JVM 进程。
    在这里插入图片描述

8 ,配置项 :

http://spark.apache.org/docs/latest/configuration.html

9 ,设置内存大小

conf.set("spark.driver.memory","1g")

在这里插入图片描述

10 ,lineage :spark 的血统 ( dag 有向无环图 )

rdd 链
在这里插入图片描述

二 ,算子 ( 三类 ) :

1 ,分类 :

  1. Transformations转换算子
  2. Action行动算子
  3. 控制算子

2 ,Transformations 转换算子 ( rdd --> rdd )

  1. 特点 : 懒执行,当他下边有 action 算子的时候,才执行。
  2. 例如 : flatMap 算子
    map , flatMap , filter , reduceByKey , sample , groupByKey

3 ,Action 行动算子 ( rdd --> 值 )

  1. 例子 :
    foreach , count , collect , reduce , save

4 ,application - job

  1. application : 1 个整体的任务
  2. job : 每个 action 算子对应一个 job
  3. application - job :1 个application 由多个 job 组成

5 ,一共有几个单词 :

package day01.demo01.sz

import org.apache.spark.{SparkConf, SparkContext}

object CountTest {
    def main(args: Array[String]): Unit = {
        //  配置对象
        var conf = new SparkConf()
        //  任务名字
        conf.setAppName("wc")
        //  运行模式
        conf.setMaster("local")
        //  spark 上下文 :是通往 spark 集群的唯一通道
        var sc = new SparkContext(conf)
        //  统计一共有几个单词
        val c = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).count()
        println(c)
        //  释放资源
        sc.stop()
    }
}

6 ,用 key 排序 :

val res = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).sortByKey()
res.foreach(println)

7 ,用 value 排序 : 多的在前 ( sortBy )

package day01.demo01.sz

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CountTest {
    def main(args: Array[String]): Unit = {
        //  配置对象
        var conf = new SparkConf()
        //  任务名字
        conf.setAppName("wc")
        //  运行模式
        conf.setMaster("local")
        //  spark 上下文 :是通往 spark 集群的唯一通道
        var sc = new SparkContext(conf)
        //  统计一共有几个单词
        val value: RDD[(String, Int)] = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false)
        value.foreach(println)
        //  释放资源
        sc.stop()
    }
}

8 ,设置日志打印级别

sc.setLogLevel("error")

在这里插入图片描述

9 ,元组反转再反转( swap ),sortByKey ,排序 value :

  1. 思路 :
    先反转,在排序,排序后,再反转,就达到了用 value 排序的效果
  2. 精华代码 : swap
val value = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(_.swap).sortByKey().map(_.swap)
  1. 全部代码 :
package day01.demo01.sz

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CountTest {
    def main(args: Array[String]): Unit = {
        //  配置对象
        var conf = new SparkConf()
        //  任务名字
        conf.setAppName("wc")
        //  运行模式
        conf.setMaster("local")
        //  spark 上下文 :是通往 spark 集群的唯一通道
        var sc = new SparkContext(conf)
        sc.setLogLevel("error")
        //  统计一共有几个单词
        val value = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(_.swap).sortByKey().map(_.swap)
        value.foreach(println)
        //  释放资源
        sc.stop()
    }
}

10 ,随机抽样

  1. 目的 : 通过样本看整体。
  2. 随机抽样,同一个元素可以被重复抽取,我要抽取总量的 33.3%
val res = value.sample(true,0.333)
  1. 参数解释 :
    1 ,true - 可放回抽样 ; false - 不可放回抽样
    2 ,出去多少条数据,总体的比例 ( 得到的不是绝对的个数,是大概多少条,有可能多抽取几条,或者少了几条 ) 。
    3 ,种子 : long 类型,随机数生成器的种子,有了它,每次生成的随机数就是一定的
    如果使用了种子,我们每次运行程序,抽取的数据都是一样的
    不然,每次都变。
  2. 带有种子的随机抽样 : 每次抽取的数据都一样
package day01.demo01.sz

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CountTest {
    def main(args: Array[String]): Unit = {
        //  配置对象
        var conf = new SparkConf()
        //  任务名字
        conf.setAppName("wc")
        //  运行模式
        conf.setMaster("local")
        //  spark 上下文 :是通往 spark 集群的唯一通道
        var sc = new SparkContext(conf)
        val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
        //  抽样
        val res = rdd1.sample(true,0.2,1000l)
        res.foreach(println)
        //  释放资源
        sc.stop()
    }
}

11 ,action 算子 :

  1. 共几条数据 :count
val c = sc.textFile("./data/words").flatMap(_.split(" ")).map((_,1)).count()
  1. 前 n 个元素 : take
rdd.take(2).foreach(println)
  1. 遍历 :foreach
rdd.foreach(println)
  1. 将计算结果回收到 Driver 端。当数据量很大时就不要回收了,会造成 oom : collect
val arr: Array[Int] = rdd.collect()
arr.foreach(println)
  1. 根据 key ,累加 value :countByKey
package day01.demo01.sz

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CountTest {
    def main(args: Array[String]): Unit = {
        //  配置对象
        var conf = new SparkConf()
        //  任务名字
        conf.setAppName("wc")
        //  运行模式
        conf.setMaster("local")
        //  spark 上下文 :是通往 spark 集群的唯一通道
        var sc = new SparkContext(conf)
        val rdd = sc.parallelize(List(("a",1),("a",1),("b",1),("b",1)))
        val res: collection.Map[String, Long] = rdd.countByKey()
        res.foreach(println)
        //  释放资源
        sc.stop()
    }
}

12 ,持久化算子 : 需求

  1. 原始代码 :
val rdd1: RDD[String] = sc.textFile("./data/words")
val rdd2: RDD[String] = rdd1.flatMap(_.split(" "))
val i: Long = rdd2.count()
val arr: Array[String] = rdd2.collect()
  1. 原始代码的缺陷 :
    1 ,rdd 不存数据
    2 ,每次的 action 代码触发执行逻辑,从头读数据
    3 ,我们上面的代码,实际上,读取了两次数据

12 ,持久化算子 : 都是懒执行的

  1. cache : 缓存算子,默认将数据存在内存中
  2. perist : 序列化算子,最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。也可以有副本数
  3. checkpoint :硬盘算子,checkpoint 将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。

13 ,缓存算子 : cache ( 内存存储,懒执行 )

  1. 思路 : 第二次求总条数的时候,缓存和不缓存,有巨大的时间差距
  2. 目的 : 看看缓存与不缓存的时间差距是多少
  3. 不缓存,每次遇到 action 算子,都从头读一次数据
  4. 不缓存,代码 :
package day01.demo01.hs

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Hs {
    def main(args: Array[String]): Unit = {
        //  准备工作
        val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        //  一共多少条数据
        var rdd1: RDD[String] = sc.textFile("./data/persistData.txt")
        //  rdd1 = rdd1.cache()
        val i:Long = rdd1.count()
        println("数据条数:"+i)
        //  第二次求总条数
        val start: Long = System.currentTimeMillis()
        val j:Long = rdd1.count()
        println("数据条数:"+j)
        val end: Long = System.currentTimeMillis()
        println("耗时:"+(end-start))
        sc.stop()
    }
}
  1. 耗时 : 1918 ,1770 ,2034
  2. 缓存,代码 : rdd1 = rdd1.cache()
package day01.demo01.hs

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Hs {
    def main(args: Array[String]): Unit = {
        //  准备工作
        val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        //  一共多少条数据
        var rdd1: RDD[String] = sc.textFile("./data/persistData.txt")
        rdd1 = rdd1.cache()
        val i:Long = rdd1.count()
        println("数据条数:"+i)
        //  第二次求总条数
        val start: Long = System.currentTimeMillis()
        val j:Long = rdd1.count()
        println("数据条数:"+j)
        val end: Long = System.currentTimeMillis()
        println("耗时:"+(end-start))
        sc.stop()
    }
}
  1. 耗时 : 168 ,314 ,162
  2. 结论 :
    使用数据缓存技术,大大地提高了效率

14 ,缓存算子 : persist ( 手动指定缓存级别 )

  1. 实际上 ,cache 就是 persist 的仅使用内存形式
  2. persist 呢人存储级别 : 内存存储
  3. 一共有多少存储级别 : 源码 :( 硬盘,内存,堆外-外部存储,不序列化,副本数 )
  def useDisk: Boolean = _useDisk
  def useMemory: Boolean = _useMemory
  def useOffHeap: Boolean = _useOffHeap
  def deserialized: Boolean = _deserialized
  def replication: Int = _replication
  1. 组合级别 : 我们开发者能够使用的 :
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  1. 注意 :
    MEMORY_AND_DISK : 内存放不下,才把数据放到磁盘
  2. 使用 : 内存放不下的时候,磁盘帮忙

1 ,精华代码 :

rdd1.persist(StorageLevel.MEMORY_AND_DISK)

2 ,全部代码 :

package day01.demo01.hs

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object Hs {
    def main(args: Array[String]): Unit = {
        //  准备工作
        val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        //  一共多少条数据
        var rdd1: RDD[String] = sc.textFile("./data/persistData.txt")
        //  rdd1.cache()
        rdd1.persist(StorageLevel.MEMORY_AND_DISK)
        val i:Long = rdd1.count()
        println("数据条数:"+i)
        //  第二次求总条数
        val start: Long = System.currentTimeMillis()
        val j:Long = rdd1.count()
        println("数据条数:"+j)
        val end: Long = System.currentTimeMillis()
        println("耗时:"+(end-start))
        sc.stop()
    }
}
  1. persist 的常用级别 :
    MEMORY_ONLY
    MEMORY_AND_DISK

15 ,cache 和 persist 的注意事项

  1. cache 和 persist 都是懒执行,必须有一个 action 类算子触发执行。
  2. cache 和 persist 算子的返回值可以赋值给一个变量,在其他 job 中直接使用这个变量就是使用持久化的数据了。持久化的单位是 partition。
  3. cache 和 persist 算子后不能立即紧跟 action 算子。
    错误示范 : rdd.cache().count()
  4. cache 和 persist 算子持久化的数据当 applilcation 执行完成之后会被清除。

16 ,checkPoint : 检查点

  1. 效果 :
    1 ,checkpoint 将 RDD 持久化到磁盘。
    2 ,还可以切断 RDD 之间的依赖关系。
    3 ,checkpoint 目录数据当 application 执行完之后不会被清除。
  2. 目的 :
    当一个计算很复杂的时候,我们把中间结果保存到磁盘,下次使用就直接从磁盘中拿数据。
  3. 图解 :
    在这里插入图片描述
  4. 执行原理 :
    1 ,当 RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。
    2 ,当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。
    3 ,Spark 框架会自动启动一个新的 job,重新计算这个 RDD 的数据,将数据持久化到 HDFS 上。
  5. 优化 :
    对 RDD 执行 checkpoint 之前,最好对这个 RDD 先执行 cache,这样新启动的 job 只需要将内存中的数据拷贝到 HDFS 上就可以,省去了重新计算这一步。
  6. 注意 :
    1 ,不要太多的使用 checkPoint ,磁盘受不了
    2 ,对 rdd 的持久化,用的不多
    3 ,通常是特俗场景下使用的,以后再说
    4 ,checkPoint 前,先 cache 一下
    5 ,checkPoint 真正的用处,不再 rdd 上,而是在别处
  7. 代码 : 指定外存目录
object Ck {
    def main(args: Array[String]): Unit = {
        //  准备工作
        val conf: SparkConf = new SparkConf().setAppName("hs").setMaster("local")
        val sc: SparkContext = new SparkContext(conf)
        sc.setCheckpointDir("./data")
        val rdd1: RDD[String] = sc.textFile("./data/words")
        rdd1.checkpoint()
        val arr: Array[String] = rdd1.collect()
        arr.foreach(println)
        sc.stop()
    }
}
  1. 效果 : 出现缓存目录在这里插入图片描述

17 ,从内存中移除 :

rdd.unpersist

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐