Apache Flink 零基础入门(十四)Flink 分布式缓存
生活随笔
收集整理的這篇文章主要介紹了
Apache Flink 零基础入门(十四)Flink 分布式缓存
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
Apache Flink 提供了一個分布式緩存,類似于Hadoop,用戶可以并行獲取數(shù)據(jù)。
通過注冊一個文件或者文件夾到本地或者遠程HDFS等,在getExecutionEnvironment中指定一個名字就可以。當應用程序執(zhí)行時,Flink會自動拷貝這個文件或者文件夾到所有worker進程中。用戶的Function通過指定的名字可以查找這個文件或者文件夾中的內(nèi)容。
Scala
def main(args: Array[String]): Unit = {val environment = ExecutionEnvironment.getExecutionEnvironmentval filePath = "E:/test/hello.txt"// step1: 注冊一個本地文件environment.registerCachedFile(filePath, "pk-scala-dc")val data = environment.fromElements("hadoop", "spark", "flink", "pyspark")val info=data.map(new RichMapFunction[String, String] {//step2: 在open方法中獲取到分布式緩存的內(nèi)容即可override def open(parameters: Configuration): Unit = {val dcfile = getRuntimeContext.getDistributedCache.getFile("pk-scala-dc")val lines = FileUtils.readLines(dcfile)import scala.collection.JavaConverters._for(ele <- lines.asScala){println(ele)}}override def map(value: String): String = {value}})info.writeAsText("E:/test3", WriteMode.OVERWRITE).setParallelism(4)environment.execute("DistributedCacheApp")}Java
public class JavaDistributedCachedApp {public static void main(String[] args) throws Exception {ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();executionEnvironment.registerCachedFile("E:/test/hello.txt", "pk-java-dc");DataSource<String> data1 = executionEnvironment.fromElements("hadoop", "spark", "flink", "pyspark");data1.map(new RichMapFunction<String, String>() {List<String> list = new ArrayList<>();@Overridepublic void open(Configuration parameters) throws Exception {File file = getRuntimeContext().getDistributedCache().getFile("pk-java-dc");List<String> lines = FileUtils.readLines(file);for (String line : lines) {list.add(line);System.out.println(list);}}@Overridepublic String map(String value) throws Exception {return value;}}).print();} }?
總結(jié)
以上是生活随笔為你收集整理的Apache Flink 零基础入门(十四)Flink 分布式缓存的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 零基础入门(十
- 下一篇: 业界流处理框架对比