
2021 Big Data Flink (11): Unified Stream/Batch API Source


Contents

Source

Predefined Sources

Collection-based Source

File-based Source

Socket-based Source

Custom Sources

Randomly Generated Data

MySQL


Source

預(yù)定義Source

Collection-based Source

  • API

Generally used to make up test data when learning and experimenting.

1. env.fromElements(varargs);

2. env.fromCollection(collection);

3. env.generateSequence(from, to);

4. env.fromSequence(from, to);

Note that fromSequence is the newer API: in recent Flink versions generateSequence is deprecated in its favor.

  • Code demo:

package cn.it.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Author lanson
 * Desc
 * Turn an ordinary local Java/Scala collection into a distributed Flink DataStream.
 * Generally used to make up test data when learning and experimenting.
 * 1. env.fromElements(varargs);
 * 2. env.fromCollection(collection);
 * 3. env.generateSequence(from, to);
 * 4. env.fromSequence(from, to);
 */
public class SourceDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        // 1. env.fromElements(varargs);
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        // 2. env.fromCollection(collection);
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("hadoop", "spark", "flink"));
        // 3. env.generateSequence(from, to);
        DataStream<Long> ds3 = env.generateSequence(1, 10);
        // 4. env.fromSequence(from, to);
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // 3. Transformation

        // 4. sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // 5. execute
        env.execute();
    }
}
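A note on env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC), which appears in each demo: since Flink 1.12 the DataStream API unifies stream and batch processing, and AUTOMATIC lets the runtime pick batch execution when all sources are bounded (as these collection sources are) and streaming execution otherwise. This is the "unified stream/batch" aspect in this article's title.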

File-based Source

  • API

一般用于學(xué)習(xí)測(cè)試

env.readTextFile(local/HDFS file or directory); // compressed files also work

  • Code demo:

package cn.it.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author lanson
 * Desc
 * 1. env.readTextFile(local/HDFS file or directory); // compressed files also work
 */
public class SourceDemo02 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        // env.readTextFile(local/HDFS file); // compressed files also work
        DataStream<String> ds1 = env.readTextFile("data/input/words.txt");
        DataStream<String> ds2 = env.readTextFile("data/input/dir");
        DataStream<String> ds3 = env.readTextFile("hdfs://node1:8020/wordcount/input/words.txt");
        DataStream<String> ds4 = env.readTextFile("data/input/wordcount.txt.gz");

        // 3. Transformation

        // 4. sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // 5. execute
        env.execute();
    }
}

Socket-based Source

一般用于學(xué)習(xí)測(cè)試

  • Requirements

1. On node1, run nc -lk 9999 to send data to the given port.

nc is short for netcat, a general-purpose utility for reading from and writing to network connections; here we use it to push lines of text to a port.

If the command is not available, install it:

yum install -y nc

2. Write a Flink streaming application that counts words from that port in real time.

  • Code:

package cn.it.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author lanson
 * Desc
 * SocketSource
 */
public class SourceDemo03 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);

        // 3. transformation
        // 3.1 Split each line on spaces into individual words
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                // value is one line of input
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word); // emit each word downstream
                }
            }
        });
        // 3.2 Map each word to a (word, 1) pair
        DataStream<Tuple2<String, Integer>> wordAndOnesDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                // value is one word
                return Tuple2.of(value, 1);
            }
        });
        // 3.3 Group the pairs by word (the key)
        //KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);
        KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);
        // 3.4 Sum the counts (the value) within each group
        DataStream<Tuple2<String, Integer>> result = groupedDS.sum(1);

        // 4. sink
        result.print();

        // 5. execute
        env.execute();
    }
}
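To try it, start the nc listener on node1 before launching the job, then type a few lines such as "hadoop flink hadoop". Counts accumulate per word, so the console prints something like (hadoop,1), (flink,1), (hadoop,2), each line prefixed with the index of the subtask that produced it.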

Custom Sources

Randomly Generated Data

  • API

一般用于學(xué)習(xí)測(cè)試,模擬生成一些數(shù)據(jù)

Flink also provides source interfaces; implementing one of them gives you a custom source. Different interfaces offer different capabilities, classified as follows (a minimal sketch appears after this list):

SourceFunction: non-parallel source (parallelism fixed at 1)

RichSourceFunction: non-parallel source with rich lifecycle methods such as open/close (parallelism fixed at 1)

ParallelSourceFunction: parallel source (parallelism >= 1)

RichParallelSourceFunction: parallel source with rich lifecycle methods (parallelism >= 1); the Kafka source covered later uses this interface
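For reference, here is a minimal sketch of the simplest variant (illustrative only; the class name and values are not from the tutorial): a non-parallel SourceFunction that emits an incrementing counter once per second.

package cn.it.source;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Minimal sketch: a non-parallel source emitting an incrementing Long once per second.
public class MyCounterSource implements SourceFunction<Long> {
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            ctx.collect(counter++); // emit the next value downstream
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false; // stop the emit loop
    }
}

Because it implements SourceFunction rather than ParallelSourceFunction, Flink rejects any attempt to run it with parallelism greater than 1.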

  • Requirements

Generate one random order record per second (order ID, user ID, order amount, timestamp).

Specifically:

- Random order ID (a UUID)

- Random user ID (0-2)

- Random order amount (0-100)

- Timestamp set to the current system time

  • Code

package cn.it.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Author lanson
 * Desc
 * Requirement: generate one random order record per second (order ID, user ID, order amount, timestamp):
 * - Random order ID (a UUID)
 * - Random user ID (0-2)
 * - Random order amount (0-100)
 * - Timestamp set to the current system time
 */
public class SourceDemo04_Customer {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. Source
        DataStream<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(2);

        // 3. Transformation

        // 4. Sink
        orderDS.print();

        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        @Override
        public void run(SourceContext<Order> ctx) throws Exception {
            Random random = new Random();
            while (flag) {
                Thread.sleep(1000);
                String id = UUID.randomUUID().toString();
                int userId = random.nextInt(3);   // 0-2
                int money = random.nextInt(101);  // 0-100
                long createTime = System.currentTimeMillis();
                ctx.collect(new Order(id, userId, money, createTime));
            }
        }

        // Invoked when the task is cancelled (e.g. when the cancel command is executed)
        @Override
        public void cancel() {
            flag = false;
        }
    }
}
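Since the source extends RichParallelSourceFunction, setParallelism(2) runs two independent instances, each emitting one order per second, so the job prints roughly two orders per second.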

MySQL

  • Requirements:

In real projects you often receive data in real time and need to match it against rules stored in MySQL; a custom Flink source that reads from MySQL handles this.

Start with a simple version of that need:

Load data from MySQL continuously.

Changes to the data in MySQL should be picked up as well.

  • Prepare the data
CREATE TABLE `t_student` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

INSERT INTO `t_student` VALUES ('1', 'jack', '18');
INSERT INTO `t_student` VALUES ('2', 'tom', '19');
INSERT INTO `t_student` VALUES ('3', 'rose', '20');
INSERT INTO `t_student` VALUES ('4', 'tom', '19');
INSERT INTO `t_student` VALUES ('5', 'jack', '18');
INSERT INTO `t_student` VALUES ('6', 'rose', '20');

  • Code:

package cn.it.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc
 * Requirement: load data from MySQL continuously; changes to the data in MySQL
 * should be picked up as well.
 */
public class SourceDemo05_Customer_MySQL {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source
        DataStream<Student> studentDS = env.addSource(new MySQLSource()).setParallelism(1);

        // 3. Transformation

        // 4. Sink
        studentDS.print();

        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSource extends RichParallelSourceFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;
        // volatile: cancel() is called from a different thread than run()
        private volatile boolean flag = true;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Load the driver and open the connection
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "select id,name,age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> ctx) throws Exception {
            while (flag) {
                // Re-run the query every 5 seconds and emit the full result set
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    int id = rs.getInt("id");
                    String name = rs.getString("name");
                    int age = rs.getInt("age");
                    ctx.collect(new Student(id, name, age));
                }
                rs.close();
                TimeUnit.SECONDS.sleep(5);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            // Close the statement before the connection
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
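Two practical notes. First, the job needs the MySQL JDBC driver (for example the mysql-connector-java artifact) on its classpath. Second, this source is a simple poller: every 5 seconds it re-runs the query and re-emits every row, changed or not, which is how updates "appear" downstream. That is fine for a demo; genuine change capture in production would typically use a CDC-style connector instead.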
