Spark SQL reading ORC files: the gap between Driver startup and Task (stage) execution is far too long (partition computation takes too long), and each ORC output file contains too many stripes — a solution
1. Background:
    The upstream job is controlled to produce 7,000 files per day, each under 256M, 5 billion+ rows in total, in ORC format. Inspecting individual files showed roughly 500 stripes per file. File inspection command:
hdfs fsck viewfs://hadoop/nn01/warehouse/…….db/……/partition_date=2017-11-11/part-06999 -files -blocks
Command to inspect the stripe count:
hive --orcfiledump viewfs://hadoop/nn01/warehouse/…….db/table/partition_date=2017-11-11/part-06999 | less
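A quick back-of-the-envelope check (plain Java, numbers taken from the observations above) shows why ~500 stripes in a sub-256M file is alarming:

```java
public class StripeMath {
    /** Average stripe size implied by an observed file size and stripe count. */
    public static long avgStripeBytes(long fileBytes, int stripeCount) {
        return fileBytes / stripeCount;
    }

    public static void main(String[] args) {
        long fileBytes = 256L * 1024 * 1024;  // each file is under 256M
        int stripes = 500;                    // ~500 stripes observed per file
        // Roughly 0.5M per stripe -- far below the 64M default stripe size,
        // so every file carries oversized stripe metadata for readers to parse.
        System.out.println(avgStripeBytes(fileBytes, stripes) / 1024 + " KB per stripe");
    }
}
```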
2. The problem:
    When reading ORC files through Spark SQL, the interval between job submission and partition computation (and the first Task execution) was far too long.
    The following logs were printed repeatedly:
17/11/11 03:52:01 INFO BlockManagerMasterEndpoint: Registering block manager gh-data-hdp-dn0640.---:11942 with 6.1 GB RAM, BlockManagerId(554, ----, 11942)
17/11/11 03:52:29 INFO DFSClient: Firstly choose dn: DatanodeInfoWithStorage[10.20.--.--:50010,DS-32f8aaa5-c6ce-48a9-a2b1-3b169df193b9,DISK], --
17/11/11 03:52:29 INFO DFSClient: Firstly choose dn:
    Distilled problem: even a simple SQL statement like the one below shows the symptom — after submission the ApplicationMaster (Driver) starts, but Tasks do not run for a long time and partitions cannot be computed. The Spark UI cannot render the DAG and shows no stage information.
SELECT * FROM table WHERE partition_date='2017-11-11' LIMIT 1;
3. Problem analysis
    Initial analysis: the GC logs confirmed that the Driver itself was reading data from the DataNodes (the ORC files' footer/metadata), and that this triggered full GC on the Driver.
    Source-code tracing: the behavior is tied to the split strategy Spark uses when reading ORC files.
    HiveConf.java shows that Spark reads ORC files with the HYBRID strategy by default:
HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID",
    new StringSet(new String[]{"HYBRID", "BI", "ETL"}),
    "This is not a user level config. BI strategy is used when the requirement is to spend less time in split"
    + " generation as opposed to query execution (split generation does not read or cache file footers)."
    + " ETL strategy is used when spending little more time in split generation is acceptable"
    + " (split generation reads and caches file footers). HYBRID chooses between the above strategies based on heuristics."),
    OrcInputFormat.java shows the code implementing the HYBRID split strategy:
public SplitStrategy call() throws IOException {
  final SplitStrategy splitStrategy;
  AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf, context.transactionList);
  List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
  Path base = dirInfo.getBaseDirectory();
  List<FileStatus> original = dirInfo.getOriginalFiles();
  boolean[] covered = new boolean[context.numBuckets];
  boolean isOriginal = base == null;
  // if we have a base to work from
  if (base != null || !original.isEmpty()) {
    // find the base files (original or new style)
    List<FileStatus> children = original;
    if (base != null) {
      children = SHIMS.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter);
    }
    long totalFileSize = 0;
    for (FileStatus child : children) {
      totalFileSize += child.getLen();
      AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename(child.getPath(), context.conf);
      int b = opts.getBucket();
      // If the bucket is in the valid range, mark it as covered.
      // I wish Hive actually enforced bucketing all of the time.
      if (b >= 0 && b < covered.length) {
        covered[b] = true;
      }
    }
    int numFiles = children.size();
    long avgFileSize = totalFileSize / numFiles;
    switch (context.splitStrategyKind) {
      case BI:
        // BI strategy requested through config
        splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas, covered);
        break;
      case ETL:
        // ETL strategy requested through config
        splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas, covered);
        break;
      default:
        // HYBRID strategy
        if (avgFileSize > context.maxSize) {
          splitStrategy = new ETLSplitStrategy(context, fs, dir, children, isOriginal, deltas, covered);
        } else {
          splitStrategy = new BISplitStrategy(context, fs, dir, children, isOriginal, deltas, covered);
        }
        break;
    }
  } else {
    // no base, only deltas
    splitStrategy = new ACIDSplitStrategy(dir, context.numBuckets, deltas, covered);
  }
  return splitStrategy;
}
    The HYBRID strategy: when the Spark Driver starts, it reads metadata from the NameNode and computes the average file size from the total data size and file count. If that average exceeds the 256M default threshold, the ETL strategy is chosen. The ETL strategy then reads the footer and other metadata of every ORC file from the DataNodes; when the stripe count is high or the metadata is large, this drives the Driver into full GC, which shows up as the long gap between Driver startup and Task execution.
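The heuristic above can be condensed into a small sketch (hypothetical names, not the actual Hive classes): given the configured strategy and the computed average file size, decide which split strategy runs.

```java
public class SplitStrategyChooser {
    enum Strategy { HYBRID, BI, ETL }

    // stands in for context.maxSize, which is 256M in this setup
    static final long MAX_SIZE = 256L * 1024 * 1024;

    /** Mirrors the switch in OrcInputFormat#call: an explicit BI/ETL config wins;
     *  otherwise HYBRID picks ETL only when files are large on average. */
    static Strategy choose(Strategy configured, long totalFileSize, int numFiles) {
        if (configured != Strategy.HYBRID) {
            return configured;
        }
        long avgFileSize = totalFileSize / numFiles;
        return avgFileSize > MAX_SIZE ? Strategy.ETL : Strategy.BI;
    }

    public static void main(String[] args) {
        // 7000 files averaging ~200M -> BI: no footer reads during split generation
        System.out.println(choose(Strategy.HYBRID, 7000L * 200 * 1024 * 1024, 7000));
        // Files averaging ~300M -> ETL: the Driver reads every footer
        System.out.println(choose(Strategy.HYBRID, 7000L * 300 * 1024 * 1024, 7000));
    }
}
```

This makes the failure mode visible: nothing in the heuristic bounds how much footer metadata the Driver will pull once ETL is selected.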
4. Solution:
Spark 1.6.2:
val hiveContext = new HiveContext(sc)

// Default is 64M: a stripe is flushed once the buffered (pre-compression) data reaches 64M.
// The related setting hive.exec.orc.default.row.index.stride=10000 controls how many rows make up one row-index stride.
// Tuning this parameter controls the number of stripes per file. Left unconfigured, a single file ends up with too many
// stripes, which hurts downstream consumers; and if the ETL split strategy is configured (or triggered by the HYBRID
// heuristic), the Driver reads too much metadata from the DataNodes, causing frequent GC and unacceptably long
// partition computation.
hiveContext.setConf("hive.exec.orc.default.stripe.size", "268435456")

// Three strategies exist: {"HYBRID", "BI", "ETL"}; the default is "HYBRID". Per the Hive docs: "BI strategy is used
// when the requirement is to spend less time in split generation as opposed to query execution (split generation does
// not read or cache file footers). ETL strategy is used when spending little more time in split generation is
// acceptable (split generation reads and caches file footers). HYBRID chooses between the above strategies based on
// heuristics."
// If not configured, then whenever the average ORC file size estimated by the framework exceeds 256M, the ETL strategy
// is triggered and the Driver spends a long time reading DataNode data to compute splits.
hiveContext.setConf("hive.exec.orc.split.strategy", "BI")
Spark 2.2.0:
// Create a SparkSession with Hive support
val sparkSession = SparkSession.builder()
  .appName("PvMvToBase")
  // Default is 64M: a stripe is flushed once the buffered (pre-compression) data reaches 64M.
  // The related setting hive.exec.orc.default.row.index.stride=10000 controls how many rows make up one row-index stride.
  // Tuning this parameter controls the number of stripes per file; see the comments in the Spark 1.6.2 snippet above.
  .config("hive.exec.orc.default.stripe.size", 268435456L)
  // Three strategies exist: {"HYBRID", "BI", "ETL"}; the default is "HYBRID".
  // If not configured, then whenever the average ORC file size exceeds 256M, the ETL strategy is triggered and the
  // Driver spends a long time reading DataNode data to compute splits.
  .config("hive.exec.orc.split.strategy", "BI")
  .enableHiveSupport()
  .getOrCreate()
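As a sanity check on the stripe-size fix: with hive.exec.orc.default.stripe.size raised to 268435456 (256M), a file under 256M should hold roughly one stripe instead of ~500. A minimal sketch of that expectation (plain arithmetic; it is an upper-bound estimate, since real ORC writers may flush stripes early under memory pressure):

```java
public class StripeEstimate {
    /** Estimated stripe count if stripes are flushed exactly at stripeSize (ceiling division). */
    public static long expectedStripes(long fileBytes, long stripeSize) {
        return Math.max(1, (fileBytes + stripeSize - 1) / stripeSize);
    }

    public static void main(String[] args) {
        long fileBytes = 250L * 1024 * 1024;  // a file just under the 256M cap
        // Old 64M default -> about 4 stripes per file
        System.out.println(expectedStripes(fileBytes, 64L * 1024 * 1024));
        // New 256M setting -> a single stripe per file
        System.out.println(expectedStripes(fileBytes, 268435456L));
    }
}
```

Fewer stripes means smaller footers, so even if the ETL strategy does fire, the Driver has far less metadata to pull and parse.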