日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Flink-Java版单词计数(批处理流处理)

發布時間:2024/7/5 java 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink-Java版单词计数(批处理流处理) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

創建工程

pom.xml文件依賴如下:

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!--依賴的一些組件需要 Scala 環境--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency></dependencies>

定義輸入源

  • resources 目錄下創建 hello.txt 作為輸入源, 內容單詞自定義.
  • 在 Linux 環境下使用 nc -lk 端口模擬網絡流式輸入.
  • 編寫代碼

    批處理方式 WordCount

    import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector;import java.io.InputStream;/*** {@link DataSet} 批處理 api, 處理離線數據* @author regotto*/ public class WordCount {public static void main(String[] args) throws Exception {ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();DataSource<String> dataSource = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");// 將單詞按照空格分割, 變為 (word, 1) 形式的二元組DataSet<Tuple2<String, Integer>> resultSet = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\W+");for (String s : words) {out.collect(new Tuple2<String, Integer>(s, 1));}}// 按照 tuple2 index = 0 進行分組, 按照 index = 1 進行求和}).groupBy(0).sum(1);resultSet.print();} }

    流式 WordCount

    import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;/*** {@link DataStream} 流式api, 處理實時數據* @author regotto*/ public class StreamWordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 從文件中獲取輸入流 // DataStream<String> source = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");// 從 socket 文本流讀取數據, 模擬 flink 從 kafka獲取數據DataStreamSource<String> source = env.socketTextStream("localhost", 7777);DataStream<Tuple2<String, Integer>> resultStream = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split("\\W+");for (String s : words) {out.collect(new Tuple2<String, Integer>(s, 1));}}// 按照 tuple2 index = 0 進行分組, 按照 index = 1 進行求和}).keyBy(0).sum(1);resultStream.print();env.execute();} }

    使用 nc -lk監聽7777端口, 模擬網絡流式輸入.


    執行結果如下:

    結論

    批處理: 將所有文本處理完, 才統計輸出.
    流式: 在開發環境中, 每讀取一行文本就計數一次, 進行統計輸出.

    總結

    以上是生活随笔為你收集整理的Flink-Java版单词计数(批处理流处理)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。