日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark 小文件合并优化实践

發布時間:2023/12/20 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark 小文件合并优化实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • 背景
  • 一些嘗試
    • MergeTable
    • 性能優化
  • 后記

對 spark 任務數據落地(HDFS) 碎片文件過多的問題的優化實踐及思考。

背景

此文是關于公司在 Delta Lake 上線之前對Spark任務寫入數據產生碎片文件優化的一些實踐。

  • 形成原因
    數據在流轉過程中經歷 filter/shuffle 等過程后,開發人員難以評估作業寫出的數據量。即使使用了 Spark 提供的AE功能,目前也只能控制 shuffle read 階段的數據量,寫出數據的大小實際還會受壓縮算法及格式的影響,因此在任務運行時,對分區的數據評估非常困難。

    • shuffle 分區過多過碎,寫入性能會較差且生成的小文件會非常多。
    • shuffle 分區過少過大,則寫入并發度可能會不夠,影響任務運行時間。
  • 不利影響
    在產生大量碎片文件后,任務數據讀取的速度會變慢(需要尋找讀入大量的文件,如果是機械盤更是需要大量的尋址操作),同時會對 hdfs namenode 內存造成很大的壓力。

在這種情況下,只能讓業務/開發人員主動的合并下數據或者控制分區數量,提高了用戶的學習及使用成本,往往效果還非常不理想。
既然在運行過程中對最終落地數據的評估如此困難,是否能將該操作放在數據落地后進行?對此我們進行了一些嘗試,希望能自動化的解決/緩解此類問題。

一些嘗試

大致做了這么一些工作:

  • 修改 Spark FileFormatWriter 源碼,數據落盤時,記錄相關的 metrics,主要是一些分區/表的記錄數量和文件數量信息。
  • 在發生落盤操作后,會自動觸發碎片文件檢測,判斷是否需要追加合并數據任務。
  • ?實現一個 MergeTable 語法用于合并表/分區碎片文件,通過系統或者用戶直接調用。
  • 第1和第2點主要是平臺化的一些工作,包括監測數據落盤,根據采集的 metrics 信息再判斷是否需要進行 MergeTable 操作,下文是關于 MergeTable 的一些細節實現。

    MergeTable

    功能:

  • 能夠指定表或者分區進行合并
  • 合并分區表但不指定分區,則會遞歸對所有分區進行檢測合并
  • ?指定了生成的文件數量,就會跳過規則校驗,直接按該數量進行合并
  • 語法:

    merge table [表名] [options (fileCount=合并后文件數量)] --非分區表 merge table [表名] PARTITION (分區信息) [options (fileCount=合并后文件數量)] --分區表

    碎片文件校驗及合并流程圖?:

    性能優化

    對合并操作的性能優化

  • 只合并碎片文件
    如果設置的碎片閾值是128M,那么只會將該表/分區內小于該閾值的文件進行合并,同時如果碎片文件數量小于一定閾值,將不會觸發合并,這里主要考慮的是合并任務存在一定性能開銷,因此允許系統中存在一定量的小文件?。

  • 分區數量及合并方式
    定義了一些規則用于計算輸出文件數量及合并方式的選擇,獲取任務的最大并發度 maxConcurrency 用于計算數據的分塊大小,再根據數據碎片文件的總大小選擇合并(coalesce/repartition)方式。

    • 開啟 dynamicAllocation
      maxConcurrency = spark.dynamicAllocation.maxExecutors * spark.executor.cores
    • 未開啟 dynamicAllocation
      maxConcurrency = spark.executor.instances * spark.executor.cores

    以幾個場景為例對比優化前后?的性能:
    ? 場景1:最大并發度100,碎片文件數據100,碎片文件總大小100M,如果使用 coalesce(1),將會只會有1個線程去讀/寫數據,改為 repartition(1),則會有100個并發讀,一個線程順序寫。性能相差100X。

    ? 場景2:最大并發度100,碎片文件數量10000,碎片文件總大小100G,如果使用 repartition(200),將會導致100G的數據發生 shuffle,改為 coalesce(200),則能在保持相同并發的情況下避免 200G數據的IO。

    ? 場景3:最大并發度200,碎片文件數量10000,碎片文件總大小50G,如果使用 coalesce(100),會保存出100個500M文件,但是會浪費一半的計算性能,改為 coalesce(200),合并耗時會下降為原來的50%。

    上述例子的核心都是在充分計算資源的同時避免不必要的IO。

  • 修復元數據
    因為 merge 操作會修改數據的創建及訪問時間,所以在目錄替換時需要將元數據信息修改到 merge 前的一個狀態,該操作還能避免冷數據掃描的誤判。最后還要調用 refresh table 更新表在 spark 中的狀態緩存。?

  • commit 前進行校驗
    在最終提交前對數據進行校驗,判斷合并前后數據量是否發生變化(從數據塊元數據中直接獲取數量,避免發生IO),存在異常則會進行回滾,放棄合并操作。?

  • 數據寫入后,自動合并效果圖:

    后記

    收益
    該同步合并的方式已經在我們的線上穩定運行了1年多,成功的將平均文件大小從150M提升到了270M左右,提高了數據讀取速度,與此同時 Namenode 的內存壓力也得到了極大緩解。

    ?對 MergeTable 操作做了上述的相關優化后,根據不同的數據場景下,能帶來數倍至數十倍的性能提升。

    缺陷
    因為采用的是同步合并的方式,由于沒有事務控制,所以在合并過程中數據不可用,這也是我們后來開始引入 D?elta Lake 的一個原因。

    總結

    以上是生活随笔為你收集整理的Spark 小文件合并优化实践的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。