import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;/*** window** Created by xxxx on 2020/10/09 .*/publicclassSocketDemoFullCount{publicstaticvoidmain(String[] args)throws Exception{//獲取需要的端口號int port;try{ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch(Exception e){System.err.println("No port set. use default port 9000--java");port =9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname ="hadoop100";String delimiter ="\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);DataStream<Tuple2<Integer,Integer>> intData = text.map(newMapFunction<String, Tuple2<Integer,Integer>>(){@Overridepublic Tuple2<Integer,Integer>map(String value)throws Exception {returnnewTuple2<>(1,Integer.parseInt(value));}});intData.keyBy(0).timeWindow(Time.seconds(5)).process(newProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>(){@Overridepublicvoidprocess(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)throws Exception {System.out.println("執(zhí)行process。。。");long count =0;for(Tuple2<Integer,Integer> element: elements){count++;}out.collect("window:"+context.window()+",count:"+count);}}).print();//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}}import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;/*** window** Created by xxxx on 2020/10/09 .*/publicclassSocketDemoIncrAgg{publicstaticvoidmain(String[] args)throws Exception{//獲取需要的端口號int port;try{ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch(Exception e){System.err.println("No port set. use default port 9000--java");port =9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname ="hadoop100";String delimiter ="\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);DataStream<Tuple2<Integer,Integer>> intData = text.map(newMapFunction<String, Tuple2<Integer,Integer>>(){@Overridepublic Tuple2<Integer,Integer>map(String value)throws Exception {returnnewTuple2<>(1,Integer.parseInt(value));}});intData.keyBy(0).timeWindow(Time.seconds(5)).reduce(newReduceFunction<Tuple2<Integer, Integer>>(){@Overridepublic Tuple2<Integer, Integer>reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2)throws Exception {System.out.println("執(zhí)行reduce操作:"+value1+","+value2);returnnewTuple2<>(value1.f0,value1.f1+value2.f1);}}).print();//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}}import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;/*** 滑動窗口計算** 通過socket模擬產生單詞數據* flink對數據進行統(tǒng)計計算** 需要實現每隔1秒對最近2秒內的數據進行匯總計算*** Created by xxxx on 2020/10/09 .*/publicclassSocketWindowWordCountJava{publicstaticvoidmain(String[] args)throws Exception{//獲取需要的端口號int port;try{ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch(Exception e){System.err.println("No port set. use default port 9000--java");port =9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname ="hadoop100";String delimiter ="\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(newFlatMapFunction<String, WordWithCount>(){publicvoidflatMap(String value, Collector<WordWithCount> out)throws Exception {String[] splits = value.split("\\s");for(String word : splits){out.collect(newWordWithCount(word,1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒.sum("count");//在這里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把數據打印到控制臺并且設置并行度windowCounts.print().setParallelism(1);//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}publicstaticclassWordWithCount{public String word;publiclong count;publicWordWithCount(){}publicWordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString(){return"WordWithCount{"+"word='"+ word +'\''+", count="+ count +'}';}}}import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;/*** checkpoint** Created by xxxx on 2020/10/09 .*/publicclassSocketWindowWordCountJavaCheckPoint{publicstaticvoidmain(String[] args)throws Exception{//獲取需要的端口號int port;try{ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch(Exception e){System.err.println("No port set. use default port 9000--java");port =9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】env.enableCheckpointing(1000);// 高級選項:// 設置模式為exactly-once (這是默認值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一時間只允許進行一個檢查點env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執(zhí)行失敗的時候才會保存checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackend//env.setStateBackend(new MemoryStateBackend());//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));String hostname ="hadoop100";String delimiter ="\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(newFlatMapFunction<String, WordWithCount>(){publicvoidflatMap(String value, Collector<WordWithCount> out)throws Exception {String[] splits = value.split("\\s");for(String word : splits){out.collect(newWordWithCount(word,1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒.sum("count");//在這里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把數據打印到控制臺并且設置并行度windowCounts.print().setParallelism(1);//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}publicstaticclassWordWithCount{public String word;publiclong count;publicWordWithCount(){}publicWordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString(){return"WordWithCount{"+"word='"+ word +'\''+", count="+ count +'}';}}}import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** 把collection集合作為數據源** Created by xxxx on 2020/10/09 on 2018/10/23.*/publicclassStreamingFromCollection{publicstaticvoidmain(String[] args)throws Exception {//獲取Flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ArrayList<Integer> data =newArrayList<>();data.add(10);data.add(15);data.add(20);//指定數據源DataStreamSource<Integer> collectionData = env.fromCollection(data);//通map對數據進行處理DataStream<Integer> num = collectionData.map(newMapFunction<Integer, Integer>(){@Overridepublic Integer map(Integer value)throws Exception {return value +1;}});//直接打印num.print().setParallelism(1);env.execute("StreamingFromCollection");}}
另外的Scala案例:
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time/*** 滑動窗口計算** 每隔1秒統(tǒng)計最近2秒內的數據,打印到控制臺** Created by xxxx on 2020/10/09 .*/
object SocketWindowWordCountScala {def main(args: Array[String]): Unit ={//獲取socket端口號val port: Int =try{ParameterTool.fromArgs(args).getInt("port")}catch{case e: Exception =>{System.err.println("No port set. use default port 9000--scala")}9000}//獲取運行環(huán)境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//鏈接socket獲取輸入數據val text = env.socketTextStream("hadoop100",port,'\n')//解析數據(把數據打平),分組,窗口計算,并且聚合求sum//注意:必須要添加這一行隱式轉行,否則下面的flatmap方法執(zhí)行會報錯import org.apache.flink.api.scala._val windowCounts = text.flatMap(line => line.split("\\s"))//打平,把每一行單詞都切開.map(w =>WordWithCount(w,1))//把單詞轉成word , 1這種形式.keyBy("word")//分組.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定間隔時間.sum("count");// sum或者reduce都可以//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))//打印到控制臺windowCounts.print().setParallelism(1);//執(zhí)行任務env.execute("Socket window count");}caseclassWordWithCount(word: String,count: Long)}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/
object StreamingFromCollectionScala {def main(args: Array[String]): Unit ={val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._val data =List(10,15,20)val text = env.fromCollection(data)//針對map接收到的數據執(zhí)行加1的操作val num = text.map(_+1)num.print().setParallelism(1)env.execute("StreamingFromCollectionScala")}}