Spark file-reading source code analysis, part 1

Published: 2024/2/28

Contents

    • 1. Background
    • 2. Test code
    • 3. The generated DAG
      • 1. job0
      • 2. job1
    • 4. Source analysis: when job0 is created
      • 1. DataFrameReader.load and DataFrameReader.loadV1Source
      • 2. DataSource.resolveRelation
      • 3. DataSource.getOrInferFileFormatSchema()
      • 4. InMemoryFileIndex initialization
      • 5. InMemoryFileIndex.bulkListLeafFiles
        • 1. The paths.size check that decides whether a job is created
        • 2. job0, the file-listing job
          • 1. Setting the job description
          • 2. Creating and running the job
    • 5. Call chain summary

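1. Background

While testing a Spark job, I noticed that reading many files under a directory and reading a single file produce a different number of jobs in Spark's DAG: reading many files produces one more job than reading a single file. This post looks at the source code to explain why. The analysis is fairly long, so it is split into two parts: this first part covers job0, and the second part covers job1.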

2. Test code

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;

public class UserProfileTest {
    static String filePath = "hdfs:///user/daily/20200828/*.parquet";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("user_profile_test")
                .set(ConfigurationOptions.ES_NODES, "")
                .set(ConfigurationOptions.ES_PORT, "")
                .set(ConfigurationOptions.ES_MAPPING_ID, "uid");
        // The point of interest: why does the read below produce an extra job?
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);
        userProfileSource.count();
        userProfileSource.write().parquet("hdfs:///user/daily/result2020082808/");
    }
}

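3. The generated DAG

In the Spark UI we can see that the statement

Dataset<Row> userProfileSource = sparkSession.read().parquet(filePath);

produced two jobs, and these two jobs are the only ones we look at here.

Zooming in on the relevant part of the DAG: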

1. job0

The Description of job0 is

Listing leaf files and directories for 100 paths: hdfs://hadoop-01:9000/user/daily/20200828/part-00000-0e0dc5b5-5061-41ca-9fa6-9fb7b3e09e98-c000.snappy.parquet, ... parquet at UserProfileTest.java:26

job0 has 100 partitions.

2. job1

The Description of job1 is

parquet at UserProfileTest.java:26 parquet at UserProfileTest.java:26

The question is when each of these two jobs is created, and why they differ.

4. Source analysis: when job0 is created

1. DataFrameReader.load and DataFrameReader.loadV1Source

sparkSession.read().parquet(filePath) ends up in DataFrameReader.load. For the parquet source, the condition checks in that method fall through to the final else branch, which calls loadV1Source.

/**
 * Loads input in as a `DataFrame`, for data sources that support multiple paths.
 * Only works if the source is a HadoopFsRelationProvider.
 *
 * @since 1.6.0
 */
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance()
    val options = new DataSourceOptions((extraOptions ++
      DataSourceV2Utils.extractSessionConfigs(
        ds = ds.asInstanceOf[DataSourceV2],
        conf = sparkSession.sessionState.conf)).asJava)

    // Streaming also uses the data source V2 API. So it may be that the data source implements
    // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading
    // the dataframe as a v1 source.
    val reader = (ds, userSpecifiedSchema) match {
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)
      case (ds: ReadSupport, None) =>
        ds.createReader(options)
      case (ds: ReadSupportWithSchema, None) =>
        throw new AnalysisException(s"A schema needs to be specified when using $ds.")
      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        if (reader.readSchema() != schema) {
          throw new AnalysisException(s"$ds does not allow user-specified schemas.")
        }
        reader
      case _ => null // fall back to v1
    }

    if (reader == null) {
      loadV1Source(paths: _*)
    } else {
      Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
    }
  } else {
    // Execution reaches this branch
    loadV1Source(paths: _*)
  }
}

load then calls this method:

private def loadV1Source(paths: String*) = {
  // Code path for data source v1.
  sparkSession.baseRelationToDataFrame(
    DataSource.apply(
      sparkSession,
      paths = paths,
      userSpecifiedSchema = userSpecifiedSchema,
      className = source,
      options = extraOptions.toMap).resolveRelation())
}
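The first branch decision in load hinges on classOf[DataSourceV2].isAssignableFrom(cls). As a rough illustration in Java, the sketch below applies the same reflection check to stand-in types. SourceV2, ParquetLikeSource, StreamingOnlySource and LoadDispatch are hypothetical names invented for this example, not Spark classes, and the sketch leaves out the fallback from the V2 branch to V1 that the real method performs when no reader is available.

```java
// Hypothetical stand-ins: SourceV2 plays the role of Spark's DataSourceV2 marker interface.
interface SourceV2 {}

// A source that does not implement the V2 interface, like the parquet source in this post.
final class ParquetLikeSource {}

// A source that does implement the V2 interface.
final class StreamingOnlySource implements SourceV2 {}

final class LoadDispatch {
    /** Returns "v2" when cls implements SourceV2, otherwise "v1". */
    static String chooseBranch(Class<?> cls) {
        // Class.isAssignableFrom is true when cls is SourceV2 itself or a subtype of it.
        return SourceV2.class.isAssignableFrom(cls) ? "v2" : "v1";
    }

    public static void main(String[] args) {
        System.out.println(chooseBranch(ParquetLikeSource.class));
        System.out.println(chooseBranch(StreamingOnlySource.class));
    }
}
```

A class that does not implement the marker interface goes straight to the V1 path, which is the route the parquet read takes in this analysis.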

loadV1Source creates a DataSource object. The apply method is available because DataSource is a case class, so the compiler generates a companion object that defines apply and unapply.

It then calls resolveRelation() on the DataSource object.

2. DataSource.resolveRelation

/**
 * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
 * [[DataSource]]
 *
 * @param checkFilesExist Whether to confirm that the files exist when generating the
 *                        non-streaming file based datasource. StructuredStreaming jobs already
 *                        list file existence, and when generating incremental jobs, the batch
 *                        is considered as a non-streaming file based data source. Since we know
 *                        that files already exist, we don't need to check them again.
 */
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation

    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
        if FileStreamSink.hasMetadata(
          caseInsensitiveOptions.get("path").toSeq ++ paths,
          sparkSession.sessionState.newHadoopConf()) =>
      val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
      val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
      val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
        val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
        new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
      } else {
        tempFileCatalog
      }
      val dataSchema = userSpecifiedSchema.orElse {
        format.inferSchema(
          sparkSession,
          caseInsensitiveOptions,
          fileCatalog.allFiles())
      }.getOrElse {
        throw new AnalysisException(
          s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
            "It must be specified manually")
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = fileCatalog.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        format,
        caseInsensitiveOptions)(sparkSession)

    // This is a non-streaming file based datasource.
    // This is the case that finally matches
    case (format: FileFormat, _) =>
      val allPaths = caseInsensitiveOptions.get("path") ++ paths
      val hadoopConf = sparkSession.sessionState.newHadoopConf()
      val globbedPaths = allPaths.flatMap(
        DataSource.checkAndGlobPathIfNecessary(hadoopConf, _, checkFilesExist)).toArray

      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
      // The call we are interested in happens here
      val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

      val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
          catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) {
        val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
        new CatalogFileIndex(
          sparkSession,
          catalogTable.get,
          catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
      } else {
        new InMemoryFileIndex(
          sparkSession, globbedPaths, options, Some(partitionSchema), fileStatusCache)
      }

      HadoopFsRelation(
        fileCatalog,
        partitionSchema = partitionSchema,
        dataSchema = dataSchema.asNullable,
        bucketSpec = bucketSpec,
        format,
        caseInsensitiveOptions)(sparkSession)

    case _ =>
      throw new AnalysisException(
        s"$className is not a valid Spark SQL Data Source.")
  }

  relation match {
    case hs: HadoopFsRelation =>
      SchemaUtils.checkColumnNameDuplication(
        hs.dataSchema.map(_.name),
        "in the data schema",
        equality)
      SchemaUtils.checkColumnNameDuplication(
        hs.partitionSchema.map(_.name),
        "in the partition schema",
        equality)
    case _ =>
      SchemaUtils.checkColumnNameDuplication(
        relation.schema.map(_.name),
        "in the data schema",
        equality)
  }

  relation
}

In the method above, the line

val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, fileStatusCache)

calls getOrInferFileFormatSchema.

3. DataSource.getOrInferFileFormatSchema()

private def getOrInferFileFormatSchema(
    format: FileFormat,
    fileStatusCache: FileStatusCache = NoopCache): (StructType, StructType) = {
  // the operations below are expensive therefore try not to do them if we don't need to, e.g.,
  // in streaming mode, we have already inferred and registered partition columns, we will
  // never have to materialize the lazy val below
  // This is a lazy val, so it is only initialized when it is first used
  lazy val tempFileIndex = {
    val allPaths = caseInsensitiveOptions.get("path") ++ paths
    val hadoopConf = sparkSession.sessionState.newHadoopConf()
    val globbedPaths = allPaths.toSeq.flatMap { path =>
      val hdfsPath = new Path(path)
      val fs = hdfsPath.getFileSystem(hadoopConf)
      val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
    }.toArray
    // The InMemoryFileIndex object is constructed here, which is where the first job comes from
    new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
  }
  val partitionSchema = if (partitionColumns.isEmpty) {
    // Try to infer partitioning, because no DataSource in the read path provides the partitioning
    // columns properly unless it is a Hive DataSource
    // First real use of the lazy tempFileIndex, which forces InMemoryFileIndex to be initialized
    combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
  } else {
    // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
    // partitioning
    if (userSpecifiedSchema.isEmpty) {
      val inferredPartitions = tempFileIndex.partitionSchema
      inferredPartitions
    } else {
      val partitionFields = partitionColumns.map { partitionColumn =>
        userSpecifiedSchema.flatMap(_.find(c => equality(c.name, partitionColumn))).orElse {
          val inferredPartitions = tempFileIndex.partitionSchema
          val inferredOpt = inferredPartitions.find(p => equality(p.name, partitionColumn))
          if (inferredOpt.isDefined) {
            logDebug(
              s"""Type of partition column: $partitionColumn not found in specified schema
                 |for $format.
                 |User Specified Schema
                 |=====================
                 |${userSpecifiedSchema.orNull}
                 |
                 |Falling back to inferred dataType if it exists.
               """.stripMargin)
          }
          inferredOpt
        }.getOrElse {
          throw new AnalysisException(s"Failed to resolve the schema for $format for " +
            s"the partition column: $partitionColumn. It must be specified manually.")
        }
      }
      StructType(partitionFields)
    }
  }

  val dataSchema = userSpecifiedSchema.map { schema =>
    StructType(schema.filterNot(f => partitionSchema.exists(p => equality(p.name, f.name))))
  }.orElse {
    format.inferSchema(
      sparkSession,
      caseInsensitiveOptions,
      tempFileIndex.allFiles())
  }.getOrElse {
    throw new AnalysisException(
      s"Unable to infer schema for $format. It must be specified manually.")
  }

  // We just print a waring message if the data schema and partition schema have the duplicate
  // columns. This is because we allow users to do so in the previous Spark releases and
  // we have the existing tests for the cases (e.g., `ParquetHadoopFsRelationSuite`).
  // See SPARK-18108 and SPARK-21144 for related discussions.
  try {
    SchemaUtils.checkColumnNameDuplication(
      (dataSchema ++ partitionSchema).map(_.name),
      "in the data schema and the partition schema",
      equality)
  } catch {
    case e: AnalysisException => logWarning(e.getMessage)
  }

  (dataSchema, partitionSchema)
}

This is where the following call is made:

new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)

4. InMemoryFileIndex initialization

Next, let's look at the InMemoryFileIndex class.
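The timing matters here: tempFileIndex is a Scala lazy val, so the InMemoryFileIndex is not built when the variable is declared but when it is first read. Java has no lazy val, so the sketch below imitates the behaviour with a small memoizing wrapper. Lazy and initializerRuns are names made up for this illustration, and the wrapper is only an approximation of what the Scala compiler generates.

```java
import java.util.function.Supplier;

/** A minimal stand-in for a Scala lazy val: the initializer runs once, on first access. */
final class Lazy<T> implements Supplier<T> {
    private final Supplier<T> initializer;
    private T value;
    private boolean initialized;
    private int runs; // counts initializer executions, for demonstration only

    Lazy(Supplier<T> initializer) {
        this.initializer = initializer;
    }

    @Override
    public synchronized T get() {
        if (!initialized) {
            value = initializer.get(); // the expensive work happens here, not at construction
            initialized = true;
            runs++;
        }
        return value;
    }

    synchronized int initializerRuns() {
        return runs;
    }

    public static void main(String[] args) {
        Lazy<String> tempFileIndex = new Lazy<>(() -> "file index built");
        System.out.println(tempFileIndex.initializerRuns()); // nothing has run yet
        System.out.println(tempFileIndex.get());             // first use triggers the initializer
        System.out.println(tempFileIndex.initializerRuns());
    }
}
```

In the Spark code the first read happens inside combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex), which is why the file listing starts there and not at the declaration.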

class InMemoryFileIndex(
    sparkSession: SparkSession,
    rootPathsSpecified: Seq[Path],
    parameters: Map[String, String],
    partitionSchema: Option[StructType],
    fileStatusCache: FileStatusCache = NoopCache)
  extends PartitioningAwareFileIndex(
    sparkSession, parameters, partitionSchema, fileStatusCache) {

  // Filter out streaming metadata dirs or files such as "/.../_spark_metadata" (the metadata dir)
  // or "/.../_spark_metadata/0" (a file in the metadata dir). `rootPathsSpecified` might contain
  // such streaming metadata dir or files, e.g. when after globbing "basePath/*" where "basePath"
  // is the output of a streaming query.
  override val rootPaths =
    rootPathsSpecified.filterNot(FileStreamSink.ancestorIsMetadataDirectory(_, hadoopConf))

  @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
  @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _
  @volatile private var cachedPartitionSpec: PartitionSpec = _

  // refresh0 runs when the class is initialized
  refresh0()
  ......

The class runs the refresh0 method when it is initialized.

private def refresh0(): Unit = {
  // The call of interest happens here
  val files = listLeafFiles(rootPaths)
  cachedLeafFiles =
    new mutable.LinkedHashMap[Path, FileStatus]() ++= files.map(f => f.getPath -> f)
  cachedLeafDirToChildrenFiles = files.toArray.groupBy(_.getPath.getParent)
  cachedPartitionSpec = null
}
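Besides triggering the listing, refresh0 stores the result in two shapes: a map from file path to file, and a map from parent directory to the files in it. A minimal Java sketch of the second grouping is shown below. It uses java.nio.file.Path as a stand-in for Hadoop's Path and FileStatus, and LeafFileCache and groupByParent are hypothetical names for this example only.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LeafFileCache {
    /** Groups leaf files by their parent directory, keeping first-seen directory order. */
    static Map<Path, List<Path>> groupByParent(List<Path> leafFiles) {
        Map<Path, List<Path>> byDirectory = new LinkedHashMap<>();
        for (Path file : leafFiles) {
            // Create the bucket for this directory on first sight, then append the file.
            byDirectory.computeIfAbsent(file.getParent(), dir -> new ArrayList<>()).add(file);
        }
        return byDirectory;
    }
}
```

With files under /data/a and /data/b, the result has one entry per directory, each holding that directory's files.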

refresh0 in turn calls listLeafFiles(rootPaths).

/**
 * List leaf files of given paths. This method will submit a Spark job to do parallel
 * listing whenever there is a path having more files than the parallel partition discovery
 * discovery threshold.
 *
 * This is publicly visible for testing.
 */
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
  val output = mutable.LinkedHashSet[FileStatus]()
  val pathsToFetch = mutable.ArrayBuffer[Path]()
  for (path <- paths) {
    fileStatusCache.getLeafFiles(path) match {
      case Some(files) =>
        HiveCatalogMetrics.incrementFileCacheHits(files.length)
        output ++= files
      case None =>
        pathsToFetch += path
    }
    Unit // for some reasons scalac 2.12 needs this; return type doesn't matter
  }
  val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
  // bulkListLeafFiles is called here
  val discovered = InMemoryFileIndex.bulkListLeafFiles(
    pathsToFetch, hadoopConf, filter, sparkSession)
  discovered.foreach { case (path, leafFiles) =>
    HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
    fileStatusCache.putLeafFiles(path, leafFiles.toArray)
    output ++= leafFiles
  }
  output
}
}
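Before any listing happens, the method above splits the requested paths in two: paths whose files are already in the cache, and paths that still have to be fetched. Only the second group is passed to bulkListLeafFiles. The Java sketch below reproduces just that split with plain strings and a Map as the cache. ListingPlan and plan are illustrative names, not Spark APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ListingPlan {
    final List<String> cachedFiles = new ArrayList<>();  // files served from the cache
    final List<String> pathsToFetch = new ArrayList<>(); // paths that still need a real listing

    /** Splits paths into cache hits (their files are collected) and cache misses. */
    static ListingPlan plan(List<String> paths, Map<String, List<String>> cache) {
        ListingPlan result = new ListingPlan();
        for (String path : paths) {
            List<String> files = cache.get(path);
            if (files != null) {
                result.cachedFiles.addAll(files);
            } else {
                result.pathsToFetch.add(path);
            }
        }
        return result;
    }
}
```

Only the size of pathsToFetch counts toward the threshold check discussed in the next section, so cached paths cannot by themselves trigger the listing job.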

This in turn calls InMemoryFileIndex.bulkListLeafFiles.

5. InMemoryFileIndex.bulkListLeafFiles

/**
 * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
 * on the number of paths to list.
 *
 * This may only be called on the driver.
 *
 * @return for each input path, the set of discovered files for the path
 */
private def bulkListLeafFiles(
    paths: Seq[Path],
    hadoopConf: Configuration,
    filter: PathFilter,
    sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {

  // If the number of paths is at most 32 (the default of parallelPartitionDiscoveryThreshold),
  // list them right here and return. If it is greater than 32, start a separate job to find
  // the files, in case listing a large number of paths takes a long time.
  // Short-circuits parallel listing when serial listing is likely to be faster.
  if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
    return paths.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
    }
  }

  logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
  HiveCatalogMetrics.incrementParallelListingJobCount(1)

  val sparkContext = sparkSession.sparkContext
  val serializableConfiguration = new SerializableConfiguration(hadoopConf)
  val serializedPaths = paths.map(_.toString)
  val parallelPartitionDiscoveryParallelism =
    sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

  // Set the number of parallelism to prevent following file listing from generating many tasks
  // in case of large #defaultParallelism.
  val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

  val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
  val statusMap = try {
    // For our input this yields the description
    // "Listing leaf files and directories for 100 paths:"
    val description = paths.size match {
      case 0 =>
        s"Listing leaf files and directories 0 paths"
      case 1 =>
        s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
      case s =>
        s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
    }
    // Set the job description
    sparkContext.setJobDescription(description)
    sparkContext
      .parallelize(serializedPaths, numParallelism)
      .mapPartitions { pathStrings =>
        val hadoopConf = serializableConfiguration.value
        pathStrings.map(new Path(_)).toSeq.map { path =>
          (path, listLeafFiles(path, hadoopConf, filter, None))
        }.iterator
      }.map { case (path, statuses) =>
        val serializableStatuses = statuses.map { status =>
          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
          val blockLocations = status match {
            case f: LocatedFileStatus =>
              f.getBlockLocations.map { loc =>
                SerializableBlockLocation(
                  loc.getNames,
                  loc.getHosts,
                  loc.getOffset,
                  loc.getLength)
              }
            case _ =>
              Array.empty[SerializableBlockLocation]
          }
          SerializableFileStatus(
            status.getPath.toString,
            status.getLen,
            status.isDirectory,
            status.getReplication,
            status.getBlockSize,
            status.getModificationTime,
            status.getAccessTime,
            blockLocations)
        }
        (path.toString, serializableStatuses)
      // collect() is an action, so it triggers a job
      }.collect()
  } finally {
    sparkContext.setJobDescription(previousJobDescription)
  }

  // turn SerializableFileStatus back to Status
  statusMap.map { case (path, serializableStatuses) =>
    val statuses = serializableStatuses.map { f =>
      val blockLocations = f.blockLocations.map { loc =>
        new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
      }
      new LocatedFileStatus(
        new FileStatus(
          f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
          new Path(f.path)),
        blockLocations)
    }
    (new Path(path), statuses)
  }
}

The code in the rest of this section consists of excerpts from the InMemoryFileIndex.bulkListLeafFiles method above.

1. The paths.size check that decides whether a job is created

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
  return paths.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
  }
}

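This code checks how many top-level paths were passed in. In our case that is the number of paths matching hdfs:///user/daily/20200828/*.parquet. At this point Spark does not assume that a matched path is a file; it treats each one as a possible directory, because Spark supports nested directories. If there are many paths, listing them all on the driver could take a long time, so when the number of paths is greater than 32 Spark creates a job and submits it to the YARN cluster, where multiple executors do the listing in parallel.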
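The decision itself is a single comparison. As a small Java sketch of the rule described above (ListingStrategy and listsOnDriver are names made up here, and 32 is the default discussed in this post):

```java
final class ListingStrategy {
    /** Default of spark.sql.sources.parallelPartitionDiscovery.threshold discussed in this post. */
    static final int DEFAULT_THRESHOLD = 32;

    /** True when the paths are listed serially on the driver, false when a listing job is started. */
    static boolean listsOnDriver(int pathCount, int threshold) {
        // Mirrors `paths.size <= threshold`: the boundary value itself still lists on the driver.
        return pathCount <= threshold;
    }

    public static void main(String[] args) {
        System.out.println(listsOnDriver(1, DEFAULT_THRESHOLD));   // a single file
        System.out.println(listsOnDriver(100, DEFAULT_THRESHOLD)); // the 100 parquet files in this post
    }
}
```

Note that the comparison is <=, so exactly 32 paths are still listed on the driver and the extra job only appears from 33 paths upward.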

The value of sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold comes from the following code:

def parallelPartitionDiscoveryThreshold: Int =
  getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
  buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
    .doc("The maximum number of paths allowed for listing files at driver side. If the number " +
      "of detected paths exceeds this value during partition discovery, it tries to list the " +
      "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
      "LibSVM data sources.")
    .intConf
    .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
      "files at driver side must not be negative")
    .createWithDefault(32)

Here hdfs:///user/daily/20200828/*.parquet matches 100 files, so the if condition above is false and execution continues on to create job0, which lists the files.
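Since the threshold is an ordinary SQL configuration key, it can be raised when the session is built. The fragment below is a sketch only and has not been run against a cluster: the app name and the value 200 are arbitrary choices for illustration. Going by the condition analysed above, a threshold of at least 100 should keep the listing of our 100 paths on the driver, so the separate listing job should not appear.

```java
import org.apache.spark.sql.SparkSession;

final class ThresholdConfigExample {
    /** Builds a local session with a higher listing threshold (200 is an arbitrary example value). */
    static SparkSession build() {
        return SparkSession.builder()
                .master("local")
                .appName("threshold_example") // hypothetical app name
                // Key taken from the SQLConf definition quoted above.
                .config("spark.sql.sources.parallelPartitionDiscovery.threshold", "200")
                .getOrCreate();
    }
}
```

Whether that is desirable depends on the workload: serial listing on the driver avoids the overhead of a job but can be slow when there are many paths.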

Note that when the number of paths is at most 32, the method that gets called is listLeafFiles(path, hadoopConf, filter, Some(sparkSession)), not the listLeafFiles(paths: Seq[Path]) seen earlier.

/**
 * Lists a single filesystem path recursively. If a SparkSession object is specified, this
 * function may launch Spark jobs to parallelize listing.
 *
 * If sessionOpt is None, this may be called on executors.
 *
 * @return all children of path that match the specified filter.
 */
private def listLeafFiles(
    path: Path,
    hadoopConf: Configuration,
    filter: PathFilter,
    sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
  ....
}

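The method body is omitted here. Its doc comment says "Lists a single filesystem path recursively", that is, it recursively finds the files under one path. So when there are at most 32 top-level paths, each of them is listed recursively on the driver. Note that this method also belongs to InMemoryFileIndex, but it is not the same method as the

def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {...... }

that appeared above.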

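2. job0, the file-listing job

Because the if branch above is not taken, execution continues with the code that creates job0. There is a fair amount going on here, so we break it down further below; the code itself also carries detailed comments.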

val sparkContext = sparkSession.sparkContext
val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism

// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
  // For our input this yields the description
  // "Listing leaf files and directories for 100 paths:"
  val description = paths.size match {
    case 0 =>
      s"Listing leaf files and directories 0 paths"
    case 1 =>
      s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
    case s =>
      s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
  }
  // Set the job description
  sparkContext.setJobDescription(description)
  sparkContext
    .parallelize(serializedPaths, numParallelism)
    .mapPartitions { pathStrings =>
      val hadoopConf = serializableConfiguration.value
      pathStrings.map(new Path(_)).toSeq.map { path =>
        (path, listLeafFiles(path, hadoopConf, filter, None))
      }.iterator
    }.map { case (path, statuses) =>
      val serializableStatuses = statuses.map { status =>
        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
        val blockLocations = status match {
          case f: LocatedFileStatus =>
            f.getBlockLocations.map { loc =>
              SerializableBlockLocation(
                loc.getNames,
                loc.getHosts,
                loc.getOffset,
                loc.getLength)
            }
          case _ =>
            Array.empty[SerializableBlockLocation]
        }
        SerializableFileStatus(
          status.getPath.toString,
          status.getLen,
          status.isDirectory,
          status.getReplication,
          status.getBlockSize,
          status.getModificationTime,
          status.getAccessTime,
          blockLocations)
      }
      (path.toString, serializableStatuses)
    // collect() is an action, so it triggers a job
    }.collect()

1. Setting the job description

bulkListLeafFiles() sets the job description as follows:

val description = paths.size match {
  case 0 =>
    s"Listing leaf files and directories 0 paths"
  case 1 =>
    s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
  case s =>
    s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sparkContext.setJobDescription(description)
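To see how this produces the Description shown for job0, here is the same three-way match written as a Java sketch. JobDescriptions and listingDescription are names invented for the example; the strings are copied from the Scala code above.

```java
import java.util.List;

final class JobDescriptions {
    /** Builds the listing-job description the same way the match expression above does. */
    static String listingDescription(List<String> paths) {
        switch (paths.size()) {
            case 0:
                return "Listing leaf files and directories 0 paths";
            case 1:
                return "Listing leaf files and directories for 1 path:<br/>" + paths.get(0);
            default:
                // Only the first path is shown; the rest are abbreviated as ", ...".
                return "Listing leaf files and directories for " + paths.size()
                        + " paths:<br/>" + paths.get(0) + ", ...";
        }
    }
}
```

With 100 paths, the result starts with "Listing leaf files and directories for 100 paths:" followed by the first path and ", ...", which matches the job0 Description in section 3 apart from the <br/> markup.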

2. Creating and running the job

val parallelPartitionDiscoveryParallelism =
  sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

sparkContext.setJobDescription(description)
sparkContext
  .parallelize(serializedPaths, numParallelism)
  .mapPartitions { pathStrings =>
    val hadoopConf = serializableConfiguration.value
    pathStrings.map(new Path(_)).toSeq.map { path =>
      (path, listLeafFiles(path, hadoopConf, filter, None))
    }.iterator
  }.map { case (path, statuses) =>
    val serializableStatuses = statuses.map { status =>
      // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
      val blockLocations = status match {
        case f: LocatedFileStatus =>
          f.getBlockLocations.map { loc =>
            SerializableBlockLocation(
              loc.getNames,
              loc.getHosts,
              loc.getOffset,
              loc.getLength)
          }
        case _ =>
          Array.empty[SerializableBlockLocation]
      }
      SerializableFileStatus(
        status.getPath.toString,
        status.getLen,
        status.isDirectory,
        status.getReplication,
        status.getBlockSize,
        status.getModificationTime,
        status.getAccessTime,
        blockLocations)
    }
    (path.toString, serializableStatuses)
  // collect() is an action, so it triggers a job
  }.collect()
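The number of partitions of the listing job is fixed by the numParallelism line at the top of this excerpt. Isolated as a Java sketch (ListingParallelism is a hypothetical name, and the configured value is passed in as a plain argument):

```java
final class ListingParallelism {
    /** Mirrors Math.min(paths.size, parallelPartitionDiscoveryParallelism) from the excerpt above. */
    static int numParallelism(int pathCount, int configuredParallelism) {
        // Never more partitions than paths, and never more than the configured cap.
        return Math.min(pathCount, configuredParallelism);
    }

    public static void main(String[] args) {
        System.out.println(numParallelism(100, 10000));
    }
}
```

The cap only matters once there are more paths than the configured parallelism; below that, the job gets one partition per path.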

Here we can see that the parallelism is set to Math.min(paths.size, parallelPartitionDiscoveryParallelism).
Debugging shows that sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism defaults to 10000,
so numParallelism = paths.size = 100 (the directory contains 100 parquet files).
What these parallel tasks end up doing is recursively finding every file together with its block information, as this piece of code shows:

mapPartitions { pathStrings =>
  val hadoopConf = serializableConfiguration.value
  pathStrings.map(new Path(_)).toSeq.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, None))
  }.iterator
}

The listLeafFiles(path, hadoopConf, filter, None) call inside it is defined to recursively find all files under a single path.
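The body of that method was omitted earlier, so as a rough idea of what "recursively find all files under one path" means, here is a sketch against the local filesystem using java.nio.file. It is an analogy, not Spark's implementation: it uses no Hadoop APIs, applies no path filter, and collects no block locations. LocalLeafFiles is a name made up for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class LocalLeafFiles {
    /** Returns every regular file at or below path, descending into subdirectories. */
    static List<Path> listLeafFiles(Path path) {
        List<Path> leaves = new ArrayList<>();
        if (Files.isRegularFile(path)) {
            leaves.add(path); // a file is its own only leaf
        } else if (Files.isDirectory(path)) {
            try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
                for (Path child : children) {
                    leaves.addAll(listLeafFiles(child)); // recurse into each child
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return leaves;
    }
}
```

This also shows why a path that is already a file still goes through the listing: the routine cannot know that in advance, it has to check each path it is given.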

5. Call chain summary

DataFrameReader.load()
DataFrameReader.loadV1Source()
DataSource.resolveRelation()
DataSource.getOrInferFileFormatSchema()
new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
InMemoryFileIndex.refresh0()
InMemoryFileIndex.listLeafFiles()
InMemoryFileIndex.bulkListLeafFiles()
