MaxCompute(ODPS)上处理非结构化数据的Best Practice
2019獨角獸企業重金招聘Python工程師標準>>>
摘要:?隨著MaxCompute(ODPS)2.0的上線,新增的非結構化數據處理框架也推出一系列的介紹文章,包括 MaxCompute上如何訪問OSS數據, 基本功能用法和整體介紹,側重介紹讀取OSS數據進行計算處理; 本文:MaxCompute(ODPS)上處理非結構化數據的Best Practice。
隨著MaxCompute(ODPS)2.0的上線,新增的非結構化數據處理框架也推出一系列的介紹文章,包括
1、MaxCompute上如何訪問OSS數據, 基本功能用法和整體介紹,側重介紹讀取OSS數據進行計算處理;
2、MaxCompute上處理非結構化數據的Best Practice。 基于非結構化框架實現原理,提供一些最佳實踐總結;
3、MaxCompute訪問TableStore(OTS) 數據, 著重介紹通過非結構化框架來訪問計算KV(TableStore/OTS)數據;
4、MaxCompute到OSS的非結構化數據輸出(及圖像處理實例):介紹了非結構化輸出功能,并通過圖像處理等范例,說明怎樣通過MaxCompute的計算能力,打通整個OSS -> MaxCompute -> OSS的數據處理閉環;
5、如何在MaxCompute上處理存儲在OSS上的開源格式數據, 介紹對于存儲在OSS上的常見開源數據(ORC, PARQUET, AVRO等)格式,如何通過非結構化框架進行處理。
本文是這系列中的第【2】篇。
?前言
隨著MaxCompute(原ODPS)非結構化數據處理框架的推出,在SQL線上打通了MaxCompute與OSS數據之間的計算數據連接生態,我們看到了視頻,圖像,音頻以及基因,氣象等各種各種各樣數據在MaxCompute平臺上實現了與傳統結構化數據的無縫融合。之前我們提供了在MaxCompute非結構化框架處理OSS上數據的整體介紹,在基本功能實現后,我們收到用戶許多關于優化和怎樣最好的使用非結構化功能的問題。 這里通過分析非結構化框架底層的一些實現原理以及我們看到的一些使用場景,提供一些關于Best Practice的總結,方便大家更有效的在MaxCompute中處理各種數據。
1. 數據在OSS上的存儲
1.1 OSS LOCATION 的選擇
MaxCompute通過在EXTERNAL TABLE上的LOCATION cluase來指定需要處理的OSS數據地址【注:本文假設用戶對于非結構化框架,包括EXTERNABLE TABLE, StorageHanlder等的定義等都有比較好的了解,相關細節這里不再具體說明。 有疑問可以先參考之前的基本功能介紹】。其中LOCATION將指向一個OSS的一個目錄(或者更準確的說,是一個以‘/’結尾的地址),其中LOCATION為標準URI格式:
LOCATION 'oss://${endpoint}/${bucket}/${userPath}/'?對于數據安全比較敏感的場景,比如在多用戶場景或者公共云上,則推薦采用上述方式,不再LOCATION上使用AK,而是通過STS/RAM體系事先進行鑒權(參見基本功能介紹)。
LOCATION的選擇有幾點要注意:
- 不允許使用oss的root bucket作為LOCATION, 也就是說${userPath}不可以為空,這個要求源自OSS對root bucket下存放內容的一些限制。
- LOCATION不能指向一個單獨文件,也就是說,類似oss://oss-cn-hangzhou.aliyuncs.com/mybucket/directory/data.csv?這種LOCATION是無效的。 如果只有一個文件要處理,則應該提供該文件的父目錄。
1.2 數據文件的存儲和處理:小文件和大文件
在分布式計算系統中,文件的大小對于整個系統的運行效率,性能等都有比較大的相關性。 這里對MaxCompute對非結構化數據的相關處理機制做一個介紹,并分析幾種有代表性的場景(e.g., 小文件和大文件),總結了幾個針對MaxCompute計算場景中,比較好的OSS文件存儲建議。
-
小文件:通常小文件往往伴隨著超大的文件數目,這對于分布式計算系統來說,有兩個問題:
- 大的文件數,會導致在進行文件分片時, 獲取文件宏信息的overhead較大,導致planning和分片比較耗時,比如一個100萬個文件的oss LOCATION, planning的耗時可能在分鐘以上的量級。
- 打開每個OSS文件是有ovehead的,碎片化的小文件會帶來額外的讀取開銷。 比如從OSS讀取1000個10KB大小的文件,相比讀取一個10MB的的文件,耗時可能在10倍以上。 對大量小文件的訪問將帶來整個分布式系統更多的網絡開銷,降低實際上有效的IO throughput。
-
大文件:與小文件相對的,是另外一個極端: 超大文件。 分布式系統的精髓是分而治之的思想:對數據進行分片,通過并發處理多個分片來加快海量數據的處理。 在極限情況下,如果海量數據存在一個無法被切割處理的單個文件中,那并發度就被降成為1,這樣子的“分布式系統”就失去了意義。 即使沒有那么極端,多個超大文件(比如每個幾十GB),對分布式系統也是不友好的:大的文件處理可能需要單獨占用大量系統資源,給資源調度帶來困難,另外還容易造成長尾,失敗重跑代價過高等問題。 所以從MaxCompute處理計算的角度,也不推薦在OSS上使用超大文件保存數據。
所以總體上不推薦在一個OSS目錄中存放過多的文件。 可以從另一個方面,考慮將Externable Table做partition,盡量在partition的子粒度上進行數據處理。 另外,在適用的場景下,可以考慮使用tar文件,比如把多個圖像文件打在一個tar文件中再保存到OSS上面。 如果是文本文件,MaxCompute的built-in StorageHandler (比如com.aliyun.odps.CsvStorageHandler或者com.aliyun.odps.TsvStorageHandler) 是能自動從tar文件中讀取數據的。 如果用戶自己定義的StorageHandler/Extractor,也可以在用戶代碼中使用Java中的tar處理類,比如直接使用Apache common 的TarArchiveInputStream來訪問。
總結一下, 作為一個整體上的指導原則,MaxCompute非結構框架推薦如下比較理想的OSS數據存儲方案:
數據文件根據應用特性,分文件夾存儲,不推薦一個文件夾中存儲10萬以上個文件。 可以考慮使用tar打包多個文件來作為降低物理文件數目的方法。
比較適中的文件大小以及均勻分布的數據文件,能更合理的使用各種系統資源, 從而提高分布式處理效率。 對MaxCompute非結構化框架而言,單個文件大小在1MB-2GB是比較理想的情況。
1.3 MaxCompute訪問OSS的網絡連通以及速度
MaxComput和OSS作為獨立的分布式計算和存儲服務,在不同的部署集群上的網絡連通性有可能影響MaxCompute訪問OSS的數據的可達性。 網絡的連通性整體服從七網隔離的原則,具體一點來說有幾點:
MaxCompute的公共云集群上的計算應該訪問OSS的外部集群,另外推薦需要訪問的OSS集群與MaxCompute計算集群在物理上盡量靠近。關于OSS公共云上的訪問域名以及對應數據中心可以參考OSS文檔。
在MaxCompute并發訪問OSS的情況下,一個需要特別注意的是OSS具有限流機制,默認情況下一個OSS賬號的訪問流量是限制在5Gb/s,也就是600MB/s左右。 在MaxComput的高并發度下(比如1000個以上的計算節點),OSS數據下載的速度可能將不再受限于單機網絡速度,而取決與OSS的總體流量限速。 在這種情況下,完全可能出現單個計算節點的下載速度低于1MB/s。 當然OSS的限流是可以特別配置的,如果有超大量的數據計算需求,可以聯系OSS團隊調高對應賬戶的具體的限流上限。
2. 在用戶自定義StorageHandler/Extractor中對輸入數據的處理
除了提供幾個內置的StorageHandler用來處理CSV, TSV以及Apache ORC文件以外,MaxCompute同時開發了非結構化Java SDK來方便用戶對數據進行解析和處理。 通過這樣的方法,擴展整個非結構化數據處理的生態,對接視頻,圖像,音頻,基因,氣象等數據處理的能力。 簡單的來說, MaxCompute封裝了分布式系統的細節,使用Java?InputStream?的一個增強子類來將做輸入數據與用戶代碼的對接。 這樣的接口設計區別于Hive的SerDe,?RowFormatter等多層封裝,提供了更自然的完全非結構化數據入口, 用戶能獲得原始數據流,用類似單機程序相似的邏輯進行處理。 當然,基于分布式系統的處理原則,還是有一些Best Practice推薦用戶遵守。
2.1 輸入數據流的處理模式
對于輸入數據流(InputStream),推薦在獲取數據bytes后能直接在內存中直接處理。?最理想的情況是,能針對輸入數據做流式的“邊讀邊計算”的處理。 當然,對于某些數據格式,由于數據本身的特性,很難做到完全的流式處理:比如對于某些圖片/音頻數據格式,一張文件必須完全讀入才能獲得正確的編碼信息以及其他特性,那這種情況下,在文件本身不是很大的情況下,可以把文件完全讀入本地內存,再行處理。 效率比較低的一種方式是把數據文件下載到本地,然后再通過FileStream讀取本地文件進行處理,這樣的處理模式有兩個問題:
2.2 三方庫使用
在非結構化數據的處理線上,經常遇到的一個需求是把單機的數據處理機制,通過MaxCompute非結構化數據框架,遷移到分布式系統上執行。 比如希望同過ffmpeg來直接讀取視頻數據,或者希望通過Netcdf-Java來直接處理氣象的netcdf/grib格式數據。 而這些三方庫往往有一些共同的特性/局限性,比如
- 可能是基于C/C++,所以需要通過JNI來運行native代碼
- 可能是面對單機實現,所以數據的入口經常是一個本地的文件地址
在這些情況下,非結構化框架均有對應的方式來支持。 比如在隔離打開的情況下允許JNI的使用,以及通過權限審批允許數據下載到本機臨時文件等等。 從長期來講,MaxCompute框架本身也認同使用native C/C++代碼庫,來處理各種特定的數據格式,將是無法避免的,所以會從框架本身安全等方面來解決這個問題,但是對于讀取數據到本地再做處理,從本質上是一種比較大的額外消耗,還是推薦通過直接處理輸入數據的方式來做,比如改動NETCDF-JAVA的實現,把輸入接口通過FilePath->FileStream改成直接使用InputStream等。
3. 結語
MaxCompute非結構化框架是隨著MaxCompute2.0推出的新功能,除了處理OSS上面的非結構化數據之外,最近也打通了與TableStore(OTS)的數據鏈路。 框架本身也還在不斷的發展和完善,包括和MaxCompute優化器以及和整個UDF框架更緊密的結合和擴展等等。 在這里先從現有系統的實現和我們收到的一些反饋,總結提煉了一些處理非結構化數據的最佳實踐,也希望得到更多的反饋,把框架功能做到更優。 后繼我們也會結合具體的使用場景,比如城市大腦上的離線視頻圖像處理等,來提供一些更具體的使用范例。
原文鏈接
轉載于:https://my.oschina.net/u/3735980/blog/1812730
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的MaxCompute(ODPS)上处理非结构化数据的Best Practice的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网商贷逾期多久上征信记录,逾期一天就会上
- 下一篇: 第二章平稳时间序列模型——ACF和PAC