一、Spark核心API
-----------------------------------------------
    [SparkContext]
        连接到spark集群,入口点.

    [HadoopRDD] extends RDD
        读取hadoop hdfs上的数据,hbase的数据,s3的数据

    [MapPartitionsRDD]
        分区RDD: 针对父RDD的每个分区,提供了函数,生成的新类型RDD.

    [PairRDDFunctions]
        对偶RDD函数类。
        可用于KV类型RDD的附加函数。可以通过隐式转化得到.

    [ShuffleRDD]
        从Shuffle中计算结果的RDD.

    [RDD]
        是分区的集合.
        弹性分布式数据集.
        不可变的数据分区集合.
        基本操作(map , filter , persist等)
        分区列表               //数据
        应用给每个切片的计算函数   //行为
        到其他RDD的依赖列表          //依赖关系
        (可选)针对kv类型RDD的分区类
        (可选)首选位置列表

    [DAGScheduler]调度器
        1.调度器工作原理:
            高级调度器层面,实现按照shuffle(网络间分发)划分阶段(stage).
            a.对每个JOB的各阶段,计算出有向无环图(DAG),并且跟踪RDD和每个阶段的输出。
            b.找出最小调度来合理的运行作业(优化)
            c.将Stage对象以TaskSet方式提交给底层的任务调度器TaskScheduler。
            d.底层调度器TaskScheduler,实现在cluster上运行job.
            e.TaskSet已经包含了全部的单独的task,这些Task都能够基于cluster的数据进行正确运行。

        2.Stage的创建过程
            在shuffle的边界处,打碎RDD Graph, 从而划分Stage

        3.RDD操作与Stage划分
            a.具有'窄依赖'的RDD操作(不需要shuffle的操作,比如map /filter),会被管道化至一个taskset中.只有一个Stage
            b.而具有shuffle(网络分发,一端输出,一端读取)依赖的操作, 则对应多个Stage
            c.每个stage中,都有一个针对其他stage的shuffle依赖,可以计算多个操作。

        4.调度器调度流程
            a.Dag调度器检测首选位置来运行rask
            b.基于当前的缓存状态,将首选位置传递给底层的task调度器来实现合理的调度

        5.故障处理
            a.shuffle过程中丢失文件而引发的故障,由DAG调度器处理
            b.stage内因为丢失文件引发的故障,由task调度器处理.在取消整个stage之前,task会进行少量次数的重试操作。

        6.容错
            a.为了容错,同一stage可能会运行多次,称之为"attemp"

            b.如果task调度器报告了一个故障(该故障是由于上一个stage丢失输出文件而导致的,shuffle),DAG调度就会重新提交丢失的stage
            这个故障是 通过具有 FetchFailed的CompletionEvent对象或者ExecutorLost进行检测的。

            c.DAG调度器会等待一段时间看其他节点或task是否失败,然后对丢失的stage重新提交taskset,计算丢失的task。


