Flink (Java): Word Count with Batch and Stream Processing
Create the Project
The pom.xml dependencies are as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <!-- some of the dependent components require a Scala environment -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>

Define the Input Source
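The batch job below reads a local hello.txt, and the streaming job reads from a socket. Any plain-text file with a few repeated words will do as the batch input; for example, a hello.txt might contain (contents are illustrative, not taken from the original project):

hello world
hello flink
how are you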
Write the Code
Batch WordCount
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * {@link DataSet} batch API, for processing offline data.
 * @author regotto
 */
public class WordCount {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource =
                env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");
        // split each line on non-word characters and emit (word, 1) tuples
        DataSet<Tuple2<String, Integer>> resultSet = dataSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split("\\W+");
                        for (String s : words) {
                            out.collect(new Tuple2<>(s, 1));
                        }
                    }
                })
                // group by tuple field 0 (the word), sum tuple field 1 (the count)
                .groupBy(0)
                .sum(1);
        resultSet.print();
    }
}
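As a side note, the anonymous FlatMapFunction could also be written as a lambda. Because Java erases the generic parameters of Tuple2 in lambdas, Flink then needs an explicit returns(...) type hint. The following is a minimal sketch of the same batch job written that way (the class name is my own; the file path is reused from above):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountLambda {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> result = env
                .readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt")
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\W+")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                // the lambda loses Tuple2's generic parameters, so declare the result type explicitly
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .groupBy(0)
                .sum(1);
        result.print();
    }
}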
Streaming WordCount

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * {@link DataStream} streaming API, for processing real-time data.
 * @author regotto
 */
public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // read the input as a stream from a file:
        // DataStream<String> source = env.readTextFile("D:\\CodeRepository\\flink-study\\src\\main\\resources\\hello.txt");
        // read from a socket text stream to simulate Flink consuming data from Kafka
        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        DataStream<Tuple2<String, Integer>> resultStream = source
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split("\\W+");
                        for (String s : words) {
                            out.collect(new Tuple2<>(s, 1));
                        }
                    }
                })
                // key by tuple field 0 (the word), sum tuple field 1 (the count)
                .keyBy(0)
                .sum(1);
        resultStream.print();
        env.execute();
    }
}

Start nc -lk 7777 before launching the job so that something is listening on port 7777, then type lines into the nc session to simulate a streaming network input.
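The socket host and port are hard-coded above. A common variation is to read them from the program arguments with Flink's ParameterTool; the following is a sketch of that idea (the class name, argument names, and defaults are my own, not from the original post):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ParamStreamWordCount {

    public static void main(String[] args) throws Exception {
        // read --host and --port from the command line, with fallbacks for local testing
        ParameterTool params = ParameterTool.fromArgs(args);
        String host = params.get("host", "localhost");
        int port = params.getInt("port", 7777);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> result = env
                .socketTextStream(host, port)
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split("\\W+")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .sum(1);
        result.print();
        env.execute("param-stream-word-count");
    }
}

Run it with, for example, --host localhost --port 7777; without arguments it falls back to the defaults above.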
The execution results are as follows:
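For instance, if the streaming job receives the lines "hello world" and then "hello flink" over the socket, the printed output looks roughly like this (illustrative; the "n>" prefix is the printing subtask's index and varies with parallelism):

1> (hello,1)
4> (world,1)
1> (hello,2)
2> (flink,1)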
Conclusion
Batch: the whole input file is processed first, and the final counts are printed once at the end.
Streaming: in this development setup, every line read from the socket updates the running count for its words and prints the new totals immediately, e.g. (hello,1) the first time "hello" appears and (hello,2) the second time.