Spark Submit Job Submission Flow

Date: 2020-11-13 07:40:32

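1. Introduction

The previous post walked through how a Spark cluster starts up in Standalone mode. Once the cluster is up, how do we run a program of our own on it? This post traces what happens when a job is submitted with spark-submit.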

2. Submitting a Spark job

According to the official Spark documentation, a spark-submit invocation has the following form:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

• --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi).

• --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077).

• --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client).

• --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces, wrap "key=value" in quotes. Each --conf sets a single property; it does not name a configuration file.

• application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.

• application-arguments: Arguments passed to the main method of your main class, if any.

A concrete example:

# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
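When a submission is driven from a program rather than typed by hand, the option order described above can be captured in a small helper. The following is only a sketch: SubmitCommandSketch and its build method are hypothetical names, not part of Spark, and the helper covers only the options listed above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Spark) that lays out spark-submit arguments in the documented order. */
final class SubmitCommandSketch {

    static List<String> build(String mainClass, String master, String deployMode,
                              Map<String, String> conf, String appJar, List<String> appArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add("./bin/spark-submit");
        cmd.addAll(Arrays.asList("--class", mainClass));
        cmd.addAll(Arrays.asList("--master", master));
        cmd.addAll(Arrays.asList("--deploy-mode", deployMode));
        // Each property becomes its own "--conf key=value" pair.
        for (Map.Entry<String, String> e : conf.entrySet()) {
            cmd.add("--conf");
            cmd.add(e.getKey() + "=" + e.getValue());
        }
        // The application jar comes after all options, followed by the application's own arguments.
        cmd.add(appJar);
        cmd.addAll(appArgs);
        return cmd;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.executor.memory", "20g");
        List<String> cmd = build("org.apache.spark.examples.SparkPi",
                "spark://207.184.161.138:7077", "cluster", conf,
                "/path/to/examples.jar", Arrays.asList("1000"));
        System.out.println(String.join(" ", cmd));
    }
}
```

Because every list element is one argument, a list like this could be handed to java.lang.ProcessBuilder without any shell quoting of values that contain spaces.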

Jobs are submitted with the spark-submit script in the bin directory under $SPARK_HOME. Let's walk through that script:

# If SPARK_HOME is not set, locate it with the find-spark-home helper
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Hand off to bin/spark-class, naming SparkSubmit as the class to run
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Next, the spark-class script:

# If SPARK_HOME is not set, locate it with the find-spark-home helper
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# Load spark-env.sh
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary: prefer JAVA_HOME, otherwise fall back to java on the PATH
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
  # Run the main function of org.apache.spark.launcher.Main, which parses the arguments
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
  # Append each NUL-terminated argument to CMD
  CMD+=("$ARG")
# build_command produces the command that will be executed
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
# Execute the assembled command
exec "${CMD[@]}"
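The hand-off described in the script's comments, where arguments are separated by NUL characters and the exit code is appended as the last field, can be sketched on its own. The class below is a hypothetical illustration written for this post, not Spark code, and the sample classpath with a space in it is made up to show why a NUL separator is convenient.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration (not Spark code) of a NUL-separated argument stream with a trailing exit code. */
final class NulProtocolSketch {

    /** Writer side: each argument, then the exit code, every field terminated by '\0'. */
    static String encode(List<String> command, int exitCode) {
        StringBuilder out = new StringBuilder();
        for (String arg : command) {
            out.append(arg).append('\0');
        }
        out.append(exitCode).append('\0');
        return out.toString();
    }

    /** Reader side: split the stream into its NUL-terminated fields. */
    static List<String> decode(String stream) {
        List<String> fields = new ArrayList<>();
        int start = 0;
        int end;
        while ((end = stream.indexOf('\0', start)) >= 0) {
            fields.add(stream.substring(start, end));
            start = end + 1;
        }
        return fields;
    }

    /** The last field must be a non-negative integer; anything else means the stream is corrupt. */
    static int exitCode(List<String> fields) {
        if (fields.isEmpty() || !fields.get(fields.size() - 1).matches("[0-9]+")) {
            throw new IllegalStateException("launcher output did not end with an exit code");
        }
        return Integer.parseInt(fields.get(fields.size() - 1));
    }

    /** Everything before the exit code is the command to execute; call exitCode first to validate. */
    static List<String> command(List<String> fields) {
        return new ArrayList<>(fields.subList(0, fields.size() - 1));
    }

    public static void main(String[] args) {
        String stream = encode(Arrays.asList("java", "-cp", "/opt/my spark/jars/*", "Main"), 0);
        List<String> fields = decode(stream);
        System.out.println(exitCode(fields) + " " + command(fields));
    }
}
```

The argument containing a space survives the round trip as a single field, which is the property the script relies on when it fills the CMD array.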

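The main job of spark-class is to have org.apache.spark.launcher.Main parse the arguments and build the final command, collect that command into the CMD array, and exec it. Let's look at the main function of org.apache.spark.launcher.Main: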

public static void main(String[] argsArray) throws Exception {
  // Validate the arguments
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  // Copy the arguments into a mutable list
  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  // When a job is submitted, the first argument is the class to run,
  // org.apache.spark.deploy.SparkSubmit. Remove it; what remains is args.
  String className = args.remove(0);

  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  Map<String, String> env = new HashMap<>();
  List<String> cmd;
  // Branch on className
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      // Create the command builder that produces the command exec'd by spark-class
      AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
      cmd = buildCommand(builder, env, printLaunchCommand);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();

      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        // Parse the arguments
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }

      List<String> help = new ArrayList<>();
      if (parser.className != null) {
        // Add the class option and the class name to the help list
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
      cmd = buildCommand(builder, env, printLaunchCommand);
    }
  } else {
    AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
    cmd = buildCommand(builder, env, printLaunchCommand);
  }

  // On Windows, print the command in the form the Windows scripts expect
  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // In bash, use NULL as the arg separator since it cannot be used in an argument.
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}

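The code above mainly parses the arguments. Once they parse cleanly, the command to run is printed, collected into the CMD array by spark-class, and executed there. That command starts org.apache.spark.deploy.SparkSubmit, so let's look at its main function: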

override def main(args: Array[String]): Unit = {
  // Create a SparkSubmit object
  val submit = new SparkSubmit() {
    self =>

    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      // Build the Spark submit arguments
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      }
    }

    override protected def logInfo(msg: => String): Unit = printMessage(msg)

    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

    override def doSubmit(args: Array[String]): Unit = {
      try {
        // Call the parent class's doSubmit
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }
  }

  submit.doSubmit(args)
}

Now step into doSubmit:

def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  // Parse the arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  // Pattern-match on the SparkSubmitAction; here it is SUBMIT, so submit is called
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

The action is SUBMIT, so control passes to the submit method:

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // Call prepareSubmitEnvironment to derive the following four values from the parsed arguments
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    }
  }

  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  // Standalone cluster mode with the REST gateway enabled takes this branch
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
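The control flow at the end of submit, where the REST gateway is tried first and the legacy gateway is used if the endpoint turns out not to be a REST server, is a plain try-then-fall-back pattern. The sketch below isolates that pattern in Java. Every name in it (GatewayFallbackSketch, Submitter, RestConnectionException) is a hypothetical stand-in and none of them are Spark classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-ins for the two gateways; none of these names are Spark classes. */
final class GatewayFallbackSketch {

    /** Signals that the endpoint did not speak the preferred protocol. */
    static final class RestConnectionException extends RuntimeException {
        RestConnectionException(String message) {
            super(message);
        }
    }

    /** One submission attempt; useRest selects the gateway. */
    interface Submitter {
        void run(boolean useRest);
    }

    /** Tries REST first when requested and retries once on the legacy gateway; returns the gateway that succeeded. */
    static String submit(boolean useRest, Submitter submitter) {
        if (useRest) {
            try {
                submitter.run(true);
                return "rest";
            } catch (RestConnectionException e) {
                // Fail over: clear the flag and go around again, mirroring submit(args, false).
                return submit(false, submitter);
            }
        }
        submitter.run(false);
        return "legacy";
    }

    public static void main(String[] args) {
        List<String> attempts = new ArrayList<>();
        String used = submit(true, rest -> {
            attempts.add(rest ? "rest" : "legacy");
            if (rest) {
                throw new RestConnectionException("endpoint is not a REST server");
            }
        });
        System.out.println(attempts + " -> " + used);
    }
}
```

Only the connection-level exception triggers the retry; any other failure propagates to the caller, which matches the single catch clause in the Scala code.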

The first thing submit does is prepare the environment by calling prepareSubmitEnvironment. Stepping into that method:

private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  // Return values
  // The four values this method returns
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = new SparkConf()
  var childMainClass = ""

  // Set the cluster manager
  val clusterManager: Int = args.master match {
    case "yarn" => YARN
    case "yarn-client" | "yarn-cluster" =>
      logWarning(s"Master ${args.master} is deprecated since 2.0." +
        " Please use master \"yarn\" with specified deploy mode instead.")
      YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("k8s") => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ =>
      error("Master must either be yarn or start with spark, mesos, k8s, or local")
      -1
  }
  // ... (excerpt; the rest of the method is not shown)

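The main job of prepareSubmitEnvironment is to work out the cluster manager and deploy mode from the parsed arguments and to return four values, childArgs, childClasspath, sparkConf and childMainClass, for the rest of the submission to use.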
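The master-URL match shown in the excerpt is easy to try out in isolation. The following Java rendering is a sketch for experimentation only: the class and enum are hypothetical (Spark itself uses Int constants), and an unknown master raises an exception here instead of calling Spark's error helper.

```java
/** Hypothetical Java rendering of the master-URL match; the enum is illustrative only. */
final class ClusterManagerSketch {

    enum ClusterManager { YARN, STANDALONE, MESOS, KUBERNETES, LOCAL }

    static ClusterManager resolve(String master) {
        // "yarn-client" and "yarn-cluster" are the deprecated spellings that still map to YARN.
        if (master.equals("yarn") || master.equals("yarn-client") || master.equals("yarn-cluster")) {
            return ClusterManager.YARN;
        }
        if (master.startsWith("spark")) {
            return ClusterManager.STANDALONE;
        }
        if (master.startsWith("mesos")) {
            return ClusterManager.MESOS;
        }
        if (master.startsWith("k8s")) {
            return ClusterManager.KUBERNETES;
        }
        if (master.startsWith("local")) {
            return ClusterManager.LOCAL;
        }
        throw new IllegalArgumentException(
                "Master must either be yarn or start with spark, mesos, k8s, or local");
    }

    public static void main(String[] args) {
        System.out.println(resolve("spark://207.184.161.138:7077"));
        System.out.println(resolve("local[4]"));
    }
}
```

Note that "yarn" is matched exactly while the other managers are matched by prefix, so a URL such as spark://207.184.161.138:7077 selects the standalone manager.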

This post follows the Standalone cluster deploy mode, so the relevant branch is:

if (args.isStandaloneCluster) {
  if (args.useRest) {
    // With the REST gateway, submit through RestSubmissionClientApp
    childMainClass = REST_CLUSTER_SUBMIT_CLASS
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // Without the REST gateway, submit through ClientApp
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}
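To see what the legacy branch actually hands to the client, it helps to build the list for concrete input. The sketch below mirrors that branch in Java; LegacyChildArgsSketch and its parameter names are hypothetical, and the sample values are taken from the spark-submit example earlier in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical Java mirror of the legacy standalone-cluster branch; not Spark code. */
final class LegacyChildArgsSketch {

    static List<String> build(boolean supervise, String driverMemory, String driverCores,
                              String master, String primaryResource, String mainClass,
                              List<String> appArgs) {
        List<String> childArgs = new ArrayList<>();
        if (supervise) {
            childArgs.add("--supervise");
        }
        // Optional settings are only emitted when a value is present.
        if (driverMemory != null) {
            childArgs.addAll(Arrays.asList("--memory", driverMemory));
        }
        if (driverCores != null) {
            childArgs.addAll(Arrays.asList("--cores", driverCores));
        }
        // The sub-command, then the master URL, the application jar and the user's main class.
        childArgs.add("launch");
        childArgs.addAll(Arrays.asList(master, primaryResource, mainClass));
        // Finally the application's own arguments, if any.
        if (appArgs != null) {
            childArgs.addAll(appArgs);
        }
        return childArgs;
    }

    public static void main(String[] args) {
        System.out.println(build(true, null, null, "spark://207.184.161.138:7077",
                "/path/to/examples.jar", "org.apache.spark.examples.SparkPi",
                Arrays.asList("1000")));
    }
}
```

The user's main class appears here as an ordinary argument, which is how it reaches the client even though the class that gets launched is the submission client.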

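The code above picks the submission path depending on whether the REST gateway is in use; we will follow the ClientApp path. Note that in standalone cluster mode childMainClass is one of these two submission classes, and the user's main class travels inside childArgs. childMainClass is the user's own main class only in client deploy mode. Back in doRunMain, the call goes on to runMain: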

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sparkConf: SparkConf,
    childMainClass: String,
    verbose: Boolean): Unit = {
  if (verbose) {
    logInfo(s"Main class:\n$childMainClass")
    logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
    // sysProps may contain sensitive information, so redact before printing
    logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
    logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    logInfo("\n")
  }

  // Create the class loader used to load the jars
  val loader =
    if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  // Add each jar on the child classpath to the loader
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }

  var mainClass: Class[_] = null

  try {
    // Load the main class
    mainClass = Utils.classForName(childMainClass)
  } catch {
    case e: ClassNotFoundException =>
      logWarning(s"Failed to load $childMainClass.", e)
      if (childMainClass.contains("thriftserver")) {
        logInfo(s"Failed to load main class $childMainClass.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
    case e: NoClassDefFoundError =>
      logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
      if (e.getMessage.contains("org/apache/hadoop/hive")) {
        logInfo(s"Failed to load hive class.")
        logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
      }
      throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
  }

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    // Wrap a plain main class in a JavaMainApplication
    new JavaMainApplication(mainClass)
  }
  // Excerpt ends here; the full method goes on to start app with childArgs and sparkConf
}
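runMain starts from a class loader with no URLs and then adds the jars one at a time. In plain Java the same idea needs a small subclass, because URLClassLoader keeps addURL protected. The sketch below assumes that this is roughly what a "mutable" URL class loader amounts to; MutableLoaderSketch and its members are hypothetical names, and the jar path is the placeholder from the earlier example and does not need to exist for the code to run.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical sketch of a class loader that accepts jars after construction; not Spark code. */
final class MutableLoaderSketch {

    /** URLClassLoader keeps addURL protected; this subclass widens it so jars can be appended later. */
    static final class MutableLoader extends URLClassLoader {
        MutableLoader(URL[] urls, ClassLoader parent) {
            super(urls, parent);
        }

        @Override
        public void addURL(URL url) {
            super.addURL(url);
        }
    }

    /** Starts from an empty URL list and appends one URL per jar path. */
    static MutableLoader loaderFor(String... jarPaths) {
        MutableLoader loader =
                new MutableLoader(new URL[0], Thread.currentThread().getContextClassLoader());
        for (String path : jarPaths) {
            try {
                loader.addURL(new File(path).toURI().toURL());
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Not a usable jar path: " + path, e);
            }
        }
        return loader;
    }

    public static void main(String[] args) {
        MutableLoader loader = loaderFor("/path/to/examples.jar");
        // Install the loader as the context class loader so later lookups can see the added jars.
        Thread.currentThread().setContextClassLoader(loader);
        System.out.println(loader.getURLs().length + " jar(s) on the child classpath");
    }
}
```

Installing the loader as the thread's context class loader is what allows a class that is absent from the launcher's own classpath to be found by name afterwards.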

Next, the JavaMainApplication class:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Look up the main method
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    // Copy the Spark configuration into system properties
    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    // Invoke main through reflection
    mainMethod.invoke(null, args)
  }

}

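The main function of the wrapped class is invoked through reflection. In client deploy mode that class is the program the user wrote.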
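The same reflective call can be reproduced in a few lines of plain Java. In this sketch ReflectiveMainSketch and DemoApp are hypothetical names; DemoApp stands in for a user application and simply records the arguments it receives.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;

/** Hypothetical sketch of invoking a static main method through reflection. */
final class ReflectiveMainSketch {

    /** Stand-in for a user application; it records what it was called with. */
    public static final class DemoApp {
        static String lastArgs = null;

        public static void main(String[] args) {
            lastArgs = Arrays.toString(args);
        }
    }

    static void invokeMain(Class<?> klass, String[] args) {
        try {
            // Look up the public method main(String[]).
            Method mainMethod = klass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException(
                        "The main method in the given main class must be static");
            }
            // Cast to Object so the array is passed as one argument, not spread as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Could not run main of " + klass.getName(), e);
        }
    }

    public static void main(String[] args) {
        invokeMain(DemoApp.class, new String[] {"1000"});
        System.out.println(DemoApp.lastArgs);
    }
}
```

The receiver passed to invoke is null because main is static, which is also why the Scala code checks the static modifier before calling it.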

If the job is submitted through ClientApp instead, execution enters org.apache.spark.deploy.Client, starting at its main function:

object Client {
  def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println
    // Create a ClientApp and start it
    new ClientApp().start(args, new SparkConf())
  }
}

private[spark] class ClientApp extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    // Create the RPC environment
    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    // Set up references to the master endpoints, then register the ClientEndpoint
    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }

}

When the ClientEndpoint starts, its onStart method runs:

override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
      //       truncate filesystem paths similar to what YARN does. For now, we just require
      //       people call `addJar` assuming the jar is in the same directory.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)
      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      // Package the description of the driver
      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      // Ask the Master to launch the driver
      asyncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
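The {{WORKER_URL}} and {{USER_JAR}} entries in the command are placeholders rather than real values, since at this point the client cannot know which worker will end up running the driver. The sketch below shows one way such placeholders could be resolved once the values are known. It is an illustration only: PlaceholderSketch and its methods are hypothetical, and the worker URL and jar path in main are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of template arguments whose placeholders are resolved later; not Spark code. */
final class PlaceholderSketch {

    /** Submitting side: placeholders first, then the user's main class and its options. */
    static List<String> driverArguments(String mainClass, List<String> driverOptions) {
        List<String> arguments =
                new ArrayList<>(Arrays.asList("{{WORKER_URL}}", "{{USER_JAR}}", mainClass));
        arguments.addAll(driverOptions);
        return arguments;
    }

    /** Resolving side: swap each known placeholder for its value and leave everything else alone. */
    static List<String> substitute(List<String> arguments, Map<String, String> values) {
        List<String> resolved = new ArrayList<>();
        for (String argument : arguments) {
            resolved.add(values.getOrDefault(argument, argument));
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("{{WORKER_URL}}", "spark://Worker@worker-host:40000"); // made-up value
        values.put("{{USER_JAR}}", "/work/driver-0/examples.jar");        // made-up value
        List<String> template =
                driverArguments("org.apache.spark.examples.SparkPi", Arrays.asList("1000"));
        System.out.println(substitute(template, values));
    }
}
```

Keeping the template separate from the resolved list means the same description can be sent to the Master unchanged and filled in wherever the driver is finally placed.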

Once the Master has received the Client's request to launch the driver, the submission itself is complete. The next step is the registration of the Driver.
