Analyzing Flink Components from flink-example (3): Streaming WordCount in Practice and Source Code Analysis


The previous two posts covered how the batch WordCount executes:

<Analyzing Flink Components from flink-example (1): WordCount Batch in Practice and Source Code Analysis>

<Analyzing Flink Components from flink-example (2): WordCount Batch in Practice and Source Code Analysis ---- How Does Flink Execute Locally?>

This post turns to the streaming version of WordCount.

/**
 * Implements the "WordCount" program that computes a simple word occurrence
 * histogram over text files in a streaming fashion.
 *
 * <p>The input is a plain text file with lines separated by newline characters.
 *
 * <p>Usage: <code>WordCount --input &lt;path&gt; --output &lt;path&gt;</code><br>
 * If no parameters are provided, the program is run with default data from
 * {@link WordCountData}.
 *
 * <p>This example shows how to:
 * <ul>
 * <li>write a simple Flink Streaming program,
 * <li>use tuple data types,
 * <li>write and use user-defined functions.
 * </ul>
 */
public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataStream<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            // get default test text data
            text = env.fromElements(WordCountData.WORDS);
        }

        DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0).sum(1); //1

        // emit result
        if (params.has("output")) {
            counts.writeAsText(params.get("output"));
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

        // execute program
        env.execute("Streaming WordCount"); //2
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" ({@code Tuple2<String,
     * Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
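A side note: the positional key used at marker //1, keyBy(0).sum(1), was deprecated in later Flink releases. Assuming Flink 1.11 or newer, the same pipeline is usually written with a key selector; a minimal sketch:

    DataStream<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer())
                // key selector instead of the positional key "0"
                .keyBy(value -> value.f0)
                .sum(1);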

The overall execution flow is shown in the diagram below:

[Figure: execution flow of the streaming WordCount job, steps 1–12]

Steps 1–4: the main method reads the input file and adds operators.

private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> inputFormat,
        TypeInformation<OUT> typeInfo,
        String sourceName,
        FileProcessingMode monitoringMode,
        long interval) {

    Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
    Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
    Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
    Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");

    Preconditions.checkArgument(monitoringMode.equals(FileProcessingMode.PROCESS_ONCE) ||
            interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
        "The path monitoring interval cannot be less than " +
            ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL + " ms.");

    ContinuousFileMonitoringFunction<OUT> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

    ContinuousFileReaderOperator<OUT> reader =
        new ContinuousFileReaderOperator<>(inputFormat);

    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
            .transform("Split Reader: " + sourceName, typeInfo, reader); //1

    return new DataStreamSource<>(source);
}
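For reference, readTextFile(path) takes the PROCESS_ONCE branch above. The same createFileInput path can also be driven in continuous-monitoring mode through readFile; a minimal sketch, where the 10-second interval is an arbitrary illustrative value:

    // Monitor the input path continuously, re-scanning it every 10 seconds.
    TextInputFormat format = new TextInputFormat(new Path(params.get("input")));
    DataStream<String> text = env.readFile(
            format,
            params.get("input"),
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            10_000L);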

The method below adds an operator to the environment's list of transformations; when execute() is called, the operators added this way are executed.

/**
 * Adds an operator to the list of operators that should be executed when calling
 * {@link #execute}.
 *
 * <p>When calling {@link #execute()} only the operators that where previously added to the list
 * are executed.
 *
 * <p>This is not meant to be used by users. The API methods that create operators must call
 * this method.
 */
@Internal
public void addOperator(StreamTransformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}
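For context, addOperator is invoked by the DataStream API methods themselves. Slightly simplified and paraphrased from the Flink 1.8 sources, DataStream#transform builds a transformation, wraps it in a new stream, and registers it with the environment:

    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism());

        SingleOutputStreamOperator<R> returnStream =
                new SingleOutputStreamOperator<>(environment, resultTransform);

        // this is where the new operator joins the list consumed by execute()
        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }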

Step 5: generate the StreamGraph, from which the JobGraph is obtained; in other words, the streaming program is translated into a JobGraph.

// transform the streaming program into a JobGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);

JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
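Before execute() is called, the plan about to be translated can be inspected: env.getExecutionPlan() returns it as JSON, which can be rendered with Flink's plan visualizer.

    // Dump the streaming program's plan as JSON (viewable at https://flink.apache.org/visualizer/).
    System.out.println(env.getExecutionPlan());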

Steps 6–8: start the MiniCluster in preparation for executing the job.

/**
 * Starts the mini cluster, based on the configured properties.
 *
 * @throws Exception This method passes on any exception that occurs during the startup of
 * the mini cluster.
 */
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            AkkaRpcServiceConfiguration akkaRpcServiceConfig = AkkaRpcServiceConfiguration.fromConfiguration(configuration);

            final RpcServiceFactory dispatcherResourceManagreComponentRpcServiceFactory;

            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, false, null);

                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
            } else {
                // we always need the 'commonRpcService' for auxiliary calls
                commonRpcService = createRpcService(akkaRpcServiceConfig, true, null);

                // start a new service per component, possibly with custom bind addresses
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();

                dispatcherResourceManagreComponentRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, jobManagerBindAddress);
                taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(akkaRpcServiceConfig, taskManagerBindAddress);
            }

            RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(
                configuration,
                commonRpcService.getAddress());
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            ioExecutor = Executors.newFixedThreadPool(
                Hardware.getNumberCPUCores(),
                new ExecutorThreadFactory("mini-cluster-io"));
            haServices = createHighAvailabilityServices(configuration, ioExecutor);

            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            blobCacheService = new BlobCacheService(
                configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()));

            startTaskManagers();

            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(
                configuration,
                dispatcherResourceManagreComponentRpcServiceFactory,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                metricQueryServiceRetriever,
                new ShutDownFatalErrorHandler()));

            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            webMonitorLeaderRetrievalService = haServices.getWebMonitorLeaderRetriever();

            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                20,
                Time.milliseconds(20L));
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                commonRpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                20,
                Time.milliseconds(20L));
            webMonitorLeaderRetriever = new LeaderRetriever();

            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
        }
        catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
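For experimentation outside the local-execution path, a MiniCluster can also be constructed and started directly. A minimal sketch, assuming the Flink 1.8-era flink-runtime API (the TaskManager and slot counts are arbitrary example values):

    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(2)
            .build();

    MiniCluster miniCluster = new MiniCluster(cfg);
    miniCluster.start();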

Steps 9–12: execute the job.

/**
 * This method runs a job in blocking mode. The method returns only after the job
 * completed successfully, or after it failed terminally.
 *
 * @param job The Flink job to execute
 * @return The result of the job execution
 *
 * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
 * or if the job terminally failed.
 */
@Override
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
    checkNotNull(job, "job is null");

    final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);

    final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
        (JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));

    final JobResult jobResult;

    try {
        jobResult = jobResultFuture.get();
    } catch (ExecutionException e) {
        throw new JobExecutionException(job.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e));
    }

    try {
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    } catch (IOException | ClassNotFoundException e) {
        throw new JobExecutionException(job.getJobID(), e);
    }
}

The jar files are uploaded first; this requires a DispatcherGateway to carry out the upload, after which the client waits asynchronously for the execution result.
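Putting the pieces together, the caller side looks roughly like this; a sketch where miniCluster is the cluster started above and jobGraph is the graph produced in step 5:

    try {
        JobExecutionResult result = miniCluster.executeJobBlocking(jobGraph);
        System.out.println("Job finished in " + result.getNetRuntime() + " ms");
    } finally {
        miniCluster.close();
    }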

Summary:

The batch and streaming execution flows are very similar, yet differ in places.

Difference: streaming passes a DataStream through the pipeline, while batch passes a DataSet (see the sketch below).

Similarity: both are translated into a JobGraph for execution.
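To make the difference concrete, the batch counterpart of the core pipeline, using the DataSet API and the same Tokenizer as above, looks like this (a sketch):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<String> text = env.fromElements(WordCountData.WORDS);

    DataSet<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer())
                .groupBy(0)   // DataSet uses groupBy where DataStream uses keyBy
                .sum(1);

    counts.print();           // print() triggers execution for a DataSet program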


Reposted from: https://www.cnblogs.com/davidwang456/p/11015594.html
