输入文件 rain_in/data.txt、data1.txt、data2.txt(62m) 会产生4个切片
1、进入job的任务提交方法job.waitForCompletion
在这里插入图片描述

2、首先检查job的状态,然后进入提交方法submit();
在这里插入图片描述

3、ensureState确保状态
setUseNewAPI使用新的API
Connect()获得链接的方法,从此方法中能够获得本地或是yarn工作的链接(本地或是yarn)
进入connect();
在这里插入图片描述

4、首先进行判断集群cluster是否为null,如果为null就创建一个新的集群,通过Cluster()方法(对此方法传了初始值conf),getConfiguration()是将核心加入进去(site、core、yarn等.xml文件)
在这里插入图片描述

5、进入Cluster集群构建方法
在这里插入图片描述

 在Cluster的构造方法内有initialize()集群初始化方法(对其传了初始值jobTrackAddr,conf)

6、进入initialize方法,此方法分为两部分
第一部分:得到客户端协议提供者ClientProtocolProvider与客户端协议clientProtocol(选择工作在本地上还是yarn上),因为在本地执行任务所以会进入创建本地集群的.create()方法。
在这里插入图片描述

进入.create(conf)方法,构建了一个执行mr任务的框架名称为local和返回了一个本地对象。
在这里插入图片描述

此时的协议提供者clientProtocol(执行mr任务的集群)就为本地
在这里插入图片描述

第二部分:集群获取完毕并进行赋值以及日志的输出(输出的执行mr任务集群的名字)
在这里插入图片描述

此时跑mr任务的集群已经搭建完毕initialize方法结束
在这里插入图片描述

继续向下运行会返回一个Subject.doAs对象,connect方法结束(链接获取完毕,确定好mr任务要运行在哪一个集群 )
7、进入getJobSubmitter()方法(封装集群对象)
为其传了初始值cluster.getFileSystem()(已搭建好集群的文件)、cluster.getClient()(已搭建好集群的协议“本地”)
在这里插入图片描述

8、进入JobSubmitter()方法(封装集群对象)
为其提供了初始值文件系统、提交协议
在这里插入图片描述

9、进入submitJobInternal()提交作业内部方法
在这里插入图片描述

为其赋初始值要进行的job以及运行job的集群
(1)、检查job是否合法;
(2)、将job的核心设置conf拿到;
(3)、将conf加载到MR框架的分布式缓存中;
(4)、然后通过JobSubmissionFiles.getStagingDir(cluster, conf)再本地创建一个运行mr任务的一级目录,因为hadoop是基于磁盘进行数据分析的。
(5)、通过inet地址得到本的主机名以及ip地址
在这里插入图片描述
(6)、进行赋值
在这里插入图片描述

(7)、创建job的id以及创建运行job的二级目录(job-id)
在这里插入图片描述
此时会在自己的电脑tem文件夹中看到这样一个目录但是其中没有内容。因为还没有加载进去。
在这里插入图片描述
10、进入writeSplits()方法(获取切片信息并返回切片数量)
将需要运行的job以及提交的工作目录赋给writeSplits初值
在这里插入图片描述
(1)、获取配置文件信息conf,然后通过writeNewSplits获取切片数量
在这里插入图片描述
(2)、进入writeNewSplits方法并为其赋初值,job为需要运行的任务,jobSubmitDir为存放运行内容的文件(tmp下的文件)
在这里插入图片描述
在这里插入图片描述

11、进入getSplits()方法(实际对需要进行计算的文件进行切片的方法)
赋初值将所需要进行的工作传入getSplits中
在这里插入图片描述
在这里插入图片描述

(1)、监测者(相当于单片机中的看门狗)用来监测程序是否执行完毕
在这里插入图片描述

(2)、创建两个数组用来存放切片以及目标文件
在这里插入图片描述

(3)、将需要进行切片的目录得到并将其中的文件挨个进行切片处理(iter循环)
在这里插入图片描述

(4)、得到文件地址、得到文件大小
在这里插入图片描述

(5)、用instanceof方法判断文件是否在本地中,在的话用BlockLocation[]数组接收文件的block信息
在这里插入图片描述

(6)、判断job路径与path路径是否一致,用blockSize存放文件块的大小(win下32m、hdfs下128m),splitsSize切片大小默认与block大小一致
在这里插入图片描述

(7)、对文件进行切片处理,bytesRemaining剩余字节=文件大小。
用byteremaining/splitsSize切片大小如果大于切片的1.1倍,那么就将此文件逻辑上分为几个切片(由文件大小决定)。
Length-bytesRemaining偏移量相当于.seek()方法,从指定的位置开始切割;bytesRemaining -= splitSize 实时更新文件剩余的大小。
在这里插入图片描述

(8)、if循环在while的基础上进行。
如果上述while循环最后一次循环bytesRemaining/splits小于splits的1.1倍那么将会执行次循环;将剩余文件作为一个单独的切片来处理,处理方式与while中的一致,也将这个切片添加到切片集合中去。
else中内容,如果bytesRemaining大小为零那么将会直接创建一个大小为零的切片来存放信息。
在这里插入图片描述

(9)、if(isSplitable(job,path))的esle
如果一开始进来的文件大小就为0,那么直接创建一个大小为0的切片
在这里插入图片描述

(10)、将信息储存
在这里插入图片描述

(11)、方法运行结束sw.stop停止输出日志,返回splits
在这里插入图片描述

(12)、控制台信息
在这里插入图片描述

getSplits方法结束
12、writeNewSplits剩余语句
将返回的splits集合用toarry方法转换成数组;将数组内切片用sort方法进行排序;将信息提交到任务目录中;方会数组长度maptask数量。
在这里插入图片描述

WriteNewSplits方法结束
13、writeSplits方法结束
返回maptask的个数
在这里插入图片描述

14、运行完成后将工作目录中的内容删除
在这里插入图片描述

15、设置完成后程序状态变为运行状态
在这里插入图片描述

16、monitorAndPrintJob方法执行提交的任务
在这里插入图片描述
* 可能有很多不对的地方还望大家海涵,同时也欢迎给位大佬批评指正!!*

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