Hive on Spark Source Code Analysis (3): SparkClient and SparkClientImpl (Part 1)
Hive on Spark Source Code Analysis (1): SparkTask
Hive on Spark Source Code Analysis (2): SparkSession and HiveSparkClient
Hive on Spark Source Code Analysis (3): SparkClient and SparkClientImpl (Part 1)
Hive on Spark Source Code Analysis (4): SparkClient and SparkClientImpl (Part 2)
Hive on Spark Source Code Analysis (5): RemoteDriver
Hive on Spark Source Code Analysis (6): RemoteSparkJobMonitor and JobHandle
The SparkClient interface defines the API of the remote Spark client:
// Submits a job for asynchronous execution and returns a JobHandle that can be used to monitor it.
<T extends Serializable> JobHandle<T> submit(Job<T> job);

// Asks the remote context to run a job immediately, bypassing the job queue.
// Recommended only for tasks that finish quickly.
// Returns a Future that can be used to monitor the result.
<T extends Serializable> Future<T> run(Job<T> job);

/**
 * Stops the remote context.
 *
 * Any pending jobs will be cancelled, and the remote context will be torn down.
 */
void stop();

/**
 * Adds a jar file to the running remote context.
 *
 * Note that the URL should be reachable by the Spark driver process. If running the driver
 * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
 * on that node (and not on the client machine).
 *
 * @param uri The location of the jar file.
 * @return A future that can be used to monitor the operation.
 */
Future<?> addJar(URI uri);

/**
 * Adds a file to the running remote context.
 *
 * Note that the URL should be reachable by the Spark driver process. If running the driver
 * in cluster mode, it may reside on a different host, meaning "file:" URLs have to exist
 * on that node (and not on the client machine).
 *
 * @param uri The location of the file.
 * @return A future that can be used to monitor the operation.
 */
Future<?> addFile(URI uri);

/**
 * Get the count of executors.
 */
Future<Integer> getExecutorCount();

/**
 * Get default parallelism. For standalone mode, this can be used to get total number of cores.
 */
Future<Integer> getDefaultParallelism();

/**
 * Check if remote context is still active.
 */
boolean isActive();
The comments in the code itself are clear enough, so I won't explain them further.
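To make the API concrete, here is a minimal usage sketch written against the interface above. The AnswerJob class and the runOnce helper are my own illustrative assumptions (Hive normally obtains the client through SparkClientFactory), not code taken from Hive:

// Sketch only: Job, JobContext, JobHandle and SparkClient live in org.apache.hive.spark.client.
import org.apache.hive.spark.client.Job;
import org.apache.hive.spark.client.JobContext;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.client.SparkClient;

public class SparkClientUsageSketch {

  // A made-up Job implementation: a real job would use jc.sc() (the JavaSparkContext)
  // to build RDDs; this one just returns a constant.
  static class AnswerJob implements Job<Integer> {
    @Override
    public Integer call(JobContext jc) throws Exception {
      return 42;
    }
  }

  static Integer runOnce(SparkClient client) throws Exception {
    JobHandle<Integer> handle = client.submit(new AnswerJob()); // asynchronous submission
    Integer result = handle.get();                              // JobHandle extends Future, so we can block on it
    client.stop();                                              // tears down the remote context
    return result;
  }
}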
SparkClientImpl is the concrete implementation of the SparkClient interface. Let's look at it in detail.
We start with the constructor, which is fairly long. It first initializes the fields:
this.conf = conf;
this.hiveConf = hiveConf;
this.childIdGenerator = new AtomicInteger();
this.jobs = Maps.newConcurrentMap();

String clientId = UUID.randomUUID().toString();
// Generate a secret used for authentication when the RemoteDriver connects back
String secret = rpcServer.createSecret();
// startDriver launches the RemoteDriver in a new process and returns a thread that waits for its result
this.driverThread = startDriver(rpcServer, clientId, secret);
// Create the ClientProtocol used for RPC communication: sending messages (submitting jobs)
// and responding to messages from the remote end
this.protocol = new ClientProtocol();
try {
  // The RPC server will take care of (rpc client connection) timeouts here.
  // Register the rpc client with the remote RpcServer
  this.driverRpc = rpcServer.registerClient(clientId, secret, protocol).get();
} catch (Throwable e) {
  if (e.getCause() instanceof TimeoutException) {
    LOG.error("Timed out waiting for client to connect.\nPossible reasons include network " +
        "issues, errors in remote driver or the cluster has no available resources, etc." +
        "\nPlease check YARN or Spark driver's logs for further information.\nReason2 from SparkClientImpl", e);
  } else {
    LOG.error("Error while waiting for client to connect.", e);
  }
  // Interrupt the driverThread
  driverThread.interrupt();
  try {
    // Wait for driverThread to die
    driverThread.join();
  } catch (InterruptedException ie) {
    // Give up.
    LOG.debug("Interrupted before driver thread was finished.");
  }
  throw Throwables.propagate(e);
}
Before going further, let's look at how RPC client registration is actually implemented. The work is done in another registerClient method in RpcServer. It first creates a promise to track the state of the registration task:
@VisibleForTesting
Future<Rpc> registerClient(final String clientId, String secret,
    RpcDispatcher serverDispatcher, long clientTimeoutMs) {
  final Promise<Rpc> promise = group.next().newPromise();
  Runnable timeout = new Runnable() {
    @Override
    public void run() {
      promise.setFailure(new TimeoutException("Timed out waiting for client connection."));
    }
  };
  // Schedule timeout to run once, clientTimeoutMs milliseconds from now.
  // Per timeout's run method: if the promise has not completed by then, promise.setFailure is called
  ScheduledFuture<?> timeoutFuture = group.schedule(timeout,
      clientTimeoutMs,
      TimeUnit.MILLISECONDS);
  final ClientInfo client = new ClientInfo(clientId, promise, secret, serverDispatcher,
      timeoutFuture);
  // Check whether this client has already been registered
  if (pendingClients.putIfAbsent(clientId, client) != null) {
    throw new IllegalStateException(
        String.format("Client '%s' already registered.", clientId));
  }
Here clientTimeoutMs comes from the hive.spark.client.server.connect.timeout property in the configuration file.
A listener is then added to the promise so that a failed registration removes the pending client, and the promise is returned:

  promise.addListener(new GenericFutureListener<Promise<Rpc>>() {
    @Override
    public void operationComplete(Promise<Rpc> p) {
      if (!p.isSuccess()) {
        pendingClients.remove(clientId);
      }
    }
  });
  return promise;
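The promise-plus-scheduled-timeout pattern above is plain Netty. The following standalone sketch (class name, timeout value, and payload are mine, not Hive's) shows the same mechanism in isolation: a promise created from an event loop, a scheduled task that fails it after a timeout, and a listener that cleans up once it completes:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;

public class PromiseTimeoutSketch {
  public static void main(String[] args) throws Exception {
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    try {
      final Promise<String> promise = group.next().newPromise();

      // Fail the promise if nothing completes it within the timeout, mirroring clientTimeoutMs.
      final ScheduledFuture<?> timeout = group.schedule(new Runnable() {
        @Override
        public void run() {
          promise.tryFailure(new TimeoutException("Timed out waiting for client connection."));
        }
      }, 500, TimeUnit.MILLISECONDS);

      // Clean up when the promise completes, like removing the entry from pendingClients.
      promise.addListener(new GenericFutureListener<Future<String>>() {
        @Override
        public void operationComplete(Future<String> f) {
          timeout.cancel(false);
          System.out.println(f.isSuccess() ? "registered: " + f.getNow() : "failed: " + f.cause());
        }
      });

      // Simulate the client connecting back before the timeout fires.
      promise.trySuccess("client-1");
      promise.await();
    } finally {
      group.shutdownGracefully();
    }
  }
}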
Back in the SparkClientImpl constructor, once registration succeeds a listener is added to driverRpc and the client is marked as alive:

driverRpc.addListener(new Rpc.Listener() {
  @Override
  public void rpcClosed(Rpc rpc) {
    // If the RPC channel closes while this SparkClient is still alive, log a warning
    if (isAlive) {
      LOG.warn("Client RPC channel closed unexpectedly.");
      isAlive = false;
    }
  }
});
// Mark the instance as alive once construction succeeds
isAlive = true;
Now let's look at the implementation of startDriver. It first obtains the rpcServer's host and port, which will later be passed to the RemoteDriver:
private Thread startDriver(final RpcServer rpcServer, final String clientId, final String secret)
    throws IOException {
  Runnable runnable;
  final String serverAddress = rpcServer.getAddress();
  final String serverPort = String.valueOf(rpcServer.getPort());
It then checks whether spark.client.do_not_use.run_driver_in_process is set; if so, the RemoteDriver is started directly in the current process. This mode is intended for testing only and should not be used in production:
LOG.warn("!!!! Running remote driver in-process. !!!!");runnable = new Runnable() {@Overridepublic void run() {List<String> args = Lists.newArrayList();args.add("--remote-host");args.add(serverAddress);args.add("--remote-port");args.add(serverPort);args.add("--client-id");args.add(clientId);args.add("--secret");args.add(secret);for (Map.Entry<String, String> e : conf.entrySet()) {args.add("--conf");args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));}try {RemoteDriver.main(args.toArray(new String[args.size()]));} catch (Exception e) {LOG.error("Error running driver.", e);}}};}
Otherwise the driver is launched in a separate process through spark-submit. The method first determines the Spark home and the Spark log directory:

String sparkHome = conf.get(SPARK_HOME_KEY);
if (sparkHome == null) {
  sparkHome = System.getenv(SPARK_HOME_ENV);
}
if (sparkHome == null) {
  sparkHome = System.getProperty(SPARK_HOME_KEY);
}
String sparkLogDir = conf.get("hive.spark.log.dir");
if (sparkLogDir == null) {
  if (sparkHome == null) {
    sparkLogDir = "./target/";
  } else {
    sparkLogDir = sparkHome + "/logs/";
  }
}
Next a temporary file is created to hold all of the configuration passed to spark-submit:
File properties = File.createTempFile("spark-submit.", ".properties");
// Make the file readable only by the owner, since it will contain the secret
if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
  throw new IOException("Cannot change permissions of job properties file.");
}
properties.deleteOnExit();
// Holds all the configuration properties
Properties allProps = new Properties();
It first loads the settings from spark-defaults.conf:
try {
  URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
  if (sparkDefaultsUrl != null) {
    LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
    allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
  }
} catch (Exception e) {
  String msg = "Exception trying to load spark-defaults.conf: " + e;
  throw new IOException(msg, e);
}
Then the client configuration, the client id, the secret, and the driver/executor Java options are added (some entries are omitted here):

for (Map.Entry<String, String> e : conf.entrySet()) {
  allProps.put(e.getKey(), conf.get(e.getKey()));
}
allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
// ...
All the properties are then written to the temporary file:

Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
try {
  allProps.store(writer, "Spark Context configuration");
} finally {
  writer.close();
}
Since the RemoteDriver is started in a new child process, the next step decides how the configuration options are passed to it. In local or yarn-client mode, the options have to be passed explicitly on the command line; in yarn-cluster mode they are handled by spark-submit.
A parameter list argv is created first (it will eventually become the command executed on the command line), and different arguments are added depending on the situation:
List<String> argv = Lists.newArrayList();
If Kerberos authentication is enabled, the corresponding kinit arguments are added:
if (hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase("kerberos")) {
  argv.add("kinit");
  String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
      "0.0.0.0");
  String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
  argv.add(principal);
  argv.add("-k");
  argv.add("-t");
  argv.add(keyTabFile + ";");
}
Next comes the spark-submit command itself. If the Spark home is configured, $SPARK_HOME/bin/spark-submit is added to argv directly; otherwise $JAVA_HOME/bin/java is added first:
if (sparkHome != null) {
  argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
} else {
  LOG.info("No spark.home provided, calling SparkSubmit directly.");
  argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
Get spark.master:
String master = conf.get("spark.master");Preconditions.checkArgument(master != null, "spark.master is not defined.");
If the master is local, mesos, yarn-client, or standalone (still inside the branch where java is invoked directly), spark.driver.memory, spark.driver.extraClassPath, spark.driver.extraLibPath and so on need to be turned into JVM arguments:
// Still inside the else branch above, i.e. when java is invoked directly.
// If the master is local or a client mode (in fact anything other than yarn-cluster):
if (master.startsWith("local") || master.startsWith("mesos")
    || master.endsWith("-client") || master.startsWith("spark")) {
  String mem = conf.get("spark.driver.memory");
  if (mem != null) {
    argv.add("-Xms" + mem);
    argv.add("-Xmx" + mem);
  }
  // Configure the classpath
  String cp = conf.get("spark.driver.extraClassPath");
  if (cp != null) {
    argv.add("-classpath");
    argv.add(cp);
  }
  String libPath = conf.get("spark.driver.extraLibPath");
  if (libPath != null) {
    argv.add("-Djava.library.path=" + libPath);
  }
  String extra = conf.get(DRIVER_OPTS_KEY);
  if (extra != null) {
    for (String opt : extra.split("[ ]")) {
      if (!opt.trim().isEmpty()) {
        argv.add(opt.trim());
      }
    }
  }
}
Then the SparkSubmit main class is added (for all master modes, still in the branch that invokes java directly):
argv.add("org.apache.spark.deploy.SparkSubmit")
if (master.equals("yarn-cluster")) {String executorCores = conf.get("spark.executor.cores");if (executorCores != null) {argv.add("--executor-cores");argv.add(executorCores);}String executorMemory = conf.get("spark.executor.memory");if (executorMemory != null) {argv.add("--executor-memory");argv.add(executorMemory);}String numOfExecutors = conf.get("spark.executor.instances");if (numOfExecutors != null) {argv.add("--num-executors");argv.add(numOfExecutors);}}
If hive.server2.enable.doAs is set to true, that is, Hive operations are executed as the user who submitted the request, the user name has to be added as a proxy user:
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
  try {
    String currentUser = Utils.getUGI().getShortUserName();
    // do not do impersonation in CLI mode
    if (!currentUser.equals(System.getProperty("user.name"))) {
      LOG.info("Attempting impersonation of " + currentUser);
      argv.add("--proxy-user");
      argv.add(currentUser);
    }
  } catch (Exception e) {
    String msg = "Cannot obtain username: " + e;
    throw new IllegalStateException(msg, e);
  }
}
Next come the path of the properties file created earlier, the main class (RemoteDriver), the jar containing the current class, and the remote host and port:
argv.add("--properties-file");argv.add(properties.getAbsolutePath());argv.add("--class");argv.add(RemoteDriver.class.getName());String jar = "spark-internal";if (SparkContext.jarOfClass(this.getClass()).isDefined()) {jar = SparkContext.jarOfClass(this.getClass()).get();}argv.add(jar);argv.add("--remote-host");argv.add(serverAddress);argv.add("--remote-port");argv.add(serverPort);
Finally, the configuration properties starting with hive.spark are appended as "--conf key=value" pairs so that they are passed on to the RemoteDriver:
for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
  String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
  argv.add("--conf");
  argv.add(String.format("%s=%s", hiveSparkConfKey, value));
}
The argument list is then joined into a single command string and handed to a ProcessBuilder:

String cmd = Joiner.on(" ").join(argv);
LOG.info("Running client driver with argv: {}", cmd);
ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
Start the process:
// Hide Hive's configuration from Spark so the two do not interfere with each other
pb.environment().remove("HIVE_HOME");
pb.environment().remove("HIVE_CONF_DIR");
if (isTesting != null) {
  pb.environment().put("SPARK_TESTING", isTesting);
}

final Process child = pb.start();
int childId = childIdGenerator.incrementAndGet();
final List<String> childErrorLog = new ArrayList<String>();
// Redirect the new process's output
redirect("stdout-redir-" + childId, new Redirector(child.getInputStream()));
redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog));
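The redirect helper and the Redirector class used above are not shown in this post. As a rough approximation (my own sketch, not Hive's actual implementation), a redirector simply pumps the child's output stream line by line to a logger, optionally keeping the lines in a buffer for later error reporting:

// Approximation of a child-process stream redirector; not Hive's actual Redirector class.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;

class SimpleRedirector implements Runnable {
  private final InputStream in;
  private final List<String> errorLog; // may be null when only logging is needed

  SimpleRedirector(InputStream in, List<String> errorLog) {
    this.in = in;
    this.errorLog = errorLog;
  }

  @Override
  public void run() {
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println("[child] " + line);   // Hive would send this to its logger instead
        if (errorLog != null) {
          errorLog.add(line);                    // keep stderr lines for error reporting
        }
      }
    } catch (IOException e) {
      System.err.println("Error reading child output: " + e);
    }
  }
}

// Usage, mirroring the redirect(...) calls above:
//   new Thread(new SimpleRedirector(child.getInputStream(), null), "stdout-redir-1").start();
//   new Thread(new SimpleRedirector(child.getErrorStream(), childErrorLog), "stderr-redir-1").start();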
Finally, a daemon thread named "Driver" is created to wait for the child process to exit and to report any failure back to the RpcServer:

runnable = new Runnable() {
  @Override
  public void run() {
    try {
      // Block waiting for the child process to exit
      int exitCode = child.waitFor();
      if (exitCode != 0) {
        StringBuilder errStr = new StringBuilder();
        for (String s : childErrorLog) {
          errStr.append(s);
          errStr.append('\n');
        }
        rpcServer.cancelClient(clientId,
            "Child process exited before connecting back with error log " + errStr.toString());
        LOG.warn("Child process exited with code {}", exitCode);
      }
    } catch (InterruptedException ie) {
      LOG.warn("Waiting thread interrupted, killing child process.");
      Thread.interrupted();
      child.destroy();
    } catch (Exception e) {
      LOG.warn("Exception while waiting for child process.", e);
    }
  }
};

Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName("Driver");
thread.start();
return thread;
At this point, the driver-launching part of SparkClientImpl is complete.
In the next part we will return to the job submission flow and analyze SparkClientImpl's job submission methods, as well as the inner class ClientProtocol, which is closely tied to job submission.