日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

徒手打造基于Spark的数据工厂(Data Factory):从设计到实现

發布時間:2023/12/4 编程问答 49 豆豆
生活随笔 收集整理的這篇文章主要介紹了 徒手打造基于Spark的数据工厂(Data Factory):从设计到实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在大數據處理和人工智能時代,數據工廠(Data Factory)無疑是一個非常重要的大數據處理平臺。市面上也有成熟的相關產品,比如Azure Data Factory,不僅功能強大,而且依托微軟的云計算平臺Azure,為大數據處理提供了強大的計算能力,讓大數據處理變得更為穩定高效。由于工作中我的項目也與大數據處理相關,于是我就在思考,是否自己也可以設計打造一個數據工廠,以便尋求一些技術痛點的解決方案,并且引入一些有趣的新功能。因此,我利用業余時間,逐步打造了一個基于Spark的數據工廠,并取名為Abacuza(Abacus是中國的“算盤”的意思,隱喻它是一個專門做數據計算的平臺,使用“算盤”一詞的變體,也算是體現一點中國元素吧)。說是基于Spark,其實從整個架構來看,Abacuza并不一定非要基于Spark,只需要為其定制某種數據處理引擎的插件即可,所以,Spark其實僅僅是Abacuza的一個插件,當然,Spark是目前主流的數據處理引擎,Abacuza將其作為默認的數據處理插件。?Abacuza是開源的,項目地址是:https://github.com/daxnet/abacuza。徒手打造?是的,沒錯,從前端界面都后端開發,從代碼到持續集成,再到部署腳本和SDK與容器鏡像的發布,都是自己一步步弄出來的。項目主頁上有一個簡單的教程,后面我會詳細介紹一下。在介紹如何使用Abacuza之前,我們先了解一下它的整體架構和設計思想。雖然目前Abacuza還有很多功能沒有完成,但并不影響整個數據處理流程的執行。

整體架構

Abacuza和其它的數據工廠平臺一樣,它的業務流程就是分三步走:數據讀入、數據處理、結果輸出。Abacuza的整體架構圖就很清楚地體現了這個業務流程:?

(點擊查看大圖)

數據輸入部分

數據的輸入是由輸入端點(Input Endpoints)來定義的。Abacuza支持多種數據類型的輸入:CSV文件、JSON文件、TXT文本文件、Microsoft SQL Server(暫未完全實現)以及S3的對象存儲路徑,今后還可以繼續擴展輸入端點,以支持基于管道(Pipeline)的數據處理流程,這樣一來,用戶就不需要自己使用C#或者Scala來編寫數據處理的邏輯代碼,只需要一套JSON文件進行Pipeline定義就可以了。

數據處理部分

當數據輸入已經定義好以后,Abacuza會根據Input Endpoint的設置,將數據讀入,然后轉交給后端的數據處理集群(Cluster)進行處理。Abacuza可以以插件的形式支持不同類型的集群,如上文所說,Apache Spark是Abacuza所支持的一種數據處理集群,在上面的架構圖中可以看到,Abacuza Cluster Service管理這些集群,工作任務調度器(Job Scheduler)會通過Abacuza Cluster Service將數據處理任務分配到指定類型的集群上進行處理。對于Spark而言,具體的數據處理邏輯是由用戶自己編寫代碼實現的。Spark原生支持Scala,也可以使用PySpark,Abacuza使用Microsoft .NET for Spark項目實現從.NET到Spark的綁定(Binding),用戶可以使用C#來編寫Spark的數據處理邏輯,后面的演練部分我會詳細介紹。那么與Scala相比,通過.NET for Spark使用C#編寫的數據處理程序會不會有性能問題?嗯,會有點性能問題,請看下圖(圖片來源:微軟.NET for Spark官方網站):?

