日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Apache Flink 零基础入门(十四)Flink 分布式缓存

發(fā)布時間:2024/9/16 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Apache Flink 零基础入门(十四)Flink 分布式缓存 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Apache Flink 提供了一個分布式緩存,類似于Hadoop,用戶可以并行獲取數(shù)據(jù)。

通過注冊一個文件或者文件夾到本地或者遠程HDFS等,在getExecutionEnvironment中指定一個名字就可以。當應用程序執(zhí)行時,Flink會自動拷貝這個文件或者文件夾到所有worker進程中。用戶的Function通過指定的名字可以查找這個文件或者文件夾中的內(nèi)容。

Scala

def main(args: Array[String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironmentval filePath = "E:/test/hello.txt"// step1: 注冊一個本地文件environment.registerCachedFile(filePath, "pk-scala-dc")val data = environment.fromElements("hadoop", "spark", "flink", "pyspark")val info=data.map(new RichMapFunction[String, String] {//step2: 在open方法中獲取到分布式緩存的內(nèi)容即可override def open(parameters: Configuration): Unit = {val dcfile = getRuntimeContext.getDistributedCache.getFile("pk-scala-dc")val lines = FileUtils.readLines(dcfile)import scala.collection.JavaConverters._for(ele <- lines.asScala){println(ele)}}override def map(value: String): String = {value}})info.writeAsText("E:/test3", WriteMode.OVERWRITE).setParallelism(4)environment.execute("DistributedCacheApp")}

Java

public class JavaDistributedCachedApp {public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();executionEnvironment.registerCachedFile("E:/test/hello.txt", "pk-java-dc");DataSource<String> data1 = executionEnvironment.fromElements("hadoop", "spark", "flink", "pyspark");data1.map(new RichMapFunction<String, String>() {List<String> list = new ArrayList<>();@Overridepublic void open(Configuration parameters) throws Exception {File file = getRuntimeContext().getDistributedCache().getFile("pk-java-dc");List<String> lines = FileUtils.readLines(file);for (String line : lines) {list.add(line);System.out.println(list);}}@Overridepublic String map(String value) throws Exception {return value;}}).print();} }

?

總結(jié)

以上是生活随笔為你收集整理的Apache Flink 零基础入门(十四)Flink 分布式缓存的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。