[Original] Big Data Fundamentals of Spark (1): Spark Submit, the Spark Job Submission Process

Spark 2.1.1

1 Spark Submit Local Analysis

1.1 Observed behavior

Submit command:

spark-submit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

Processes:

hadoop 225653 0.0 0.0 11256 364 ? S Aug24 0:00 bash /$spark-dir/bin/spark-class org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

hadoop 225654 0.0 0.0 34424 2860 ? Sl Aug24 0:00 /$jdk_dir/bin/java -Xmx128m -cp /spark-dir/jars/* org.apache.spark.launcher.Main org.apache.spark.deploy.SparkSubmit --master local[10] --driver-memory 30g --class app.package.AppClass app-1.0.jar

1.2 Execution process

1.2.1 Script execution

-bash-4.1$ cat bin/spark-submit

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Note: this in turn runs another script, spark-class:

-bash-4.1$ cat bin/spark-class

...

build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

...

CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

Note: build_command runs the Java class org.apache.spark.launcher.Main with the submit arguments; Main prints the fully assembled launch command as NUL-separated tokens (followed by its exit code), the while loop reads them into CMD, and the script finally execs that command. The relevant launcher code:

1.2.2 Code execution

org.apache.spark.launcher.Main

...
builder = new SparkSubmitCommandBuilder(help);
...
List<String> cmd = builder.buildCommand(env);
...
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
  System.out.print(c);
  System.out.print('\0');
}
...

Note: Main delegates to SparkSubmitCommandBuilder to generate the spark-submit command:

org.apache.spark.launcher.SparkSubmitCommandBuilder

...
private List<String> buildSparkSubmitCommand(Map<String, String> env)
...
addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
...
String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
...
if (isClientMode) {
  ...
  addOptionString(cmd, driverExtraJavaOptions);
  ...
}
...
addPermGenSizeOpt(cmd);
cmd.add("org.apache.spark.deploy.SparkSubmit");
cmd.addAll(buildSparkSubmitArgs());
return cmd;
...

Note: this assembles the local launch command, whose main class is org.apache.spark.deploy.SparkSubmit, and folds the various Java options (SPARK_SUBMIT_OPTS, SPARK_JAVA_OPTS, DRIVER_EXTRA_JAVA_OPTIONS, etc.) into it. The same command-building logic is also available programmatically (see the sketch below); the generated command then runs SparkSubmit itself:
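As an aside, the launcher package exposes this machinery as a public API. A minimal Scala sketch (an illustration, not part of the original walkthrough; the jar path is a placeholder) of submitting the same application through org.apache.spark.launcher.SparkLauncher:

import org.apache.spark.launcher.SparkLauncher

object SubmitExample {
  def main(args: Array[String]): Unit = {
    // Forks the same "java ... org.apache.spark.deploy.SparkSubmit ..." command
    // that SparkSubmitCommandBuilder assembles for the spark-submit script.
    val process = new SparkLauncher()
      .setAppResource("/path/to/app-1.0.jar") // placeholder path
      .setMainClass("app.package.AppClass")
      .setMaster("local[10]")
      .setConf(SparkLauncher.DRIVER_MEMORY, "30g")
      .launch()
    process.waitFor()
  }
}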

org.apache.spark.deploy.SparkSubmit

def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args) // parse command line parameters
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

private def submit(args: SparkSubmitArguments): Unit = {
  // merge all parameters from: command line, properties file, system properties, etc.
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
  ...
  def doRunMain(): Unit = {
    ...
    runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    ...
  }
  ...

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
    : (Seq[String], Seq[String], Map[String, String], String) = {
  if (deployMode == CLIENT || isYarnCluster) {
    childMainClass = args.mainClass
  ...
  if (isYarnCluster) {
    childMainClass = "org.apache.spark.deploy.yarn.Client"
  ...

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // scalastyle:off println
  if (verbose) {
    printStream.println(s"Main class:\n$childMainClass")
    printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
    printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
    printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
    printStream.println("\n")
  }
  // scalastyle:on println
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }
  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
  ...
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  ...
  mainMethod.invoke(null, childArgs.toArray)
  ...

Note: SparkSubmit first parses the command-line arguments (mainClass among them) and prepares the run environment, including system properties and the classpath; it then loads the user's mainClass with a fresh class loader (ChildFirstURLClassLoader or MutableURLClassLoader, depending on spark.driver.userClassPathFirst) and invokes its main method via reflection. At that point the user's app.package.AppClass.main starts executing.
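To make the reflection step concrete, here is a minimal, self-contained Scala sketch of the same pattern (an illustration only; the class name and arguments are placeholders, and Spark's real code adds the classpath handling shown above around it):

object ReflectiveMainExample {
  def main(args: Array[String]): Unit = {
    // Load the user's main class by name through the context class loader,
    // just as SparkSubmit.runMain does after installing its own loader.
    val loader = Thread.currentThread.getContextClassLoader
    val mainClass = Class.forName("app.package.AppClass", true, loader) // placeholder class
    // Look up the static main(String[]) method and invoke it reflectively.
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, Array[String]("arg1", "arg2")) // placeholder arguments
  }
}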

org.apache.spark.SparkConf

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

  ...

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }

Note: this shows how Spark picks up its configuration: the no-argument constructor defaults to loadDefaults = true, so every system property whose key starts with "spark." (including the sysProps that SparkSubmit.runMain just set) lands in the SparkConf.
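A small self-contained sketch of that behavior (the property values are placeholders), showing that spark.* system properties set before the SparkConf is constructed end up in the conf:

import org.apache.spark.SparkConf

object ConfLoadExample {
  def main(args: Array[String]): Unit = {
    // SparkSubmit.runMain sets system properties before invoking the user's main;
    // we simulate that here with placeholder values.
    System.setProperty("spark.app.name", "app.package.AppClass")
    System.setProperty("spark.master", "local[10]")

    // new SparkConf() uses loadDefaults = true, so it copies in every
    // system property whose key starts with "spark.".
    val conf = new SparkConf()
    println(conf.get("spark.app.name")) // app.package.AppClass
    println(conf.get("spark.master"))   // local[10]
  }
}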

1.2.3 --verbose

spark-submit --master local[*] --class app.package.AppClass --jars /$other-dir/other.jar --driver-memory 1g --verbose app-1.0.jar

Sample output:

Main class:

app.package.AppClass

Arguments:

System properties:

spark.executor.logs.rolling.maxSize -> 1073741824

spark.driver.memory -> 1g

spark.driver.extraLibraryPath -> /$hadoop-dir/lib/native

spark.eventLog.enabled -> true

press -> true

spark.executor.logs.rolling.time.interval -> daily

SPARK_SUBMIT -> true

spark.app.name -> app.package.AppClass

spark.driver.extraJavaOptions -> -XX:+PrintGCDetails -XX:+UseG1GC -XX:G1HeapRegionSize=32M -XX:+UseGCOverheadLimit -XX:+ExplicitGCInvokesConcurrent -XX:+HeapDumpOnOutOfMemoryError -XX:-UseCompressedClassPointers -XX:CompressedClassSpaceSize=3G -XX:+PrintGCTimeStamps -Xloggc:/export/Logs/hadoop/g1gc.log

spark.jars -> file:/$other-dir/other.jar

spark.sql.adaptive.enabled -> true

spark.submit.deployMode -> client

spark.executor.logs.rolling.maxRetainedFiles -> 10

spark.executor.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar

spark.eventLog.dir -> hdfs://myhdfs/spark/history

spark.master -> local[*]

spark.sql.crossJoin.enabled -> true

spark.driver.extraClassPath -> /usr/lib/hadoop/lib/hadoop-lzo.jar

Classpath elements:

file:/$other-dir/other.jar

file:/app-1.0.jar

Adding the --verbose flag at submit time prints all of this resolved runtime information (main class, arguments, system properties, classpath elements), which is very helpful when diagnosing submission problems.
