日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark读取配置源码剖析

發布時間:2024/1/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark读取配置源码剖析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我們知道,有一些配置可以在多個地方配置。以配置executor的memory為例,有以下三種方式:1. spark-submit的--executor-memory選項2. spark-defaults.conf的spark.executor.memory配置3. spark-env.sh的SPARK_EXECUTOR_MEMORY配置

同一個配置可以在多處設置,這顯然會造成迷惑,不知道spark為什么到現在還保留這樣的邏輯。如果我分別在這三處對executor的memory設置了不同的值,最終在Application中生效的是哪個?

處理這一問題的類是SparkSubmitArguments。在其構造函數中就完成了從 『spark-submit --選項』、『spark-defaults.conf』、『spark-env.sh』中讀取配置,并根據策略決定使用哪個配置。下面分幾步來分析這個重要的構造函數。

Step0:讀取spark-env.sh配置并寫入環境變量中

SparkSubmitArguments的參數列表包含一個env: Map[String, String] = sys.env參數。該參數包含一些系統環境變量的值和從spark-env.sh中讀取的配置值,如圖是我一個demo中env值的部分截圖

這一步之所以叫做Step0,是因為env的值在構造SparkSubmitArguments對象之前就確認,即spark-env.sh在構造SparkSubmitArguments對象前就讀取并將配置存入env中。

Step1:創建各配置成員并賦空值

這一步比較簡單,定義了所有要從『spark-submit --選項』、『spark-defaults.conf』、『spark-env.sh』中讀取的配置,并賦空值。下面的代碼展示了其中一部分?:

?

