日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink 零基础入门(十二)Flink sink

發布時間:2024/9/16 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink 零基础入门(十二)Flink sink 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

將DataSet中的數據Sink到哪里去。使用的是對應的OutPutFormat,也可以使用自定義的sink,有可能寫到hbase中,hdfs中。

  • writeAsText() / TextOutputFormat ,以String的形式寫入
  • writeAsCsv(...) / CsvOutputFormat,以CSV的方式寫進去

  • print() / printToErr() / print(String msg) / printToErr(String msg)以標準輸出

?writeAsText

object DataSetSinkApp {def main(args: Array[String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironmentval data = 1.to(10)val text = environment.fromCollection(data)val filePath = "E:/test"text.writeAsText(filePath)environment.execute("DataSetSinkApp")} }

如果E:/test文件或者文件夾存在,將無法執行成功。除非增加一個WriteMode.OVERWRITE

text.writeAsText(filePath, WriteMode.OVERWRITE)

這樣就在E盤下新建了一個test文件,內容是1到10。

那么如何保存到文件夾中?

text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)

設置并行度為2,這樣就存到test文件夾下,兩個文件1和2

默認情況下,不設置并行度,會把結果寫到一個文件中,如果設置并行度,那么每一個并行度都對應一個輸出。

Java

public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();List<Integer> info = new ArrayList<>();for(int i = 1;i <=10; i++) {info.add(i);}DataSource<Integer> data1 = executionEnvironment.fromCollection(info);String filePath = "E:/test2";data1.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);executionEnvironment.execute("JavaDataSetSinkApp");}

?

總結

以上是生活随笔為你收集整理的Apache Flink 零基础入门(十二)Flink sink的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。