在這個Benchmark中,處理相同總量的數據,Scala使用了375秒,.NET花了406秒,Python使用433秒,雖然與Scala相比有些差距,但是比Python要好一些。但是不用擔心,如果在你的應用場景中,性能是放在第一位的,那么Abacuza的Job Runner機制允許你使用Scala編寫數據處理程序,然后上傳到Spark集群執行(也就是你不需要依賴于.NET和C#)。

數據輸出部分

與數據輸入部分類似,處理之后的數據輸出方式是由輸出端點(Output Endpoints)來定義的。Abacuza也支持多種數據輸出方式:將結果打印到日志、將結果輸出到外部文件系統以及將結果輸出到當前項目所在的S3對象存儲路徑。無論是數據輸入部分還是輸出部分,這些端點都是可以定制的,并且可以通過ASP.NET Core的插件系統以及docker-compose或者Kubernetes的volume/Block Storage來實現動態加載。

相關概念和運作機理

Abacuza有以下這些概念:

  • 集群(Cluster):一個集群是一個完整的大數據處理平臺,比如Apache Spark

  • 集群類型(Cluster Type):定義集群的類型,例如,運行在localhost的Spark集群和運行在云端的Spark集群都是Spark集群,那么它們的集群類型就是spark。

  • 集群連接(Cluster Connection):定義了Abacuza數據工廠訪問集群的方式,類似于數據庫系統的連接字符串

  • 任務執行器(Job Runner):定義了數據處理任務應該如何被提交到集群上執行。它可以包含具體的數據處理業務邏輯

  • 輸入端點(Input Endpoint):定義了原始數據(需要被處理的數據)的來源

  • 輸出端點(Output Endpoint):定義了處理完成后的數據的輸出方式

  • 項目(Project):一種類型數據處理任務的邏輯定義,它包括多個輸入端點、一個輸出端點以及多個數據處理版本(Revision)的信息,同時它還定義了應該使用哪個任務執行器來執行數據處理任務

  • 數據處理版本(Revision):它歸屬于一個特定的項目,表示不同批次的數據處理結果

  • 當一個用戶準備使用Abacuza完成一次大數據處理的任務時,一般會按照下面的步驟進行:

  • 使用用戶名/密碼(暫時只支持用戶名密碼登錄)登錄Abacuza的管理界面

  • 基于一個已經安裝好的集群(比如Apache Spark),配置它的集群類型集群連接,用來定義Abacuza與該集群的通信方式(集群和集群連接定義了數據應該在哪里被處理(where))

  • 定義任務執行器,在任務執行器中,設置運行數據處理任務的集群類型,當數據處理任務被提交時,Abacuza Cluster Service會基于所選的集群類型,根據一定的算法來選擇一個集群進行數據處理。任務執行器中也定義了數據處理的邏輯,(比如,由Scala、C#或者Python編寫的應用程序,可以上傳到spark類型的集群上運行)。簡單地說,任務執行器定義了數據應該如何被處理(how

  • 創建一個新的項目,在這個項目中,通過輸入端點來設置所需處理的數據來源,通過輸出端點來設置處理后的數據的存放地點,并設置該項目所用到的任務執行器。之后,用戶點擊Submit按鈕,將數據提交到集群上進行處理。處理完成后,在數據處理版本列表中查看結果

  • 技術選型

    Abacuza采用微服務架構風格,每個單獨的微服務都在容器中運行,目前實驗階段采用docker-compose進行容器編排,今后會加入Kubernetes支持。現將Abacuza所使用的框架與相關技術簡單羅列一下:

  • Spark執行程序選擇Microsoft .NET for Spark,一方面自己對.NET技術棧比較熟悉,另一方面,.NET for Spark有著很好的流式數據處理的SDK API,并且可以很方便地整合ML.NET實現機器學習的業務場景

  • 所有的微服務都是使用運行在.NET 5下的ASP.NET Core?Web API實現,每個微服務的后端數據庫采用MongoDB

  • 用于任務調度的Abacuza Job Service微服務使用Quartz.NET實現定期任務調度,用來提交數據處理任務以及更新任務狀態。后端同時采用了PostgreSQL數據庫

  • 存儲層與服務層之間引入Redis做數據緩存,減少MongoDB的查詢負載

  • 默認支持的Spark集群使用Apache Livy為其提供RESTful API接口

  • 文件對象存儲采用MinIO?S3

  • API網關采用Ocelot框架

  • 微服務的瞬態故障處理:Polly框架

  • 身份認證與授權采用ASP.NET Core Identity集成的IdentityServer4解決方案

  • 反向代理:nginx

  • 前端頁面:Angular 12、Angular powered Bootstrap、Bootstrap、AdminLTE

  • 弱弱補一句:本人前端技術沒有后端技術精湛,所以前端頁面會有不少問題,樣式也不是那么的專業美觀,前端高手請忽略這些細節。;)?Abacuza采用了插件化的設計,用戶可以根據需要擴展下面這些組件:

    • 實現自己的數據處理集群以及集群連接:因此你不必拘泥于使用Apache Spark

    • 實現自己的輸入端點輸出端點:因此你可以自定義數據的輸入部分和輸出部分

    • 實現自己的任務執行器:因此你可以選擇不采用基于.NET for Spark的解決方案,你可以自己用Scala或者Python來編寫數據處理程序

    在Abacuza的管理界面中,可以很方便地看到目前系統中已經被加載的插件:??因此,Abacuza數據工廠應該可以滿足絕大部分大數據處理的業務場景。本身整個平臺都是基于.NET開發,并且通過NuGet分發了Abacuza SDK,因此擴展這些組件是非常簡單的,后面的演練部分可以看到詳細介紹。

    部署拓撲

    以下是Abacuza的部署拓撲:?

    ?

    整個部署結構還是比較簡單的:5個主要的微服務由基于Ocelot實現的API Gateway負責代理,Ocelot可以整合IdentityServer4,在Gateway的層面完成用戶的認證(Gateway層面的授權暫未實現)。基于IdentityServer4實現的Identity Service并沒有部署在API Gateway的后端,因為在這個架構中,它的認證授權策略與一般的微服務不同。API Gateway、Identity Service以及基于Angular實現的web app都由nginx反向代理,向外界(客戶端瀏覽器)提供統一的訪問端點。所有的后端服務都運行在docker里,并可以部署在Kubernetes中。

    演練:在Abacuza上運行Word Count程序

    Word Count是Spark官方推薦的第一個案例程序,它的任務是統計輸入文件中每個單詞的出現次數。.NET for Spark也有一個相同的Word Count案例。在此,我仍然使用Word Count案例,介紹如何在Abacuza上運行數據處理程序。

    先決條件

    你需要一臺Windows、MacOS或者Linux的計算機,上面裝有.NET 5 SDK、docker以及docker-compose(如果是Windows或者MacOS,則安裝docker的桌面版),同時確保安裝了git客戶端命令行。

    創建Word Count數據處理程序

    首先使用dotnet命令行創建一個控制臺應用程序,然后添加相關的引用:


    $ dotnet new console -f net5.0 -n WordCountApp

    $ cd WordCountApp

    $ dotnet add package Microsoft.Spark --version 1.0.0

    $ dotnet add package Abacuza.JobRunners.Spark.SDK --prerelease

    然后在項目中新加入一個class文件,實現一個WordCountRunner類:


    using?Abacuza.JobRunners.Spark.SDK;

    using?Microsoft.Spark.Sql;

    ?

    namespace?WordCountApp

    {

    ???public?class?WordCountRunner : SparkRunnerBase

    ???{

    ??????public?WordCountRunner(string[] args) : base(args)

    ??????{

    ??????}

    ?

    ??????protected?override?DataFrame RunInternal(SparkSession sparkSession, DataFrame dataFrame)

    ????????????=> dataFrame

    ???????????????.Select(Functions.Split(Functions.Col("value"), " ").Alias("words"))

    ???????????????.Select(Functions.Explode(Functions.Col("words"))

    ???????????????.Alias("word"))

    ???????????????.GroupBy("word")

    ???????????????.Count()

    ???????????????.OrderBy(Functions.Col("count").Desc());

    ???}

    }

    接下來修改Program.cs文件,在Main函數中調用WordCountRunner:


    static?void?Main(string[] args)

    {

    ???new?WordCountRunner(args).Run();

    }

    然后,在命令行中,WordCountApp.csproj所在的目錄下,使用下面的命令來生成基于Linux x64平臺的編譯輸出:


    $ dotnet publish -c Release -f net5.0 -r linux-x64 -o published

    最后,使用ZIP工具,將published下的所有文件(不包括published目錄本身)全部打包成一個ZIP壓縮包。例如,在Linux下,可以使用下面的命令將published目錄下的所有文件打成一個ZIP包:


    $ zip -rj WordCountApp.zip published/.

    Word Count程序已經寫好了,接下來我們就啟動Abacuza,并在其中運行這個WordCountApp。

    運行Word Count程序

    你可以使用git clone https://github.com/daxnet/abacuza.git命令,將Abacuza源代碼下載到本地,然后在Abacuza的根目錄下,使用下面的命令進行編譯:

    1

    $ docker-compose -f docker-compose.build.yaml build

    編譯成功之后,用文本編輯器編輯template.env文件,在里面設置好本機的IP地址(不能使用localhost或者127.0.0.1,因為在容器環境中,localhost和127.0.0.1表示當前容器本身,而不是運行容器的主機),端口號可以默認:

    然后,使用下面的命令啟動Abacuza:

    1

    $ docker-compose --env-file template.env up

    啟動成功后,可以使用docker ps命令查看正在運行的容器:?

    用瀏覽器訪問http://<你的IP地址>:9320,即可打開Abacuza登錄界面,輸入用戶名super,密碼P@ssw0rd完成登錄,進入Dashboard(目前Dashboard還未完成)。然后在左側菜單中,點擊Cluster Connections,然后點擊右上角的Add Connection按鈕:

    在彈出的對話框中,輸入集群連接的名稱和描述,集群類型選擇spark,在設置欄中,輸入用于連接Spark集群的JSON配置信息。由于我們本地啟動的Spark在容器中,直接使用本機的IP地址即可,如果你的Spark集群部署在其它機器上,也可以使用其它的IP地址。在配置完這些信息后,點擊Save按鈕保存:

    接下來就是創建任務執行器。在Abacuza管理界面,點擊左邊的Job Runners菜單,然后點擊右上角的Add Job Runner按鈕:?

    在彈出的對話框中,輸入任務執行器的名稱和描述信息,集群類型選擇spark,之后當該任務執行器開始執行時,會挑選任意一個類型為spark的集群來處理數據。?

    填入這些基本信息后,點擊Save按鈕,此時會進入任務執行器的詳細頁面,用來進行進一步的設置。在Payload template中,輸入以下JSON文本:

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    {

    ??"file": "${jr:binaries:microsoft-spark-3-0_2.12-1.0.0.jar}",

    ??"className": "org.apache.spark.deploy.dotnet.DotnetRunner",

    ??"args": [

    ????"${jr:binaries:WordCountApp.zip}",

    ????"WordCountApp",

    ????"${proj:input-defs}",

    ????"${proj:output-defs}",

    ????"${proj:context}"

    ??]

    }

    大概介紹一下每個參數:

    • file:指定了在Spark集群上需要運行的程序所在的JAR包,這里直接使用微軟的Spark JAR

    • className:指定了需要運行的程序在JAR包中的名稱,這里固定使用org.apache.spark.deploy.dotnet.DotnetRunner

    • ${jr:binaries:WordCountApp.zip} 表示由className指定的DotnetRunner會調用當前任務執行器中的二進制文件WordCountApp.zip中的程序來執行數據處理任務

    • WordCountApp 為ZIP包中可執行程序的名稱

    • ${proj:input-defs} 表示輸入文件及其配置將引用當前執行數據處理的項目中的輸入端點的定義

    • ${proj:output-defs} 表示輸出文件及其配置將引用當前執行數據處理的項目中的輸出端點的定義

    • ${proj:context} 表示Spark會從當前項目讀入相關信息并將其傳遞給任務執行器

    在上面的配置中,引用了兩個binary文件:microsoft-spark-3-0_2.12-1.0.0.jar和WordCountApp.zip。于是,我們需要將這兩個文件上傳到任務執行器中。仍然在任務執行器的編輯界面,在Binaries列表中,點擊加號按鈕,將這兩個文件附加到任務執行器上。注意:microsoft-spark-3-0_2.12-1.0.0.jar文件位于上文用到的published目錄中,而WordCountApp.zip則是在上文中生成的ZIP壓縮包。

    配置完成后,點擊Save & Close按鈕,保存任務執行器。接下來,創建一個數據處理項目,在左邊的菜單中,點擊Projects,然后在右上角點擊Add Project按鈕:?

    在彈出的Add Project對話框中,輸入項目的名稱、描述,然后選擇輸入端點和輸出端點,以及負責處理該項目數據的任務執行器:

    在此,我們將輸入端點設置為文本文件(Text Files),輸出端點設置為控制臺(Console),也就是直接輸出到日志中。這些配置在后續的項目編輯頁面中也是可以更改的。一個項目可以包含多個輸入端點,但是只能有一個輸出端點。點擊Save按鈕保存設置,此時Abacuza會打開項目的詳細頁,在INPUT選項卡下,添加需要統計單詞出現次數的文本文件:?

    在OUTPUT選項卡下,確認輸出端點設置為Console:

    然后點擊右上角或者右下角的Submit按鈕,提交數據處理任務,此時,選項卡會自動切換到REVISIONS,并且更新Job的狀態:?

    稍等片刻,如果數據處理成功,Job Status會從RUNNING變為COMPLETED:

    點擊Actions欄中的文件按鈕,即可查看數據處理的日志輸出:

    從日志文件中可以看到,Abacuza已經根據我們寫的數據處理程序,統計出輸入文件input.txt中每個單詞的出現次數。通過容器的日志輸出也能看到同樣的信息:?

    總結

    本文介紹了自己純手工打造的數據工廠(Data Factory)的設計與實現,并開發了一個案例來演示該數據工廠完成數據處理的整個過程。之后還有很多功能可以完善:Dashboard、認證授權的優化、用戶與組的管理、第三方IdP的集成、Pipeline的實現等等,今后有空再慢慢弄吧。

    總結

    以上是生活随笔為你收集整理的徒手打造基于Spark的数据工厂(Data Factory):从设计到实现的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。