日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源

發(fā)布時間:2024/1/18 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Flink程序加載數(shù)據(jù)源(3)自定義數(shù)據(jù)源(2)從Mysql 加載數(shù)據(jù)源

? 上文引出了Flink程序自定義數(shù)據(jù)源的方法,我們來再次回顧下。

? Flink還提供了數(shù)據(jù)源接口(抽象類),我們實現(xiàn)該接口(繼承抽象類)就可以實現(xiàn)自定義數(shù)據(jù)源,不同的接口(抽象類)功能的豐富性與范圍不同,分類如下:

? EX:

  • SourceFunction: 非并行數(shù)據(jù)源(并行度只能=1)

  • RichSourceFunction: 多功能非并行數(shù)據(jù)源(并行度只能=1)

  • ParallelSourceFunction: 并行數(shù)據(jù)源(并行度能夠>=1)

  • RichParallelSourceFunction: 多功能并行數(shù)據(jù)源(并行度能夠>=1)

代碼實現(xiàn)

如上所說,實現(xiàn)flink為我們提供的一些數(shù)據(jù)源接口,即能夠實現(xiàn)自定義數(shù)據(jù)源了!

env.addSource(自定義數(shù)據(jù)源類對象);

下邊進行完整示例演示:

① 準備環(huán)境

//準備環(huán)境 env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

② 獲取數(shù)據(jù)源

env.addSource(自定義數(shù)據(jù)源類對象);

③ 從Mysql中獲取數(shù)據(jù)源示例

數(shù)據(jù)對象

@Data @NoArgsConstructor @AllArgsConstructor public static class VehicleAlarm {private String id;private String licensePlate;private String plateColor;private Long deviceTime;private String zone; }

自定義數(shù)據(jù)源類

public static class MysqlSource extends RichParallelSourceFunction<VehicleAlarm> {Connection conn = null;PreparedStatement ps = null;ResultSet result = null;private boolean flag = true;String url = "jdbc:mysql://xxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false";@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection(url, "root", "root");String sql = "select id,license_plate,plate_color,device_time,`zone` from vehicle_alarm_202103";ps = conn.prepareStatement(sql);super.open(parameters);}@Overridepublic void run(SourceContext<VehicleAlarm> ctx) throws Exception {while (flag) {result = ps.executeQuery();while (result.next()) {String id = result.getString("id");String licensePlate = result.getString("license_plate");String plateColor = result.getString("plate_color");Long deviceTime = result.getLong("device_time");String zone = result.getString("zone");VehicleAlarm vehicleAlarm = new VehicleAlarm(id, licensePlate, plateColor, deviceTime, zone);ctx.collect(vehicleAlarm);}Thread.sleep(2000);}}@Overridepublic void cancel() {flag = false;}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}if (result != null) {result.close();}} }

結果展示:

方法以及特別屬性解釋說明:

  • open():數(shù)據(jù)源最開始打開時執(zhí)行,整個數(shù)據(jù)源從加載到銷毀,只會執(zhí)行一次
  • run(SourceContex):實現(xiàn)數(shù)據(jù)獲取邏輯,并可以通過傳入的參數(shù)ctx進行向下游節(jié)點的數(shù)據(jù)轉發(fā)。
  • SourceContext:source函數(shù)用于發(fā)出元素和可能的watermark的接口,確定以及返回source生成的元素的類型。
  • cancel():用來取消數(shù)據(jù)源,一般在run方法中,會存在一個循環(huán)來持續(xù)產生數(shù)據(jù),cancel方法則可以使該循環(huán)終止。
  • close():數(shù)據(jù)源關閉時執(zhí)行,整個數(shù)據(jù)源從加載到銷毀,只會執(zhí)行一次

Flink完整流程代碼:

//準備環(huán)境 env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(2); //獲取數(shù)據(jù)源 source DataStreamSource<VehicleAlarm> streamSource = env.addSource(new MysqlSource()); //數(shù)據(jù)處理 todo streamSource.print(); //數(shù)據(jù)收集 sink //程序執(zhí)行 execute env.execute("mysql-source");

總結

以上是生活随笔為你收集整理的Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。