?
  • var master: String = null

  • var deployMode: String = null

  • var executorMemory: String = null

  • var executorCores: String = null

  • var totalExecutorCores: String = null

  • var propertiesFile: String = null

  • var driverMemory: String = null

  • var driverExtraClassPath: String = null

  • var driverExtraLibraryPath: String = null

  • var driverExtraJavaOptions: String = null

  • var queue: String = null

  • var numExecutors: String = null

  • var files: String = null

  • var archives: String = null

  • var mainClass: String = null

  • var primaryResource: String = null

  • var name: String = null

  • var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()

  • var jars: String = null

  • var packages: String = null

  • var repositories: String = null

  • var ivyRepoPath: String = null

  • var packagesExclusions: String = null

  • var verbose: Boolean = false

  • ?
  • ...

  • Step2:調用父類parse方法解析 spark-submit --選項

    ?
  • try {

  • parse(args.toList)

  • } catch {

  • case e: IllegalArgumentException => SparkSubmit.printErrorAndExit(e.getMessage())

  • }

  • ?

    這里調用父類的SparkSubmitOptionParser#parse(List<String> args)。parse函數查找args中設置的--選項和值并解析為name和value,如--master yarn-client會被解析為值為--master的name和值為yarn-client的value。這之后調用SparkSubmitArguments#handle(MASTER, "yarn-client")進行處理。

    來看看handle函數干了什么:

    ?
  • /** Fill in values by parsing user options. */

  • override protected def handle(opt: String, value: String): Boolean = {

  • opt match {

  • case NAME =>

  • name = value

  • ?
  • case MASTER =>

  • master = value

  • ?
  • case CLASS =>

  • mainClass = value

  • ?
  • case DEPLOY_MODE =>

  • if (value != "client" && value != "cluster") {

  • SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")

  • }

  • deployMode = value

  • ?
  • case NUM_EXECUTORS =>

  • numExecutors = value

  • ?
  • case TOTAL_EXECUTOR_CORES =>

  • totalExecutorCores = value

  • ?
  • case EXECUTOR_CORES =>

  • executorCores = value

  • ?
  • case EXECUTOR_MEMORY =>

  • executorMemory = value

  • ?
  • case DRIVER_MEMORY =>

  • driverMemory = value

  • ?
  • case DRIVER_CORES =>

  • driverCores = value

  • ?
  • case DRIVER_CLASS_PATH =>

  • driverExtraClassPath = value

  • ?
  • ...

  • ?
  • case _ =>

  • throw new IllegalArgumentException(s"Unexpected argument '$opt'.")

  • }

  • true

  • }

  • ?

    這個函數也很簡單,根據參數opt及value,設置各個成員的值。接上例,parse中調用handle("--master", "yarn-client")后,在handle函數中,master成員將被賦值為yarn-client。

    注意,case MASTER中的MASTER的值在SparkSubmitOptionParser定義為--master,MASTER與其他值定義如下:

    ?
  • protected final String MASTER = "--master";

  • ?
  • protected final String CLASS = "--class";

  • protected final String CONF = "--conf";

  • protected final String DEPLOY_MODE = "--deploy-mode";

  • protected final String DRIVER_CLASS_PATH = "--driver-class-path";

  • protected final String DRIVER_CORES = "--driver-cores";

  • protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";

  • protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";

  • protected final String DRIVER_MEMORY = "--driver-memory";

  • protected final String EXECUTOR_MEMORY = "--executor-memory";

  • protected final String FILES = "--files";

  • protected final String JARS = "--jars";

  • protected final String KILL_SUBMISSION = "--kill";

  • protected final String NAME = "--name";

  • protected final String PACKAGES = "--packages";

  • protected final String PACKAGES_EXCLUDE = "--exclude-packages";

  • protected final String PROPERTIES_FILE = "--properties-file";

  • protected final String PROXY_USER = "--proxy-user";

  • protected final String PY_FILES = "--py-files";

  • protected final String REPOSITORIES = "--repositories";

  • protected final String STATUS = "--status";

  • protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";

  • ?
  • ...

  • ?

    總結來說,parse函數解析了spark-submit中的--選項,并根據解析出的name和value給SparkSubmitArguments的各個成員(例如master、deployMode、executorMemory等)設置值。

    Step3:mergeDefaultSparkProperties加載spark-defaults.conf中配置

    Step3讀取spark-defaults.conf中的配置文件并存入sparkProperties中,sparkProperties將在下一步中發揮作用

    ?
  • //< 保存從spark-defaults.conf讀取的配置

  • val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

  • ?
  • //< 獲取配置文件路徑,若在spark-env.sh中設置SPARK_CONF_DIR,則以該值為準;否則為 $SPARK_HOME/conf/spark-defaults.conf

  • def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {

  • env.get("SPARK_CONF_DIR")

  • .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })

  • .map { t => new File(s"$t${File.separator}spark-defaults.conf")}

  • .filter(_.isFile)

  • .map(_.getAbsolutePath)

  • .orNull

  • }

  • ?
  • //< 讀取spark-defaults.conf配置并存入sparkProperties中

  • private def mergeDefaultSparkProperties(): Unit = {

  • // Use common defaults file, if not specified by user

  • propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))

  • // Honor --conf before the defaults file

  • defaultSparkProperties.foreach { case (k, v) =>

  • if (!sparkProperties.contains(k)) {

  • sparkProperties(k) = v

  • }

  • }

  • }

  • Step4:loadEnvironmentArguments確認每個配置成員最終值

    先來看看代碼(由于篇幅太長,省略了一部分)

    ?
  • private def loadEnvironmentArguments(): Unit = {

  • master = Option(master)

  • .orElse(sparkProperties.get("spark.master"))

  • .orElse(env.get("MASTER"))

  • .orNull

  • driverExtraClassPath = Option(driverExtraClassPath)

  • .orElse(sparkProperties.get("spark.driver.extraClassPath"))

  • .orNull

  • driverExtraJavaOptions = Option(driverExtraJavaOptions)

  • .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))

  • .orNull

  • driverExtraLibraryPath = Option(driverExtraLibraryPath)

  • .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))

  • .orNull

  • driverMemory = Option(driverMemory)

  • .orElse(sparkProperties.get("spark.driver.memory"))

  • .orElse(env.get("SPARK_DRIVER_MEMORY"))

  • .orNull

  • ?
  • ...

  • ?
  • keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull

  • principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

  • ?
  • // Try to set main class from JAR if no --class argument is given

  • if (mainClass == null && !isPython && !isR && primaryResource != null) {

  • val uri = new URI(primaryResource)

  • val uriScheme = uri.getScheme()

  • ?
  • uriScheme match {

  • case "file" =>

  • try {

  • val jar = new JarFile(uri.getPath)

  • // Note that this might still return null if no main-class is set; we catch that later

  • mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")

  • } catch {

  • case e: Exception =>

  • SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")

  • }

  • case _ =>

  • SparkSubmit.printErrorAndExit(

  • s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +

  • "Please specify a class through --class.")

  • }

  • }

  • ?
  • // Global defaults. These should be keep to minimum to avoid confusing behavior.

  • master = Option(master).getOrElse("local[*]")

  • ?
  • // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)

  • if (master.startsWith("yarn")) {

  • name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull

  • }

  • ?
  • // Set name from main class if not given

  • name = Option(name).orElse(Option(mainClass)).orNull

  • if (name == null && primaryResource != null) {

  • name = Utils.stripDirectory(primaryResource)

  • }

  • ?
  • // Action should be SUBMIT unless otherwise specified

  • action = Option(action).getOrElse(SUBMIT)

  • }

  • ?
  • 我們單獨以確定master值的那部分代碼來說明,相關代碼如下

  • ?
  • master = Option(master)

  • .orElse(sparkProperties.get("spark.master"))

  • .orElse(env.get("MASTER"))

  • .orNull

  • ?
  • // Global defaults. These should be keep to minimum to avoid confusing behavior.

  • master = Option(master).getOrElse("local[*]")

  • 確定master的值的步驟如下:1.?Option(master):若master值不為null,則以master為準;否則進入2。若master不為空,從上文的分析我們可以知道是從解析spark-submit --master選項得到的值2..orElse(sparkProperties.get("spark.master")):若sparkProperties.get("spark.master")范圍非null則以該返回值為準;否則進入3。從Step3中可以知道sparkProperties中的值都是從spark-defaults.conf中讀取3..orElse(env.get("MASTER")):若env.get("MASTER")返回非null,則以該返回值為準;否則進入4。env中的值從spark-env.sh讀取而來4. 若以上三處均為設置master,則取默認值local[*]

    查看其余配置成員的值的決定過程也和master一致,稍有不同的是并不是所有配置都能在spark-defaults.conf、spark-env.sh和spark-submit選項中設置。但優先級還是一致的。

    由此,我們可以得出結論,對于spark配置。若一個配置在多處設置,則優先級如下:spark-submit --選項 > spark-defaults.conf配置 > spark-env.sh配置 > 默認值

    最后,附上流程圖

    文章來源:https://github.com/keepsimplefocus/spark-sourcecodes-analysis/blob/master/markdowns/Spark%E8%AF%BB%E5%8F%96%E9%85%8D%E7%BD%AE.md

    總結

    以上是生活随笔為你收集整理的Spark读取配置源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。