日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark加载hadoop配置原理

發布時間:2024/1/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark加载hadoop配置原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

0x0 背景
最近為了將hadoop&hive的五大配置文件,即:

core-site.xml
hdfs-site.xml
yarn-site.xml
mapred-site.xml
hive-site.xml

從項目中(classpath)移到項目外(任意位置),研究了spark啟動過程的源碼,在此記錄一下。

0x1 Hadoop及Hive獲取默認配置過程
Hadoop有一個類?
Configuration implementsIterable<Map.Entry<String,String>>,Writable?
這個類就是用于處理hadoop的配置,其內部有靜態代碼塊:

static{
? ? //print deprecation warning if hadoop-site.xml is found in classpath
? ? ClassLoader cL = Thread.currentThread().getContextClassLoader();
? ? if (cL == null) {
? ? ? cL = Configuration.class.getClassLoader();
? ? }
? ? if(cL.getResource("hadoop-site.xml")!=null) {
? ? ? LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
? ? ? ? ? "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
? ? ? ? ? + "mapred-site.xml and hdfs-site.xml to override properties of " +
? ? ? ? ? "core-default.xml, mapred-default.xml and hdfs-default.xml " +
? ? ? ? ? "respectively");
? ? }
? ? addDefaultResource("core-default.xml");
? ? addDefaultResource("core-site.xml");
? }

可見,當Configuration加載后,就會從classpath讀取

hadoop-site.xml
core-default.xml
core-site.xml

這三個配置文件。?
同時,Configuration類有四個子類:?
?
分別是:

HdfsConfiguration
HiveConf
JobConf
YarnConfiguration

進入這四個類內部同樣可以見到類似的靜態代碼,?
HdfsConfiguration中:

static {
? ? addDeprecatedKeys();
? ? // adds the default resources
? ? Configuration.addDefaultResource("hdfs-default.xml");
? ? Configuration.addDefaultResource("hdfs-site.xml");
}

YarnConfiguration中:

static {
? ? ? ? addDeprecatedKeys();
? ? ? ? Configuration.addDefaultResource("yarn-default.xml");
? ? ? ? Configuration.addDefaultResource("yarn-site.xml");
? ? ? ? ...
}

JobConf中:

public static void loadResources() {
? ? ? ? addDeprecatedKeys();
? ? ? ? Configuration.addDefaultResource("mapred-default.xml");
? ? ? ? Configuration.addDefaultResource("mapred-site.xml");
? ? ? ? Configuration.addDefaultResource("yarn-default.xml");
? ? ? ? Configuration.addDefaultResource("yarn-site.xml");
}

但是HiveConf并未在靜態代碼塊中讀取配置文件,然而在CarbonData的啟動過程中,會讀取hive-site.xml:

val hadoopConf = new Configuration()
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
? ? hadoopConf.addResource(configFile)
}

可見,Hadoop在啟動過程中,各組件會首先在classpath下讀取相應的配置文件。?
我們也可以通過Configuration的set(String name, String value)或者addResource(Path file)方法來添加配置,addResource內部執行流程如下:

? ? //將資源添加到resources列表(存儲配置文件資源的列表)
? ? resources.add(resource); ? // add to resources
? ? //將已有的屬性清空
? ? properties = null; ? ? ? ? // trigger reload
? ? finalParameters.clear(); ? // clear site-limits
? ? //重新加載所有配置
? ? loadResources(Properties properties,
? ? ? ? ? ? ? ? ? ArrayList<Resource> resources,
? ? ? ? ? ? ? ? ? boolean quiet)

0x2 Spark啟動過程中設置Hadoop配置
Spark Application啟動過程中首先要實啟動一個SparkContext,其實SparkContext本質上可以理解為Spark運行的配置集合。

val sc = SparkContext.getOrCreate(sparkConf)

而在SparkContext創建過程中會啟動一個調度任務,用于連接遠程集群:

val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)

如果是Spark on Yarn,會調用YarnClusterManager的createSchedulerBackend方法:

override def createSchedulerBackend(sc: SparkContext,
? ? ? masterURL: String,
? ? ? scheduler: TaskScheduler): SchedulerBackend = {
? ? sc.deployMode match {
? ? ? case "cluster" =>
? ? ? ? new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
? ? ? case "client" =>
? ? ? ? new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
? ? ? case ?_ =>
? ? ? ? throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
? ? }
? }

然后在YarnClientSchedulerBackend中創建了YarnClient,可見看Client中的構造函數:

