Apache Kafka / Spark流系统的性能调优
電信行業(yè)的實(shí)際案例研究
調(diào)試實(shí)際的分布式應(yīng)用程序可能是一項(xiàng)艱巨的任務(wù)。 至少在一開始,最常見的Google搜索并沒有什么用。 在這篇博客文章中,我將詳細(xì)介紹如何將Apache Kafka / Spark Streaming / Apache Ignite應(yīng)用程序加速近十倍,并將開發(fā)原型轉(zhuǎn)變?yōu)橛杏玫?#xff0c;穩(wěn)定的流媒體應(yīng)用程序,該應(yīng)用程序最終超過了性能目標(biāo)。為應(yīng)用程序。
此處學(xué)習(xí)的課程相當(dāng)籠統(tǒng),可以輕松地?cái)U(kuò)展到使用MapR Streams和Kafka的類似系統(tǒng)。
該項(xiàng)目是融合平臺(tái)需求的具體案例,該平臺(tái)集成了完整的軟件堆棧以支持該系統(tǒng)的需求:實(shí)時(shí)流,大數(shù)據(jù)分布式處理和持久性。 截至撰寫本文時(shí), MapR融合數(shù)據(jù)平臺(tái)是此類平臺(tái)當(dāng)前唯一可用的生產(chǎn)就緒型實(shí)現(xiàn)。
系統(tǒng)目標(biāo)
為了滿足電信公司的需求,該應(yīng)用程序的目標(biāo)是將來自三個(gè)獨(dú)立系統(tǒng)的日志數(shù)據(jù)結(jié)合在一起。 加入數(shù)據(jù)后,就可以將網(wǎng)絡(luò)條件與任何特定客戶的特定呼叫相關(guān)聯(lián),從而使客戶支持可以向不滿意其電話服務(wù)的客戶提供準(zhǔn)確和有用的信息。 如果該應(yīng)用程序可以實(shí)時(shí)進(jìn)行而不是作為批處理工作,則它具有巨大的附加價(jià)值,因?yàn)?個(gè)小時(shí)的呼叫質(zhì)量信息對(duì)客戶服務(wù)或網(wǎng)絡(luò)運(yùn)營(yíng)沒有實(shí)際價(jià)值。
基本上,這是一個(gè)相當(dāng)簡(jiǎn)單的ETL作業(yè),通常會(huì)作為數(shù)據(jù)倉(cāng)庫的批處理作業(yè)完成,但現(xiàn)在必須作為流式分布式體系結(jié)構(gòu)實(shí)時(shí)進(jìn)行。
更具體地說,總體情況是將來自遠(yuǎn)程服務(wù)器的輸入數(shù)據(jù)流式傳輸?shù)椒植际郊褐?#xff0c;進(jìn)行一些數(shù)據(jù)清理和擴(kuò)充,將三個(gè)日志中的記錄聯(lián)接在一起,并將聯(lián)接的數(shù)據(jù)作為單個(gè)表持久保存到數(shù)據(jù)庫中。
原始系統(tǒng)的問題
原始系統(tǒng)存在幾個(gè)圍繞性能和穩(wěn)定性的問題。
首先,流應(yīng)用程序不穩(wěn)定。 在Spark Streaming應(yīng)用程序中,如果每個(gè)微批處理的處理時(shí)間等于或小于批處理時(shí)間,則稱該流穩(wěn)定。 在這種情況下,應(yīng)用程序的流式傳輸部分正在30秒的窗口中接收數(shù)據(jù),但處理時(shí)間為4.5-6分鐘。
其次,有一個(gè)批處理過程,一次要一個(gè)小時(shí)連接一次數(shù)據(jù),目標(biāo)是在30分鐘內(nèi)運(yùn)行,但要花2個(gè)小時(shí)才能完成。
第三,應(yīng)用程序運(yùn)行了幾個(gè)小時(shí)后隨機(jī)崩潰。
集群硬件,軟件堆棧和輸入數(shù)據(jù)
集群硬件非常好,有12個(gè)企業(yè)服務(wù)器節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)配備兩個(gè)E5 Xeon CPU,每個(gè)CPU具有16個(gè)物理核心,256GB內(nèi)存和八個(gè)6TB旋轉(zhuǎn)硬盤。 網(wǎng)絡(luò)是10GB以太網(wǎng)。
為該項(xiàng)目選擇的技術(shù)棧圍繞著Kafka 0.8(用于將數(shù)據(jù)流式傳輸?shù)较到y(tǒng)中),Apache Spark 1.6(用于ETL操作)(本質(zhì)上是對(duì)輸入進(jìn)行過濾和轉(zhuǎn)換,然后進(jìn)行聯(lián)接)以及使用Apache Ignite進(jìn)行。 1.6作為內(nèi)存共享緩存,可以很容易地將應(yīng)用程序的流輸入部分與數(shù)據(jù)連接起來。 如果發(fā)生故障,Apache Hive還可以用作Ignite的磁盤備份以及單獨(dú)的分析應(yīng)用程序。
初始集群的配置如下:
| 節(jié)點(diǎn) | k | 神經(jīng)網(wǎng)絡(luò) | HDFS | 梅索斯 | Mesos Master | 卡夫卡 | 火花工人 | 點(diǎn)燃 |
| 1個(gè) | X | X | X | X | X | X | X | |
| 2 | X | X | X | X | X | X | X | |
| 3 | X | X | X | X | X | X | ||
| … | X | X | X | X | X | |||
| 7 | X | X | X | X | X | |||
| 8 | X | X | X | X | X | |||
| … | X | X | X | X | ||||
| 12 | X | X | X | X |
該集群運(yùn)行Apache Hadoop的HDFS作為分布式存儲(chǔ)層,資源由Mesos 0.28管理。 最后,HBase用作最終聯(lián)接數(shù)據(jù)的最終數(shù)據(jù)存儲(chǔ)。 該項(xiàng)目范圍之外的其他系統(tǒng)將查詢?cè)撐募?
系統(tǒng)的性能要求是處理高達(dá)3GB / min或150-200,000個(gè)事件/秒的輸入吞吐量,代表已知的峰值數(shù)據(jù)吞吐量以及額外的余量。 普通吞吐量約為該值的一半,即1.5GB /分鐘和60,000-80,000個(gè)事件/秒。
原始數(shù)據(jù)源是三個(gè)遠(yuǎn)程系統(tǒng)的日志,此處標(biāo)記為A,B和C:日志A占條目的約84-85%,日志B約占1-2%,日志C約占14-15%。 數(shù)據(jù)不平衡這一事實(shí)是此應(yīng)用程序中(許多)困難的來源之一。
Spark應(yīng)用程序均使用Scala 2.10和Kafka的直接方法 (無接收器)進(jìn)行編碼。 Apache Ignite具有一個(gè)非常好的Scala API和一個(gè)神奇的IgniteRDD ,它可以使應(yīng)用程序共享內(nèi)存數(shù)據(jù),這是該系統(tǒng)降低編碼復(fù)雜性的關(guān)鍵功能。
應(yīng)用架構(gòu)
單個(gè)Kafka生產(chǎn)者將原始數(shù)據(jù)攝入到運(yùn)行在6臺(tái)服務(wù)器上的Kafka中。 生產(chǎn)者讀取各種日志,并將每個(gè)日志的記錄添加到其自己的主題中。 由于有三個(gè)日志,因此有三個(gè)Kafka主題。 每個(gè)主題分為36個(gè)分區(qū)。 最有可能存在36個(gè)分區(qū),因?yàn)橛?個(gè)節(jié)點(diǎn),每6個(gè)磁盤分配給HDFS,Kafka文檔似乎建議每個(gè)物理磁盤大約有一個(gè)分區(qū)作為指導(dǎo)。
Spark Streaming應(yīng)用程序使用數(shù)據(jù),該應(yīng)用程序選擇每個(gè)主題,然后執(zhí)行簡(jiǎn)單的過濾器以切出不必要的字段,進(jìn)行映射操作以轉(zhuǎn)換數(shù)據(jù)和執(zhí)行foreachRDD操作(每個(gè)微批處理在Spark Streaming中生成RDD)將數(shù)據(jù)保存到Ignite和Hive。
流媒體應(yīng)用程序非常簡(jiǎn)單:映射,過濾器和foreach分區(qū),保存到Ignite
第二個(gè)“常規(guī)” Spark應(yīng)用程序運(yùn)行在Ignite存儲(chǔ)在內(nèi)存中的數(shù)據(jù)上,以1小時(shí)為批處理將來自三個(gè)單獨(dú)日志的記錄合并到一個(gè)表中。 這項(xiàng)工作是使用Spark的DataFrame API完成的,該API非常適合該任務(wù)。 第二部分涉及不超過100GB的數(shù)據(jù),并且群集硬件的大小適當(dāng)以處理該數(shù)量的數(shù)據(jù)。
三個(gè)小時(shí)的數(shù)據(jù)被累積到Ignite中,因?yàn)榻^大多數(shù)調(diào)用持續(xù)不到一個(gè)小時(shí),并且我們希望一次對(duì)一個(gè)小時(shí)的數(shù)據(jù)進(jìn)行連接。 由于某些呼叫將在一批中開始而在另一批中完成,因此系統(tǒng)將保留三個(gè)小時(shí)并且僅處理一個(gè)小時(shí)的中間批次,因此聯(lián)接可以成功處理接近100%的記錄。
值得一提的是,更好的全流架構(gòu)可以避免中間表示形式的出現(xiàn)。 具有更多時(shí)間和事前思考能力的示例性現(xiàn)實(shí)案例可以更快地結(jié)束整個(gè)項(xiàng)目,而不是全神貫注地編寫第一個(gè)可行的解決方案。
性能調(diào)優(yōu)
這些應(yīng)用程序的主要問題是由于試圖運(yùn)行開發(fā)系統(tǒng)的代碼而造成的,這些代碼在運(yùn)行于真實(shí)數(shù)據(jù)的物理,本地群集上的AWS實(shí)例上進(jìn)行了測(cè)試。 從來沒有授予原始開發(fā)人員訪問生產(chǎn)集群或真實(shí)數(shù)據(jù)的權(quán)限。
Apache Ignite是一個(gè)巨大的問題源,主要是因?yàn)樗且粋€(gè)新項(xiàng)目,沒有人有任何實(shí)際經(jīng)驗(yàn),而且還因?yàn)樗皇且粋€(gè)非常成熟的項(xiàng)目。
Spark Streaming應(yīng)用程序在大約4.5分鐘內(nèi)運(yùn)行,并且項(xiàng)目目標(biāo)是在30秒內(nèi)運(yùn)行。 我們需要找到9倍于加速的改進(jìn)價(jià)值,并且由于時(shí)間限制,我們無法更改任何代碼!
該系統(tǒng)必須在一周內(nèi)準(zhǔn)備好進(jìn)行生產(chǎn)測(cè)試,因此從體系結(jié)構(gòu)和算法角度出發(fā)的代碼被假定為正確且足夠好,以至于我們只有通過調(diào)整才能達(dá)到性能要求。
修復(fù)RPC超時(shí)異常
我們從具有相同問題的人那里找到了正確的解決方案,如JIRA的SPARK-14140中所示 。 他們建議將spark.executor.heartbeatInterval從10s增加到20s。
我認(rèn)為這個(gè)問題可能是由于Kafka,Ignite或垃圾收集器暫停而導(dǎo)致節(jié)點(diǎn)由于磁盤繁忙或CPU高峰而變得忙碌所致。 由于Spark在所有節(jié)點(diǎn)上運(yùn)行,因此問題是隨機(jī)的。 (請(qǐng)參閱第一部分中的集群服務(wù)布局表。)
配置更改完全解決了此問題。 從那以后我們?cè)僖矝]有看到過。
增加驅(qū)動(dòng)程序和執(zhí)行程序的內(nèi)存
通過將內(nèi)存從每個(gè)執(zhí)行者20g增加到每個(gè)執(zhí)行者40g以及驅(qū)動(dòng)程序40g,解決了內(nèi)存不足問題和應(yīng)用程序的隨機(jī)崩潰。 令人高興的是,生產(chǎn)集群中的機(jī)器配備了大量?jī)?nèi)存。 對(duì)于新應(yīng)用程序,這是一個(gè)好習(xí)慣,因?yàn)槟婚_始不知道需要多少。
由于Spark UI報(bào)告的內(nèi)存消耗非常小,因此很難精確地調(diào)試該問題,缺乏準(zhǔn)確的信息。 實(shí)際上,由于此設(shè)置易于更改,因此根據(jù)經(jīng)驗(yàn),我們將40g作為使應(yīng)用程序穩(wěn)定運(yùn)行的最小內(nèi)存大小。
增加并行度:增加Kafka中的分區(qū)數(shù)量
輸入數(shù)據(jù)不平衡,大部分應(yīng)用程序處理時(shí)間都花在處理主題1(吞吐量的85%)上。 Kafka分區(qū)與輸入RDD中的分區(qū)數(shù)量進(jìn)行1:1匹配,導(dǎo)致只有36個(gè)分區(qū),這意味著我們只能讓36個(gè)核心忙于此任務(wù)。 為了增加并行度,我們需要增加分區(qū)數(shù)。 因此,我們將主題1分為12個(gè)主題,每個(gè)主題有6個(gè)分區(qū),總共72個(gè)分區(qū)。 我們對(duì)生產(chǎn)者進(jìn)行了簡(jiǎn)單的修改,將第一個(gè)日志中的數(shù)據(jù)平均分為12個(gè)主題,而不僅僅是一個(gè)。 消費(fèi)者方需要修改零代碼。
我們還根據(jù)其他兩個(gè)主題在輸入數(shù)據(jù)中的相對(duì)重要性,適當(dāng)調(diào)整了分區(qū)數(shù)的大小,因此我們將主題2設(shè)置為2,將主題3設(shè)置為8。
并行運(yùn)行更多任務(wù)。 調(diào)整之前,每個(gè)階段始終有36個(gè)分區(qū)!
調(diào)整執(zhí)行程序的大小
原始應(yīng)用程序僅運(yùn)行3個(gè)執(zhí)行程序,共有72個(gè)內(nèi)核。 我們將應(yīng)用程序配置為以80個(gè)內(nèi)核運(yùn)行,每個(gè)執(zhí)行者最多10個(gè)內(nèi)核,總共8個(gè)執(zhí)行者。 請(qǐng)注意,在10個(gè)節(jié)點(diǎn)的集群中,每個(gè)節(jié)點(diǎn)具有16個(gè)實(shí)際核心,我們?yōu)镵afka代理,Ignite和HDFS / NN留下了足夠的資源。
將批處理窗口從30s增加到1m
生產(chǎn)者每隔30秒將數(shù)據(jù)分批推送到Kafka,因?yàn)樗峭ㄟ^FTP批處理從遠(yuǎn)程系統(tǒng)收集的。 由于需要處理制造商,技術(shù)和年齡的困惑范圍內(nèi)的設(shè)備和系統(tǒng),因此這種布置在電信應(yīng)用中很常見。
這意味著輸入流非常不完整,如Spark UI的“流”選項(xiàng)卡的屏幕截圖所示:
將窗口增加到1m可使我們平滑輸入,并使系統(tǒng)有機(jī)會(huì)在1分鐘或更短的時(shí)間內(nèi)處理數(shù)據(jù),但仍保持穩(wěn)定。
為了確保這一點(diǎn),該團(tuán)隊(duì)生成了一個(gè)測(cè)試數(shù)據(jù),該數(shù)據(jù)模擬了已知的最壞情況數(shù)據(jù),并且使用新設(shè)置,火花流工作現(xiàn)在確實(shí)很穩(wěn)定。 該團(tuán)隊(duì)還能夠輕松地在測(cè)試數(shù)據(jù)和實(shí)際生產(chǎn)數(shù)據(jù)流之間進(jìn)行切換,并通過限制生產(chǎn)者來配置要傳入系統(tǒng)的數(shù)據(jù)量。 這對(duì)于快速測(cè)試各種配置并查看我們是否取得了進(jìn)展非常有幫助。
刪除要求保存到Hive,僅使用Ignite
與項(xiàng)目經(jīng)理的討論表明,Hive實(shí)際上并不是流應(yīng)用程序需求的一部分! 主要是因?yàn)镠Base中的數(shù)據(jù)可以輕松地被分析使用。 同樣,在此應(yīng)用程序的上下文中,每個(gè)單獨(dú)的記錄實(shí)際上都不需要100%保證地進(jìn)行處理。
確實(shí),根據(jù)系統(tǒng)的目標(biāo),丟失數(shù)據(jù)的最壞情況是無法找到客戶的呼叫質(zhì)量信息……情況已經(jīng)如此。 換句話說,數(shù)據(jù)丟失的風(fēng)險(xiǎn)不是破壞交易的因素,而獲得數(shù)據(jù)的好處是更多的見解。 只要處理和存儲(chǔ)絕大多數(shù)數(shù)據(jù),就可以實(shí)現(xiàn)業(yè)務(wù)目標(biāo)。
所有優(yōu)化的結(jié)果
流媒體應(yīng)用程序最終穩(wěn)定下來,優(yōu)化運(yùn)行時(shí)間為30-35s。
事實(shí)證明,淘汰Hive還加快了將數(shù)據(jù)連接在一起的第二個(gè)Spark應(yīng)用程序的運(yùn)行,因此它現(xiàn)在的運(yùn)行時(shí)間為3500萬,這意味著這兩個(gè)應(yīng)用程序現(xiàn)在都符合項(xiàng)目要求。
隨著下一部分的改進(jìn),Spark Streaming作業(yè)的最終性能下降到20s的較低范圍,最終加速了12倍以上。
我們必須在穩(wěn)定性方面下大力氣。 需要采取幾種策略,如下所述。
使Spark Streaming應(yīng)用程序穩(wěn)定
我們?yōu)樾迯?fù)性能所做的工作直接影響了系統(tǒng)的穩(wěn)定性。 如果兩個(gè)應(yīng)用程序本身都穩(wěn)定并且在適當(dāng)大小的資源上運(yùn)行,則系統(tǒng)最有可能總體上保持穩(wěn)定。
刪除Mesos并使用Spark Standalone
Mesos最初選擇管理資源是前瞻性的,但最終我們決定將其從最終生產(chǎn)系統(tǒng)中刪除。 首先,計(jì)劃是讓Mesos管理所有應(yīng)用程序。 但是團(tuán)隊(duì)永遠(yuǎn)無法讓Kafka和Ignite與Mesos保持良好的合作關(guān)系,因此他們以獨(dú)立模式運(yùn)行,僅由Spark由Mesos管理。 當(dāng)然,隨著時(shí)間的推移,毫無疑問,所有應(yīng)用程序都可以正確配置為與Mesos一起使用。
提議刪除Mesos有點(diǎn)爭(zhēng)議,因?yàn)镸esos比在獨(dú)立模式下運(yùn)行的Spark更先進(jìn),更酷。
但是Mesos的問題是雙重的:
如果Mesos實(shí)際控制資源,則它只能分配好資源。 就此系統(tǒng)而言,Kafka和Ignite的運(yùn)行超出了Mesos的知識(shí)范圍,這意味著它將錯(cuò)誤地將資源分配給Spark應(yīng)用程序。
此外,它是一個(gè)單一用途的集群,因此我們可以使用系統(tǒng)資源的全局視圖為每個(gè)應(yīng)用程序自定義資源的大小。 幾乎不需要?jiǎng)討B(tài)資源分配,調(diào)度隊(duì)列,多租戶和其他流行語。
更改點(diǎn)燃記憶模型
一個(gè)已知的問題是,當(dāng)由JVM控制的堆變得很大(> 32GB)時(shí),垃圾回收的成本會(huì)很大。 當(dāng)加入應(yīng)用程序運(yùn)行時(shí),我們確實(shí)可以看到這個(gè)問題:25GB隨機(jī)播放的階段中有些行的GC時(shí)間峰值很大,從10秒到超過一分鐘不等。
Ignite的初始配置是運(yùn)行ONHEAP_TIERED,并在堆上緩存48GB的數(shù)據(jù),然后溢出降至12GB的堆外內(nèi)存。 該設(shè)置已更改為OFFHEAP_TIERED模型。 盡管由于序列化成本而稍慢,但是OFFHEAP_TIERED不會(huì)導(dǎo)致大量垃圾回收。 它仍然在內(nèi)存中運(yùn)行,因此我們估計(jì)這將是凈收益。
進(jìn)行此更改后,每個(gè)批次的運(yùn)行時(shí)間從30秒降低到了約25秒,減少了約5秒鐘。 此外,連續(xù)的批處理往往具有更多相似的處理時(shí)間,增量為1-3秒,而先前的變化會(huì)超過5至10秒。
更新Ignite JVM設(shè)置
我們遵循了Ignite文檔的性能調(diào)整部分( http://apacheignite.gridgain.org/docs/jvm-and-system-tuning )中推薦的JVM選項(xiàng)。
完善Spark代碼
代碼的某些部分假定可靠性,例如對(duì)Ignite的查詢,而實(shí)際上卻存在操作失敗的可能性。 這些問題可以在代碼中解決,現(xiàn)在可以更優(yōu)雅地處理異常,盡管可能還有很多工作可以提高代碼的健壯性。 我們只能通過立即運(yùn)行該應(yīng)用程序來找到這些位置。
將ZooKeeper重新分配給節(jié)點(diǎn)10-12
鑒于群集是中型的,因此有必要盡可能多地?cái)U(kuò)展服務(wù)。 我們將ZooKeeper服務(wù)從節(jié)點(diǎn)1-3移到了節(jié)點(diǎn)10-12。
結(jié)論
調(diào)整此應(yīng)用程序大約需要1周的全職工作。 我們使用的主要信息是Spark UI和Spark日志,可以從Spark UI輕松訪問。 作業(yè)和階段以及流UI的視圖確實(shí)非常有用。
我學(xué)到的是
- 將流應(yīng)用程序從AWS上的原型遷移到本地集群需要安排測(cè)試時(shí)間
- 不使用真實(shí)數(shù)據(jù)測(cè)試AWS原型是一個(gè)大錯(cuò)誤
- 包括許多對(duì)可靠性要求很高的“出血邊緣” OSS組件(Apache Ignite和Mesos)是不現(xiàn)實(shí)的
- 更好的架構(gòu)設(shè)計(jì)可以極大地簡(jiǎn)化系統(tǒng)
- 調(diào)整Kafka / Spark Streaming應(yīng)用程序需要對(duì)整個(gè)系統(tǒng)有一個(gè)整體的了解。 這不僅僅是改變Spark的參數(shù)值; 它是數(shù)據(jù)流特征,應(yīng)用程序目標(biāo)和對(duì)客戶的價(jià)值,硬件和服務(wù),應(yīng)用程序代碼,然后使用Spark參數(shù)的組合。
- MapR融合數(shù)據(jù)平臺(tái)將減少該項(xiàng)目的開發(fā)時(shí)間,復(fù)雜性和成本。
該項(xiàng)目是這家特定電信公司的第一個(gè)項(xiàng)目,他們決定全力開發(fā)這種先進(jìn)的100%開放源代碼平臺(tái)。 他們的開拓精神應(yīng)受到贊揚(yáng)。 但是,更好的平臺(tái)和應(yīng)用程序體系結(jié)構(gòu)選擇將使他們的生活更加輕松。
現(xiàn)在需要融合的大數(shù)據(jù)平臺(tái)
實(shí)際上,該項(xiàng)目的需求表明了現(xiàn)實(shí)世界中對(duì)具有最新的融合平臺(tái)的業(yè)務(wù)需求,該平臺(tái)具有快速分布式文件系統(tǒng),用于持久性的高性能鍵值存儲(chǔ)以及實(shí)時(shí)流功能。
由于該架構(gòu)所需的完整軟件堆棧已經(jīng)內(nèi)置并得到完全支持,因此MapR解決方案可能會(huì)跳過對(duì)仍然投機(jī)的開源項(xiàng)目(如Ignite)的要求。 鑒于該系統(tǒng)已開始為具有24/7可靠性預(yù)期的電信運(yùn)營(yíng)商量產(chǎn),因此這一優(yōu)勢(shì)非常可觀。
翻譯自: https://www.javacodegeeks.com/2017/01/performance-tuning-apache-kafkaspark-streaming-system.html
總結(jié)
以上是生活随笔為你收集整理的Apache Kafka / Spark流系统的性能调优的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信之前删除的好友怎么找回
- 下一篇: Java平台模块系统公众审查未能通过