
Spark 2.4.0 SparkEnv Source Code Analysis

More resources

  • github: https://github.com/opensourceteams/spark-scala-maven-2.4.0

時(shí)序圖

Prerequisites

  • Hadoop version: hadoop-2.9.2
  • Spark version: spark-2.4.0-bin-hadoop2.7
  • JDK version: 1.8.0_191
  • Scala version: 2.11.12

主要內(nèi)容描述

  • SparkEnv對(duì)象構(gòu)建
  • SparkEnv類中做如下操作
  • ).new SecurityManager()
  • ).new NettyRpcEnvFactory()
  • ).創(chuàng)建NettyRpcEnv
  • ).Utils.startServiceOnPort(啟動(dòng)sparkDriver)
  • ). new BroadcastManager
  • ).注冊(cè)端點(diǎn)MapOutputTracker
  • ).ShuffleManager:SortShuffleManager
  • ).默認(rèn)內(nèi)存管理器:UnifiedMemoryManager
  • ).注冊(cè)端點(diǎn)MapOutputTracker
  • ).SortShuffleManager
  • ).UnifiedMemoryManager
  • ).注冊(cè)端點(diǎn)BlockManagerMaster
  • ).new BlockManager
  • ).注冊(cè)端點(diǎn)OutputCommitCoordinator

SparkContext

SparkContext 類構(gòu)造中調(diào)用createSparkEnv

private var _env: SparkEnv = _

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
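
As an aside (not part of the original walkthrough): once the SparkContext constructor has finished, the SparkEnv it built is reachable anywhere on the driver through SparkEnv.get, because SparkContext stores it with SparkEnv.set. A minimal sketch, assuming a local master chosen only for illustration:

import org.apache.spark.{SparkConf, SparkContext, SparkEnv}

object SparkEnvPeek {
  def main(args: Array[String]): Unit = {
    // local[2] is an arbitrary choice for this sketch
    val conf = new SparkConf().setAppName("spark-env-peek").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // SparkContext registered the env it created, so this returns that same instance
    val env = SparkEnv.get
    println(env.executorId)                      // "driver" on the driver side
    println(env.conf.get("spark.driver.port"))   // port the sparkDriver RPC service bound to

    sc.stop()
  }
}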

SparkContext.createSparkEnv()

  • 調(diào)用SparkEnv.createDriverEnv函數(shù),創(chuàng)建SparkEnv對(duì)象
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
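
A hedged note, paraphrased rather than quoted from the source: the numCores argument above comes from SparkContext.numDriverCores(master, conf), which roughly resolves the master string as follows.

// "local"       -> 1
// "local[4]"    -> 4
// "local[*]"    -> Runtime.getRuntime.availableProcessors()
// "yarn" with spark.submit.deployMode=cluster -> spark.driver.cores (default 0)
// other cluster masters -> 0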

SparkEnv

SparkEnv.createDriverEnv()

  • 調(diào)用create()方法
/**
 * Create a SparkEnv for the driver.
 */
private[spark] def createDriverEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus,
    numCores: Int,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  assert(conf.contains(DRIVER_HOST_ADDRESS),
    s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
  assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
  val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
  val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
  val port = conf.get("spark.driver.port").toInt
  val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
    Some(CryptoStreamUtils.createKey(conf))
  } else {
    None
  }
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    bindAddress,
    advertiseAddress,
    Option(port),
    isLocal,
    numCores,
    ioEncryptionKey,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator)
}
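
For reference, DRIVER_HOST_ADDRESS, DRIVER_BIND_ADDRESS and the port read here correspond to ordinary SparkConf keys, which spark-submit normally fills in for you. A hedged illustration with made-up values:

val conf = new SparkConf()
  .set("spark.driver.host", "10.0.0.5")        // DRIVER_HOST_ADDRESS: address advertised to executors
  .set("spark.driver.bindAddress", "0.0.0.0")  // DRIVER_BIND_ADDRESS: address the RPC env binds to
  .set("spark.driver.port", "7078")            // port handed down to create() for the sparkDriver service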

SparkEnv.create()

  • This is the key method: the SparkEnv object itself is constructed here
  • new SecurityManager()
  • new NettyRpcEnvFactory()
  • create the NettyRpcEnv
  • Utils.startServiceOnPort (starts the sparkDriver RPC service)
  • new BroadcastManager
  • register the MapOutputTracker endpoint
  • ShuffleManager: SortShuffleManager (the default)
  • default memory manager: UnifiedMemoryManager
  • register the BlockManagerMaster endpoint
  • new BlockManager
  • register the OutputCommitCoordinator endpoint
/**
 * Helper method to create a SparkEnv for a driver or an executor.
 */
private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Option[Int],
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

  // Listener bus is only used on the driver
  if (isDriver) {
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
  }

  val securityManager = new SecurityManager(conf, ioEncryptionKey)
  if (isDriver) {
    securityManager.initializeAuth()
  }

  ioEncryptionKey.foreach { _ =>
    if (!securityManager.isEncryptionEnabled()) {
      logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
        "wire.")
    }
  }

  val systemName = if (isDriver) driverSystemName else executorSystemName
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
    securityManager, numUsableCores, !isDriver)

  // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
  if (isDriver) {
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
  }

  // Create an instance of the class with the given name, possibly initializing it with our conf
  def instantiateClass[T](className: String): T = {
    val cls = Utils.classForName(className)
    // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
    // SparkConf, then one taking no arguments
    try {
      cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
        .newInstance(conf, new java.lang.Boolean(isDriver))
        .asInstanceOf[T]
    } catch {
      case _: NoSuchMethodException =>
        try {
          cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            cls.getConstructor().newInstance().asInstanceOf[T]
        }
    }
  }

  // Create an instance of the class named by the given SparkConf property, or defaultClassName
  // if the property is not set, possibly initializing it with our conf
  def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
    instantiateClass[T](conf.get(propertyName, defaultClassName))
  }

  val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  logDebug(s"Using serializer: ${serializer.getClass}")

  val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

  val closureSerializer = new JavaSerializer(conf)

  def registerOrLookupEndpoint(
      name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = {
    if (isDriver) {
      logInfo("Registering " + name)
      rpcEnv.setupEndpoint(name, endpointCreator)
    } else {
      RpcUtils.makeDriverRef(name, conf, rpcEnv)
    }
  }

  val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

  val mapOutputTracker = if (isDriver) {
    new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
  } else {
    new MapOutputTrackerWorker(conf)
  }

  // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
  // requires the MapOutputTracker itself
  mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

  // Let the user specify short names for shuffle managers
  val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
  val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
  val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
  val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

  val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
  val memoryManager: MemoryManager =
    if (useLegacyMemoryManager) {
      new StaticMemoryManager(conf, numUsableCores)
    } else {
      UnifiedMemoryManager(conf, numUsableCores)
    }

  val blockManagerPort = if (isDriver) {
    conf.get(DRIVER_BLOCK_MANAGER_PORT)
  } else {
    conf.get(BLOCK_MANAGER_PORT)
  }

  val blockTransferService =
    new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
      blockManagerPort, numUsableCores)

  val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
    conf, isDriver)

  // NB: blockManager is not valid until initialize() is called later.
  val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
    serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
    blockTransferService, securityManager, numUsableCores)

  val metricsSystem = if (isDriver) {
    // Don't start metrics system right now for Driver.
    // We need to wait for the task scheduler to give us an app ID.
    // Then we can start the metrics system.
    MetricsSystem.createMetricsSystem("driver", conf, securityManager)
  } else {
    // We need to set the executor ID before the MetricsSystem is created because sources and
    // sinks specified in the metrics configuration file will want to incorporate this executor's
    // ID into the metrics they report.
    conf.set("spark.executor.id", executorId)
    val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    ms.start()
    ms
  }

  val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
    new OutputCommitCoordinator(conf, isDriver)
  }
  val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
    new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
  outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)

  val envInstance = new SparkEnv(
    executorId,
    rpcEnv,
    serializer,
    closureSerializer,
    serializerManager,
    mapOutputTracker,
    shuffleManager,
    broadcastManager,
    blockManager,
    securityManager,
    metricsSystem,
    memoryManager,
    outputCommitCoordinator,
    conf)

  // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
  // called, and we only need to do it for driver. Because driver may run as a service, and if we
  // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
  if (isDriver) {
    val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
    envInstance.driverTmpDir = Some(sparkFilesDir)
  }

  envInstance
}

end

Summary

SparkEnv.create() is where the driver (and, with slightly different branches, each executor) wires its runtime services together: the SecurityManager, the Netty-based RpcEnv that binds the sparkDriver service to a port, the serializers, the BroadcastManager, the MapOutputTracker, the SortShuffleManager, the UnifiedMemoryManager (by default), the BlockManagerMaster and BlockManager, and the OutputCommitCoordinator, registering the driver-side RPC endpoints for each along the way.
