日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

apache beam入门之初次使用

發(fā)布時間:2024/1/18 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 apache beam入门之初次使用 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

beam入門寶典之初次使用

咱們不多廢話,先直接來如何簡單使用beam框架。
這里我不使用常見的wordCount做例子,而是一個大寫轉(zhuǎn)小寫的例子,語言選用java語言
這個例子里我們會初步學(xué)到:

  • 如何建立管道
  • 如何手動生成數(shù)據(jù)
  • 如何轉(zhuǎn)換
  • 如何查看輸出
  • 首先我們要新建1個maven工程,然后在pom.xml中加入如下依賴:

    <dependency><groupId>org.apache.beam</groupId><artifactId>beam-runners-direct-java</artifactId><version>${beam.version}</version> </dependency>

    beam.version版本選擇beam官網(wǎng)上最新,筆者編寫此文時使用的版本是2.13.0

    接著我們新建1個HowToCreateAndShowData類,然后開始例子

    建立管道

    任何beam程序,都需要先建立1個管道選項option,再建立1個初始管道

    // 建立選項 PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); // 建立管道 Pipeline pipeline = Pipeline.create(pipelineOptions);

    關(guān)于選項option和pipeline的更多用法,后面的章節(jié)會繼續(xù)介紹

    手動生成數(shù)據(jù)

    我們有了pipeline之后,就要往里面塞入數(shù)據(jù)
    beam里提供了手動輸入數(shù)據(jù)的方式,如下:

    // 生成初始的輸入數(shù)據(jù) // 相當(dāng)于往管道里塞入了3個自己寫的字符串元素 PCollection<String> pcStart = pipeline.apply( Create.of( "HELLO!", "THIS IS BEAM DEMO!", "HAPPY STUDY!"));

    我們調(diào)用pipeline的apply方法來輸入1個Create對象,里面的元素就是我們的輸入元素
    并且返回1個PCollection的對象,我們稱之為數(shù)據(jù)集。
    <String>指的是數(shù)據(jù)集中數(shù)據(jù)的類型

    如何轉(zhuǎn)換

    要實現(xiàn)轉(zhuǎn)換,需要先編寫1個DoFn的子類,并實現(xiàn)processElement方法,代碼和講解如下:

    // 把字符串轉(zhuǎn)成小寫的轉(zhuǎn)換方法類 // DoFn<String,String>中的第一個String是輸入的類型,第二個String是輸出的類型 static class StrToLowerCaseFn extends DoFn<String, String> { /** * processElement,過程元素處理方法,類似于spark、mr中的map操作 * 必須加上@ProcessElement注解,并實現(xiàn)processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 轉(zhuǎn)成大寫String outputStr = inputStr.toLowerCase();// 輸出結(jié)果context.output(outputStr);} }

    接著將這個計算方法,用數(shù)據(jù)集.apply(ParDo.of(計算類))的方式組裝到剛才的pcStart中

    // 組裝小寫轉(zhuǎn)換 PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));

    如何輸出

    輸出的話,我們可以按照上面的方法再編寫1個DoFn子類,用于將數(shù)據(jù)集中輸入的元素打印到控制臺

    // 打印結(jié)果方法類 // 因為不需要再往下輸出,所以 static class PrintStrFn extends DoFn<String, Void> { /** * processElement,過程元素處理方法,類似于spark、mr中的map操作 * 必須加上@ProcessElement注解,并實現(xiàn)processElement方法 * @param context */ @ProcessElement public void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 輸出System.out.println(inputStr);} }

    然后組裝

    // 組裝輸出操作 pcMid.apply(ParDo.of(new PrintStrFn()));

    運行

    剛才的3次apply結(jié)束后,其實轉(zhuǎn)換都還沒有開始,僅僅只是組裝計算拓?fù)涞?個流程。
    真正開始計算需要調(diào)用下面的代碼:

    // 運行結(jié)果 pipeline.run().waitUntilFinish();

    執(zhí)行main方法,輸出如下結(jié)果:
    image.png

    完整代碼

    /*** The howToCreateAndShowData** */ public class HowToCreateAndShowData {public static void main(String[] args) {PipelineOptions pipelineOptions = PipelineOptionsFactory.create();Pipeline pipeline = Pipeline.create(pipelineOptions);// 生成初始的輸入數(shù)據(jù)// 相當(dāng)于往管道里塞入了3個自己寫的字符串元素PCollection<String> pcStart = pipeline.apply(Create.of("HELLO!","THIS IS BEAM DEMO!","HAPPY STUDY!"));// 組裝小寫轉(zhuǎn)換PCollection<String> pcMid = pcStart.apply(ParDo.of(new StrToLowerCaseFn()));// 組裝輸出操作pcMid.apply(ParDo.of(new PrintStrFn()));// 運行結(jié)果pipeline.run().waitUntilFinish();}// 把字符串轉(zhuǎn)成小寫的轉(zhuǎn)換方法類// DoFn<String,String>中的第一個String是輸入的類型,第二個String是輸出的類型static class StrToLowerCaseFn extends DoFn<String, String> {/*** processElement,過程元素處理方法,類似于spark、mr中的map操作* 必須加上@ProcessElement注解,并實現(xiàn)processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 轉(zhuǎn)成大寫String outputStr = inputStr.toLowerCase();// 輸出結(jié)果context.output(outputStr);}}// 打印結(jié)果方法類// 因為不需要再往下輸出,所以static class PrintStrFn extends DoFn<String, Void> {/*** processElement,過程元素處理方法,類似于spark、mr中的map操作* 必須加上@ProcessElement注解,并實現(xiàn)processElement方法** @param context*/@ProcessElementpublic void processElement(ProcessContext context) {// 從管道中取出的1個元素String inputStr = context.element();// 輸出System.out.println(inputStr);}} }

    總結(jié)

    以上是生活随笔為你收集整理的apache beam入门之初次使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。