二、Spark术语介绍
-------------------------------------------------------
    [job]
        提交给调度器的顶层的工作项目,由ActiveJob表示。
        是Stage集合,一个job分为很多个阶段。

    [Stage]
        是task的集合,计算job中的中间结果。每个阶段可以有很多个任务
        同一RDD的每个分区都会应用相同的计算函数。
        Stage在shuffle的边界处进行隔离(因此引入了隔断,需要上一个stage完成后,才能得到output结果)
        如果这些job共用了同一个rdd的话,Rdd的stage通常可以在这些job中共享。

        有两种类型的stage:
            1)ResultStage,用于执行action动作的最终stage。
            2)ShuffleMapStage,对shuffle进行输出文件的写操作的。

        Stage是并行任务的集合,都会计算同一函数。
        内部所有task都有同样的shuffle依赖,
        每一个task都由调度器来运行,并且在shuffle边界处划分成不同阶段。
        调度器是以拓扑顺序执行的.

        每个stage可以shuffleMapStage,该阶段下输出是下一个stage的输入。对于shuffleMapStage,需要跟踪每个输出分区所在的节点。
        也可以是resultStage,该阶段task直接执行spark action。

        每个stage都有FirstJobId,区分于首次提交的id

    [ShuffleMapStage]
        产生数据输出,在每次shuffle之前发生。
        内部含有shuffleDep字段,和记录产生多少输出以及多少输出可用的字段
        DAGScheduler.submitMapStage()方法可以单独提交submitMapStage()

    [ResultStage]
        该阶段在RDD的一些分区中应用函数来计算Action的结果
        有些stage并不会在所有分区上执行。例如first(),lookup();

    [Task]
        单独的工作单元,每个发送给一台主机。

    [Cache tracking]
        Dag调度器找出哪些RDD被缓存,避免不必要的重复计算,同时,也会记住哪些shuffleMap已经输出了
        结果,避免map端shuffle的重复处理。

    [Preferred locations]
        dag调度器根据rdd的中首选位置属性计算task在哪里运行。

    [Cleanup]
        运行的job如果完成就会清楚数据结构避免内存泄漏,主要是针对耗时应用。

    [ActiveJob]
        在Dag调度器中运行job。

        作业分为两种类型,主要使用finalStage字段进行类型划分。
            1)result job,计算ResultStage来执行action.
            2)map-state job,为shuffleMapState结算计算输出结果以供下游stage使用。

        job只跟踪客户端提交的"leaf" stage,通过调用Dag调度器的submitjob或者submitMapStage()方法实现.
        job类型引发之前stage的执行,而且多个job可以共享之前的stage。这些依赖关系由DAG调度器内部管理。

    [LiveListenerBus]
        异步传输spark监听事件到监听器事件集合中。

    [EventLoop]
        从caller接受事件,在单独的事件线程中处理所有事件,该类的唯一子类是DAGSchedulerEventProcessLoop。

    [LiveListenerBus]
        监听器总线,存放Spark监听器事件的队列。用于监控。

    [OutputCommitCoordinator]
        输出提交协调器.决定提交的输出是否进入hdfs。


    [TaskScheduler]
        底层的调度器,唯一实现TaskSchedulerImpl。可插拔,同Dag调度器接受task,发送给cluster,
        运行任务,失败重试,返回事件给DAG调度器。

    [TaskSchedulerImpl]
        TaskScheduler调度器的唯一实现,通过BackendScheduler(后台调度器)实现各种类型集群的任务调度。


    [SchedulerBackend]
        可插拔的后台调度系统,本地调度,mesos调度,。。。
        在任务调度器之后启动,
        实现有三种
        1.LocalSchedulerBackend
            本地后台调度器
            启动task.

        2.StandaloneSchedulerBackend
            独立后台调度器

        3.CoarseGrainedSchedulerBackend
            粗粒度后台调度器

    [Executor]
        spark程序执行者,通过线程池执行任务。


