Driving Spark Offline Jobs with Quartz and SparkLauncher
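Scenario: different business lines have produced a number of Spark offline (batch) jobs. We originally drove them with Linux cron jobs, which made them hard to monitor, hard to collect logs from, scattered across hosts, hard to manage and configure, and inflexible in resource scheduling. SparkLauncher provides a programmatic way to submit Spark applications, and it can be combined with Spring Boot, Quartz, and similar technologies to manage how Spark jobs are submitted.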
SparkLauncher API
http://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/launcher/SparkLauncher.html
The core class is SparkLauncher. Its main methods are:
| Modifier and Type | Method and Description |
|---|---|
| SparkLauncher | addAppArgs(String... args): Adds command line arguments for the application. |
| SparkLauncher | addFile(String file): Adds a file to be submitted with the application. |
| SparkLauncher | addJar(String jar): Adds a jar file to be submitted with the application. |
| SparkLauncher | addPyFile(String file): Adds a python file / zip / egg to be submitted with the application. |
| SparkLauncher | addSparkArg(String arg): Adds a no-value argument to the Spark invocation. |
| SparkLauncher | addSparkArg(String name, String value): Adds an argument with a value to the Spark invocation. |
| SparkLauncher | directory(java.io.File dir): Sets the working directory of spark-submit. |
| Process | launch(): Launches a sub-process that will start the configured Spark application. |
| SparkLauncher | redirectError(): Specifies that stderr in spark-submit should be redirected to stdout. |
| SparkLauncher | redirectError(java.io.File errFile): Redirects error output to the specified File. |
| SparkLauncher | redirectError(ProcessBuilder.Redirect to): Redirects error output to the specified Redirect. |
| SparkLauncher | redirectOutput(java.io.File outFile): Redirects standard output to the specified File. |
| SparkLauncher | redirectOutput(ProcessBuilder.Redirect to): Redirects standard output to the specified Redirect. |
| SparkLauncher | redirectToLog(String loggerName): Sets all output to be logged and redirected to a logger with the specified name. |
| SparkLauncher | setAppName(String appName): Set the application name. |
| SparkLauncher | setAppResource(String resource): Set the main application resource. |
| SparkLauncher | setConf(String key, String value): Set a single configuration value for the application. |
| static void | setConfig(String name, String value): Set a configuration value for the launcher library. |
| SparkLauncher | setDeployMode(String mode): Set the deploy mode for the application. |
| SparkLauncher | setJavaHome(String javaHome): Set a custom JAVA_HOME for launching the Spark application. |
| SparkLauncher | setMainClass(String mainClass): Sets the application class name for Java/Scala applications. |
| SparkLauncher | setMaster(String master): Set the Spark master for the application. |
| SparkLauncher | setPropertiesFile(String path): Set a custom properties file with Spark configuration for the application. |
| SparkLauncher | setSparkHome(String sparkHome): Set a custom Spark installation location for the application. |
| SparkLauncher | setVerbose(boolean verbose): Enables verbose reporting for SparkSubmit. |
| SparkAppHandle | startApplication(SparkAppHandle.Listener... listeners): Starts a Spark application. |
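The method names are self-explanatory: in essence, the launcher assembles an invocation equivalent to submitting a Spark job with spark-submit. Once the parameters are set, the application can be submitted with either launch() or startApplication(...). The latter returns a SparkAppHandle, so the job's running state can be monitored directly.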
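To make the "assembles a spark-submit style invocation" idea concrete, here is a minimal JDK-only sketch of how builder calls could map onto a command line. `SubmitCommandSketch` and its methods are hypothetical names used for illustration; this is not how SparkLauncher is actually implemented. Only the flags themselves (`--master`, `--deploy-mode`, `--class`, `--name`, `--conf`) are real spark-submit options.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: NOT the real SparkLauncher implementation.
final class SubmitCommandSketch {
    private String master;
    private String deployMode;
    private String mainClass;
    private String appName;
    private String appResource;
    private final Map<String, String> conf = new LinkedHashMap<>();
    private final List<String> appArgs = new ArrayList<>();

    SubmitCommandSketch setMaster(String value) { master = value; return this; }
    SubmitCommandSketch setDeployMode(String value) { deployMode = value; return this; }
    SubmitCommandSketch setMainClass(String value) { mainClass = value; return this; }
    SubmitCommandSketch setAppName(String value) { appName = value; return this; }
    SubmitCommandSketch setAppResource(String value) { appResource = value; return this; }
    SubmitCommandSketch setConf(String key, String value) { conf.put(key, value); return this; }

    SubmitCommandSketch addAppArgs(String... args) {
        for (String arg : args) {
            appArgs.add(arg);
        }
        return this;
    }

    // Builds the argument list: options first, then the application
    // resource, then the application's own arguments.
    List<String> build() {
        if (appResource == null) {
            throw new IllegalStateException("appResource is required");
        }
        List<String> cmd = new ArrayList<>();
        cmd.add("spark-submit");
        addOption(cmd, "--master", master);
        addOption(cmd, "--deploy-mode", deployMode);
        addOption(cmd, "--class", mainClass);
        addOption(cmd, "--name", appName);
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            cmd.add("--conf");
            cmd.add(entry.getKey() + "=" + entry.getValue());
        }
        cmd.add(appResource);
        cmd.addAll(appArgs);
        return cmd;
    }

    // Skips options that were never set, much like the null/empty checks
    // a caller would otherwise have to write by hand.
    private static void addOption(List<String> cmd, String flag, String value) {
        if (value != null && !value.isEmpty()) {
            cmd.add(flag);
            cmd.add(value);
        }
    }
}
```

Calling `build()` after setting a master, main class, one `--conf` entry, and an application jar yields an argument list in the order spark-submit expects: options first, then the jar, then the application arguments.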
Official SparkLauncher examples
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class MyLauncher {
    public static void main(String[] args) throws Exception {
        SparkAppHandle handle = new SparkLauncher()
            .setAppResource("/my/app.jar")
            .setMainClass("my.spark.app.Main")
            .setMaster("local")
            .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
            .startApplication();
        // Use handle API to monitor / control application.
    }
}

import org.apache.spark.launcher.SparkLauncher;

public class MyLauncher {
    public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
            .setAppResource("/my/app.jar")
            .setMainClass("my.spark.app.Main")
            .setMaster("local")
            .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
            .launch();
        spark.waitFor();
    }
}
Example
- Create the Spark task template
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.spark.launcher.SparkLauncher;
// The original post omits its imports. The logging and StringUtils imports
// below are assumptions (Commons Logging and Spring).
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.StringUtils;

public abstract class SparkTask implements Runnable {
    private Log log = LogFactory.getLog(SparkTask.class);
    // Optional initialization command to run before submission
    private String cmds;
    // User name for the YARN job
    private String user;
    private String javaHome;
    // SPARK_HOME
    private String sparkHome;
    private String appName = "test";
    private String deployMode;
    private String master;
    private String mainClass;
    private String appResource;
    private String propertiesFile;
    private boolean verbose;
    private String filePath;
    private String jarPath;
    private String sparkArgs;
    private String appArgs;

    // Getters and setters for the fields above are omitted, as in the original.

    public abstract void before();

    public void start() {
        before();
        run();
    }

    public void builderParams(SparkLauncher launcher) {
        if (!StringUtils.isEmpty(getSparkHome())) {
            launcher.setSparkHome(getSparkHome());
        }
        if (!StringUtils.isEmpty(getAppName())) {
            launcher.setAppName(getAppName());
        }
        if (!StringUtils.isEmpty(getDeployMode())) {
            launcher.setDeployMode(getDeployMode());
        }
        if (!StringUtils.isEmpty(getMaster())) {
            launcher.setMaster(getMaster());
        }
        if (!StringUtils.isEmpty(getMainClass())) {
            launcher.setMainClass(getMainClass());
        }
        if (!StringUtils.isEmpty(getAppResource())) {
            launcher.setAppResource(getAppResource());
        }
        if (!StringUtils.isEmpty(getPropertiesFile())) {
            Path path = Paths.get(getPropertiesFile());
            if (Files.exists(path) && !Files.isDirectory(path)) {
                launcher.setPropertiesFile(getPropertiesFile());
            } else {
                log.warn("propertiesFile path load error: " + getPropertiesFile());
            }
        }
        // The whole sparkArgs string is passed as one no-value argument.
        if (!StringUtils.isEmpty(getSparkArgs())) {
            launcher.addSparkArg(getSparkArgs());
        }
        // The whole appArgs string is passed as one application argument.
        if (!StringUtils.isEmpty(getAppArgs())) {
            launcher.addAppArgs(getAppArgs());
        }
        launcher.setVerbose(isVerbose());
        if (!StringUtils.isEmpty(getFilePath())) {
            Path path = Paths.get(getFilePath());
            if (Files.exists(path) && Files.isDirectory(path)) {
                // Directory: add every entry in it. listFiles() can return null on I/O errors.
                File[] files = path.toFile().listFiles();
                if (files != null) {
                    for (File file : files) {
                        launcher.addFile(file.getAbsolutePath());
                    }
                }
            } else if (Files.exists(path) && path.toFile().isFile()) {
                launcher.addFile(getFilePath());
            } else {
                log.warn("file path load error: " + getFilePath());
            }
        }
        if (!StringUtils.isEmpty(getJarPath())) {
            Path path = Paths.get(getJarPath());
            if (Files.exists(path) && Files.isDirectory(path)) {
                // Directory: add every entry in it. listFiles() can return null on I/O errors.
                File[] files = path.toFile().listFiles();
                if (files != null) {
                    for (File file : files) {
                        launcher.addJar(file.getAbsolutePath());
                    }
                }
            } else if (Files.exists(path) && path.toFile().isFile()) {
                // Fixed: the original passed getFilePath() here.
                launcher.addJar(getJarPath());
            } else {
                log.warn("jar path load error: " + getJarPath());
            }
        }
    }
}
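The `filePath` and `jarPath` branches in `builderParams` follow the same pattern: a directory contributes the files inside it, a single file contributes itself, and anything else is reported. A minimal JDK-only sketch of that pattern as one reusable helper is shown below. `PathCollector.collect` is a hypothetical name that does not appear in the original code, and unlike the original it skips subdirectories and returns an empty list (instead of logging) when the path is missing.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the original post's code.
final class PathCollector {

    // Returns absolute paths of the regular files denoted by 'location':
    // every regular file directly inside a directory, or the file itself.
    // Returns an empty list for null, empty, or non-existent locations.
    static List<String> collect(String location) throws IOException {
        List<String> result = new ArrayList<>();
        if (location == null || location.isEmpty()) {
            return result;
        }
        Path path = Paths.get(location);
        if (Files.isDirectory(path)) {
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(path)) {
                for (Path entry : entries) {
                    if (Files.isRegularFile(entry)) {
                        result.add(entry.toAbsolutePath().toString());
                    }
                }
            }
            // Directory listing order is unspecified, so sort for stable output.
            Collections.sort(result);
        } else if (Files.isRegularFile(path)) {
            result.add(path.toAbsolutePath().toString());
        }
        return result;
    }
}
```

With such a helper, both branches could reduce to a loop over `collect(getFilePath())` calling `launcher.addFile(...)` and a loop over `collect(getJarPath())` calling `launcher.addJar(...)`, which would also rule out mixing up the two getters.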
- Create the job bean
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
// The original post omits its imports. The three imports below are
// assumptions (Commons Logging, Joda-Time, and Spring).
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.springframework.util.StringUtils;

public class TransitVehicle extends SparkTask {
    private static final Log log = LogFactory.getLog(TransitVehicle.class);
    private SparkLauncher launcher = null;

    @Override
    public void before() {
        // Environment variables for the spark-submit child process
        Map<String, String> envs = new HashMap<>();
        if (!StringUtils.isEmpty(getCmds())) {
            try {
                Runtime.getRuntime().exec(getCmds());
            } catch (IOException e) {
                // Fixed: the original called log.equals(e), which logs nothing.
                log.error("Failed to run init command: " + getCmds(), e);
            }
        }
        if (!StringUtils.isEmpty(getUser())) {
            envs.put("HADOOP_USER_NAME", getUser());
        }
        // Initialize the SparkLauncher
        launcher = new SparkLauncher(envs);
        // Apply the job parameters
        builderParams(launcher);
    }

    @Override
    public void run() {
        DateTime current = new DateTime();
        log.info("Computation started: " + current.toString("yyyy-MM-dd HH:mm:ss"));
        long start = System.currentTimeMillis();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            launcher.startApplication(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle sparkAppHandle) {
                    // Release the waiting thread once the application reaches a final state.
                    if (sparkAppHandle.getState().isFinal()) {
                        countDownLatch.countDown();
                    }
                    log.info("state:" + sparkAppHandle.getState().toString());
                }

                @Override
                public void infoChanged(SparkAppHandle sparkAppHandle) {
                    // Not used.
                }
            });
        } catch (Exception e) {
            log.error("Failed to start the Spark application", e);
            // Without a running application the latch would never be released.
            return;
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the scheduler thread can see it.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while waiting for the Spark application", e);
        }
        long end = System.currentTimeMillis();
        log.info("Computation finished, elapsed: " + (end - start) + " ms");
    }

    @Override
    public String toString() {
        return "TransitVehicle [launcher=" + launcher + ", getJavaHome()=" + getJavaHome() + ", getSparkHome()="
                + getSparkHome() + ", getAppName()=" + getAppName() + ", getDeployMode()=" + getDeployMode()
                + ", getMaster()=" + getMaster() + ", getMainClass()=" + getMainClass() + ", getAppResource()="
                + getAppResource() + ", getPropertiesFile()=" + getPropertiesFile() + ", isVerbose()=" + isVerbose()
                + ", getFilePath()=" + getFilePath() + ", getJarPath()=" + getJarPath() + ", getAppArgs()="
                + getAppArgs() + ", getSparkArgs()=" + getSparkArgs() + ", getCmds()=" + getCmds() + ", getUser()="
                + getUser() + ", toString()=" + super.toString() + ", getClass()=" + getClass() + ", hashCode()="
                + hashCode() + "]";
    }
}
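`run()` blocks on a `CountDownLatch` until the listener observes a final state. The sketch below isolates that pattern and adds a bounded wait, so a job that never reports a final state cannot hold a scheduler thread forever. It is JDK-only: `JobState` is a hypothetical stand-in for `SparkAppHandle.State` so the example runs without Spark, and both `FinalStateWaiter` and the timeout are assumptions that are not in the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "wait for a final state" pattern.
final class FinalStateWaiter {

    // Hypothetical stand-in for SparkAppHandle.State.
    enum JobState {
        SUBMITTED(false), RUNNING(false), FINISHED(true), FAILED(true);

        private final boolean terminal;

        JobState(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isFinal() {
            return terminal;
        }
    }

    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicReference<JobState> last = new AtomicReference<>();

    // Called from the listener thread on every state change.
    void stateChanged(JobState state) {
        last.set(state);
        if (state.isFinal()) {
            done.countDown();
        }
    }

    // Returns the final state if one arrives within the timeout,
    // or null if the wait timed out first.
    JobState await(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit) ? last.get() : null;
    }
}
```

In the job bean, the listener's `stateChanged` callback would forward to a waiter like this, and the caller could decide what to do on a `null` result, for example log a warning and call `stop()` or `kill()` on the `SparkAppHandle`.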
- Configure Quartz
@Configuration
public class SparkCronDriveConfiguration {

    @Bean
    public TaskScheduler scheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadGroupName("offline-task");
        return scheduler;
    }

    @Bean
    @Lazy
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}
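- spring.xml configures the schedule. This file is internal and not exposed for editing; every value in it is read from the properties file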
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.3.xsd">

    <!-- Placeholder defaults are the literal text after the colon, so they are
         written without quotes; quoted defaults would keep the quote characters. -->
    <bean abstract="true" id="base">
        <property name="user" value="${spark.user:hdfs}"></property>
        <property name="sparkHome" value="${spark.home:}"></property>
        <property name="master" value="${spark.master:local[*]}"></property>
    </bean>

    <!-- Job configuration -->
    <bean id="transitVehicle"
          class="com.ehl.tvc.offlineanalysis.transitvehicle.server.TransitVehicle"
          parent="base">
        <property name="appName" value="${transitvehicle.appname:TransitVehicle}"></property>
        <property name="cmds" value="${transitvehicle.cmds:}"></property>
        <property name="filePath" value="${transitvehicle.filepath:}"></property>
        <property name="jarPath" value="${transitvehicle.jarpath:}"></property>
        <property name="mainClass" value="${transitvehicle.mainclass}"></property>
        <property name="propertiesFile" value="${transitvehicle.propertiesfile}"></property>
        <property name="appResource" value="${transitvehicle.appresource}"></property>
        <property name="appArgs" value="${transitvehicle.appargs:}"></property>
    </bean>

    <!-- Register the JobDetail -->
    <bean id="transitVehicleJob"
          class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject" ref="transitVehicle" />
        <property name="targetMethod" value="start" />
    </bean>

    <!-- Register the trigger -->
    <bean id="transitVehicleTrigger"
          class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
        <property name="cronExpression" value="${transitvehicle.cronexpression}"></property>
        <property name="jobDetail" ref="transitVehicleJob"></property>
    </bean>

    <!-- Scheduler -->
    <bean id="scheduler"
          class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="transitVehicleTrigger" />
            </list>
        </property>
        <property name="autoStartup" value="false"></property>
    </bean>
</beans>
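The trigger reads its schedule from `transitvehicle.cronexpression`. A Quartz cron expression has six required fields (seconds, minutes, hours, day of month, month, day of week), so the value used in this post, `0 0/10 * * * ?`, fires at second 0 of minutes 0, 10, 20, 30, 40 and 50 of every hour. The sketch below reproduces only that one schedule with `java.time` to make the firing times concrete. `TenMinuteSchedule` is a hypothetical helper, not a cron parser and not part of Quartz, and it ignores time zones and daylight saving changes.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper that mirrors the single schedule "0 0/10 * * * ?".
final class TenMinuteSchedule {

    // Returns the first fire time strictly after 'now'.
    static LocalDateTime nextFireTime(LocalDateTime now) {
        // Drop seconds and nanoseconds, then move up to the next multiple of 10 minutes.
        LocalDateTime minute = now.truncatedTo(ChronoUnit.MINUTES);
        int minutesToAdd = 10 - (minute.getMinute() % 10);
        return minute.plusMinutes(minutesToAdd);
    }
}
```

Because the job bean blocks until the Spark application finishes, a run that takes longer than ten minutes will still be in progress when the next fire time arrives, which is worth keeping in mind when choosing the interval.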
- Job property configuration. This file is exposed for editing; all settings are made through the properties file
#logging.config: ${user.dir}/config/logback.xml
spring.metrics.servo.enabled=false
management.security.enabled: false
# Basic Spark settings
spark.home=/usr/hdp/current/spark-client
spark.user=hdfs
spark.master=yarn-client
# Job settings
transitvehicle.appname=transitVehicle
transitvehicle.basepath=/app/tvc-offline-analysis
transitvehicle.cmds=cp -rf ${transitvehicle.basepath}/conf/* ${transitvehicle.basepath}/lib
transitvehicle.filepath=${transitvehicle.basepath}/conf
transitvehicle.jarpath=${transitvehicle.basepath}/lib
transitvehicle.mainclass=com.ehl.tvc.offlineanalysis.transitvehicle.TransitVehicleMain
transitvehicle.propertiesfile=./transitvehicle.properties
transitvehicle.appresource=${transitvehicle.basepath}/lib/tvc-offline-analysis-1.1.1.jar
transitvehicle.appargs=2018-08-10
transitvehicle.cronexpression=0 0/10 * * * ?
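One caveat about `transitvehicle.cmds`: the value contains a `*` wildcard, and wildcards are expanded by a shell, not by `cp` itself. `Runtime.exec(String)` only splits the string on whitespace and starts the program directly, so the pattern would reach `cp` unexpanded. One possible workaround, assuming a POSIX `sh` is available on the host, is to hand the whole string to `sh -c` and wait for it to finish before submitting the job. `InitCommand` is a hypothetical helper name that does not appear in the original code.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; assumes a POSIX "sh" is on the PATH.
final class InitCommand {

    // Wraps the command line so that a shell interprets wildcards,
    // quoting, and redirections instead of the JVM's whitespace split.
    static List<String> shellArgv(String commandLine) {
        return Arrays.asList("sh", "-c", commandLine);
    }

    // Runs the command, forwards its output to this process's stdout/stderr,
    // and returns its exit code once it has finished.
    static int run(String commandLine) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(shellArgv(commandLine))
                .inheritIO()
                .start();
        return process.waitFor();
    }
}
```

Waiting for the exit code also lets the caller skip the submission when the copy fails, rather than launching Spark against a half-populated `lib` directory.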
- Job resource configuration. This is the content of the file referenced by transitvehicle.propertiesfile above
spark.driver.memory=2g
spark.executor.cores=2
spark.driver.extraClassPath=/app/lib
spark.driver.extraJavaOptions=-Xms2048m -Xmx2048m
- Start the application and start the scheduler
@SpringBootApplication
@ImportResource(locations = "classpath:config/spring.xml")
public class SparkCronDrive {
    public static void main(String[] args) throws SchedulerException {
        ConfigurableApplicationContext context = SpringApplication.run(SparkCronDrive.class, args);
        Scheduler scheduler = context.getBean(Scheduler.class);
        // Start the scheduler service
        scheduler.start();
    }
}