private[spark] class Client(
? ? val args: ClientArguments,
? ? val hadoopConf: Configuration,
? ? val sparkConf: SparkConf)
? extends Logging {

? import Client._
? import YarnSparkHadoopUtil._

? def this(clientArgs: ClientArguments, spConf: SparkConf) =
? ? this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)

? private val yarnClient = YarnClient.createYarnClient
? private val yarnConf = new YarnConfiguration(hadoopConf)

可見,Spark將利用SparkConf中的配置,調用SparkHadoopUtil.get.newConfiguration(spConf)方法生成相應的Hadoop配置。?
其實,在SparkContext中,有2個成員變量(本質上是一個):

private var _hadoopConfiguration: Configuration = _
def hadoopConfiguration: Configuration = _hadoopConfiguration
....
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

這個_hadoopConfiguration 也是通過SparkHadoopUtil.get.newConfiguration(_conf)方法獲取到hadoop的配置。?
進入SparkHadoopUtil.get.newConfiguration(_conf)方法,可見到:

? ? ?conf.getAll.foreach { case (key, value) =>
? ? ? ? if (key.startsWith("spark.hadoop.")) {
? ? ? ? ? hadoopConf.set(key.substring("spark.hadoop.".length), value)
? ? ? ? }
? ? ? }
1
2
3
4
5
也就是說,在SparkConf中所有以spark.hadoop.開頭的屬性,都會被轉換為hadoop的配置。

那么我們通過解析hadoop的xml配置文件,轉換為相應的鍵值對,傳給spark就可以了。代碼如下:

? ? /**
? ? ?* 讀取hadoopConfPath下所有hadoop相關配置文件,并轉換為SparkConf
? ? ?*
? ? ?* @param hadoopConfPath hadoop配置文件所在的文件夾
? ? ?* @return?
? ? ?*/
? ? public SparkConf getHadoopConf(String hadoopConfPath) {
? ? ? ? SparkConf hadoopConf = new SparkConf();

? ? ? ? try {
? ? ? ? ? ? Map<String, String> hiveConfMap = parseXMLToMap(hadoopConfPath + "/hive-site.xml");
? ? ? ? ? ? Map<String, String> hadoopConfMap = parseXMLToMap(hadoopConfPath + "/core-site.xml");
? ? ? ? ? ? hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/hdfs-site.xml"));
? ? ? ? ? ? hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/yarn-site.xml"));
? ? ? ? ? ? hadoopConfMap.putAll(parseXMLToMap(hadoopConfPath + "/mapred-site.xml"));

? ? ? ? ? ? for (Map.Entry<String, String> entry : hiveConfMap.entrySet()) {
? ? ? ? ? ? ? ? hadoopConf.set(entry.getKey(), entry.getValue());
? ? ? ? ? ? }
? ? ? ? ? ? for (Map.Entry<String, String> entry : hadoopConfMap.entrySet()) {
? ? ? ? ? ? ? ? hadoopConf.set("spark.hadoop." + entry.getKey(), entry.getValue());
? ? ? ? ? ? }
? ? ? ? ? ? return hadoopConf;
? ? ? ? } catch (DocumentException e) {
? ? ? ? ? ? logger.error("讀取xml文件失敗!");
? ? ? ? ? ? throw new RuntimeException(e);
? ? ? ? }

? ? }

? ? //將xml解析為HashMap
? ? private Map<String, String> parseXMLToMap(String xmlFilePath) throws DocumentException {
? ? ? ? Map<String, String> confMap = new HashMap<>();
? ? ? ? SAXReader reader = new SAXReader();
? ? ? ? Document document = reader.read(new File(xmlFilePath));
? ? ? ? Element configuration = document.getRootElement();
? ? ? ? Iterator iterator = configuration.elementIterator();
? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? Element property = (Element) iterator.next();
? ? ? ? ? ? String name = property.element("name").getText();
? ? ? ? ? ? String value = property.element("value").getText();
? ? ? ? ? ? confMap.put(name, value);
? ? ? ? }
? ? ? ? return confMap;
? ? }

注意:?
經測試,如果集群有kerberos加密,該方法無效!?
原因可能是:

class SparkHadoopUtil extends Logging {
? ? ? private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
? ? ? val conf: Configuration = newConfiguration(sparkConf)
? ? ? UserGroupInformation.setConfiguration(conf)

在該類中設置了一個new的SparkConf,這個SparkConf只從System.getProperty讀取spark開頭的屬性,因此不是正確的屬性,導致kerberos登錄異常。
?

總結

以上是生活随笔為你收集整理的Spark加载hadoop配置原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。