三、Spark三级调度流程源码分析
-------------------------------------------
    1.sc.textFile:
        val rdd1 = textFile(); --> new HadoopRDD -->  val rdd1 = new MapParttionsRDD1

    2.rdd1.flatMap:
        val rdd2 = rdd1.flatMap() --> val rdd2 = new MapParttionsRDD2

    3.rdd1.map:
        val rdd3 = rdd2.map() --> val rdd3 = new MapParttionsRDD3

    4.rdd1.reduceByKey:
        val rdd4 = rdd3.reduceByKey() --> 隐式函数rddToPairRDDFunction --> PairRDDFunctions对象
         --> val rdd4 = new RDD

    5.三级调度流程源码分析
        [主线程]a. cilent ---创建--->  sc = new SparkContext()  -----> HadoopRDD hd = sc.textFile() --> 进行一系列[上述1-4]的变换
            -----> 返回一个RDD对象给sc ---->返回RDD给Client

        [主线程]b. client拿到rdd -----> [client]rdd.collect() ----> [rdd.collect]sc.runJob() ----> sc.runJob.runJob(rdd,func,分区序列0...n)
            ----> ... ----> DAGSchedule.runJob

        [主线程]c. DAGSchedule.runJob ---->  DAGSchedule.submit():返回一个JobSubmitted[Event]对象:对DAG调度器事件信息的封装
            ----> DAGSchedule.DAGSchedulerEventProcessLoop.post():将JobSubmitted[Event]传递给事件轮询器[DAGSchedulerEventProcessLoop]的EventQuene

        [EventLoopThread]d. DAGSchedulerEventProcessLoop:开启分线程 -----> EventLoopThread
            ----> EventLoop.EventQuene.take():开始轮询,取得event,并传递给DAGSchedulerEventProcessLoop
            ----> 一直轮询,无休止循环线程

        [EventLoopThread]e. DAGSchedulerEventProcessLoop.onReceive(event):收到事件event,并传递给DAGScheduler

        [EventLoopThread]f. DAGScheduler.doOnReceive(event):模式匹配,看事件类型,然后抽取event中封装的信息
            ----> DAGScheduler.handelJobSubmitted(从event中抽取的信息ed: func,rdd,partions...)
            ----> DAGScheduler.createResultStage(func,rdd,partions...):生成finalState
            ----> DAGScheduler.new ActiveJob(finalState,...): 返回job对象
            ----> DAGScheduler.SparkListenerJobStart(job,...):spark监听器事件
            ----> DAGScheduler.SparkListenerBus.post():DAGScheduler通过post的方式将监听器事件,提交给SparkListenerBus.eventQueue(活跃监听总线的事件队列),目的是事件的类型转换,将调度器事件转换成监听器事件
            ----> 开启分线程LiveListenerBusThread,进行状态监控
            ----> 同时继续线程h

        [LiveListenerBusThread]g. SparkListenerBus.eventQueue.poll():无休止循环轮询,取得事件event ----> SpakeListenerBus.postToAll(event)
            ----> 开始监控整个event的进度和状态

        [分线程a]h. DAGScheduler.submitStage(finalStage): 递归调用,逐次提交final之前的state, 直至finalState之前的state(mapStates)都已经提交完毕
            ----> DAGScheduler.submitMissingTasks(state,jobId):对state进行模式匹配,shuffleStage/resultState进行不同的一系列的处理,但是最终都将state转换成taskSet任务集
            ----> DAGScheduler.TaskScheduler.submitTasks(taskSet):DAGScheduler调用自身变量TaskScheduler的提交任务方法,将任务集提交给任务调度器TaskScheduler
            ----> 至此,开启二级Task调度

        [分线程a]i. TaskSchedulerImpl.submitTasks(taskSet) ----> TaskSchedulerImpl.submitTasks.createTaskManager():得到任务集管理器manager
            ----> TaskSchedulerImpl.submitTasks.schedulableBuilder.addTaskSetManger(manager) ----> ....
            ----> TaskSchedulerImpl.submitTasks.SchedulerBackend.reviveOffers():最终调用后台调度器SchedulerBackend的reviveOffers方法
            ----> 至此,开启三级后台调度

        [分线程b]j. SchedulerBackend.reviveOffers() : 开启进程间通信,序列化tasks,得到消息reviveOffers,并且根据进程类型开启相应的后台调度器进程[local,standlone,Coarse],并将消息[reviveOffers]发送给已经开启的后台调度器进程
            ----> [以本地后台调度器为例] ----> LocalSchedulerBackend.localEndPoint.send(reviveOffers) ----> RPCEndPoint通过Netty框架,开始发送reviveOffers给对应的RPC终端localEndPoint[local,stand...]

        [分线程b]k. LocalSchedulerBackend.localEndPoint.receive() : 本地后台调度器RPC终端,接收进程间的消息 ----> LocalSchedulerBackend.reviveOffers():反序列化reviveOffers,取得调度器tasks
            ----> 遍历tasks任务集,并将每个任务提交给Executor[执行程序]

        [分线程c]l. Executor.launchTask() :开启任务,开始执行任务 ----> Executor.new TaskRunner():得到任务运行器tr ----> Executor.runningTasks.put(tr): 将tr put到一个任务列表中
            ----> Executor.threadPool.executor(tr) : 在未来的某个时刻,执行tr.run(),在run函数中调用ShuffleMapTask.run()方法

        [分线程c--主线程]m. ShuffleMapTask.run() ----> ShuffleMapTask.runTask() -----> ShuffleMapTask.new shuffleWriter().write()
            ----> 至此,终于开始调用在WorldCount client中定义的flatMap(x => x.spilt(" "))函数

    6.总结:
        a.Cilent触发rdd的Action行为,通过SpackContext提交作业,开启DAGScheduler一级调度

        b.DAGScheduler首先将SpackContent的作业信息封装成一个作业提交事件JobSubmitted[Event],然后添加到事件队列中,然后开启分线程开始轮询该事件队列

        c.DAGScheduler.OnReceive取得轮询取出的事件并查看事件类型,根据事件类型,抽取事件中的信息,然后进行一系列的封装,最终封装成一个State对象

        d.DAGScheduler.submitStage,DAG调度器开始提交State给TaskScheduler,开启二级调度。

        e.一级调度总结就是:将SpackCotent提交的job信息先封装成Event,然后封装成State,然后将State提交给二级调度

        f.TaskScheduler,二级调度提交任务集,并且将任务集封装成reviveOffers, 通过进程间通信[Netty],将reviveOffers传递给相应的三级底层调度器[local,standlone,ceros...],开启三级调度

        g.三级调度首先创建一个TaskRunner线程,在线程中运行run函数,在run函数中对reviveOffers进行解析,然后将结果传递给ShuffleMapTask

        h.ShuffleMapTask开始新建混洗写入器,开始shuffle操作,ShuffleMapTask.new shuffleWriter().write()

        i.至此开始调用自定义的WorldCount.flatMap函数





Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