
Flink Async I/O, Future, and CompletableFuture

Published 2024/7/5 by 豆豆

1 Overview

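When Flink processes streaming data, it frequently has to interact with external systems such as Redis, Hive, HBase and other storage systems. The communication latency between systems can slow down the whole Flink job and hurt both overall throughput and timeliness.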

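For example, to enrich records with extra user information from an external database, the usual approach is to send a query for user a (for instance inside a MapFunction), wait for the result, and only then send the next query. This is synchronous access, shown on the left of the figure: time spent waiting on the network severely limits throughput and increases latency.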

Flink introduced Async I/O in version 1.2 (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html). In asynchronous mode, multiple requests and responses are handled concurrently: you can send requests for users a, b, c, d and so on back to back, and handle each response as soon as it comes back, so consecutive requests never block waiting on one another. This is shown on the right of the figure, and it is the principle behind Async I/O.
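To make the difference concrete, here is a minimal plain-Java sketch (no Flink involved). The `lookup` method and its 100 ms latency are assumptions standing in for a database query. The synchronous loop pays the latency once per key, while the asynchronous version sends every request on a thread pool before waiting on any response.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncVsAsyncLookup {
    // Hypothetical remote lookup with ~100 ms of latency, standing in for a database query
    static String lookup(String key) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key + "-info";
    }

    // Synchronous pattern: each request waits for the previous response (~100 ms per key)
    static List<String> syncLookups(List<String> keys) {
        List<String> results = new ArrayList<>();
        for (String key : keys) {
            results.add(lookup(key));
        }
        return results;
    }

    // Asynchronous pattern: every request is sent before waiting on any response
    static List<String> asyncLookups(List<String> keys, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String key : keys) {
            futures.add(CompletableFuture.supplyAsync(() -> lookup(key), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> f : futures) {
            results.add(f.join()); // results are collected in input order for simplicity
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        ExecutorService pool = Executors.newFixedThreadPool(keys.size());

        long start = System.nanoTime();
        syncLookups(keys);
        long syncMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        asyncLookups(keys, pool);
        long asyncMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("sync: " + syncMs + " ms, async: " + asyncMs + " ms");
        pool.shutdown();
    }
}
```

With four keys and four pool threads, the synchronous version needs roughly 400 ms while the asynchronous one should finish in roughly 100 ms plus scheduling overhead; exact numbers vary by machine.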

2 Future and CompletableFuture

First, let's look at Future and CompletableFuture.

2.1 Future

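Since JDK 1.5, Future has been available to represent the result of an asynchronous computation. It is usually used together with an ExecutorService (the executor) and a Callable (the task). Future's get method is blocking: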

```java
package com.quinto.flink;

import java.util.concurrent.*;

public class FutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Core pool size 5, maximum pool size 10, idle threads kept alive for 60 s, bounded work queue of 10
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        Future<Long> future = executor.submit(() -> {
            // Deliberately slow
            Thread.sleep(3000);
            return System.currentTimeMillis();
        });
        System.out.println(future.get());
        System.out.println("Because get() blocks, this message is printed after the result");
        executor.shutdown();
    }
}
```

Output:

```
1612337847685
Because get() blocks, this message is printed after the result
```

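Future is only an interface; the object actually returned is a FutureTask, created in AbstractExecutorService.submit (which ThreadPoolExecutor inherits):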

```java
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
```

FutureTask's get method (JDK 8 source) is as follows:

```java
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // The task's run state starts as NEW (0) and moves to a terminal state only in set,
    // setException and cancel. First check whether the FutureTask is already done: if so,
    // set or setException has already run, and report(s) returns the outcome directly.
    if (s <= COMPLETING)
        // Not done yet when get() is called: block in awaitDone
        s = awaitDone(false, 0L);
    return report(s);
}

/**
 * awaitDone loops, re-checking the FutureTask's state and parking the thread between checks.
 * While get() is blocked:
 * (1) if the thread calling get() is interrupted, its wait node is removed from the
 *     waiters list and an InterruptedException is thrown;
 * (2) if the state has become terminal (completed normally, exceptionally, or cancelled),
 *     that state is returned;
 * (3) if the state is COMPLETING, the result is being set right now, so the thread yields briefly;
 * (4) if the state is still NEW, the current thread is added to the FutureTask's waiters list;
 * (5) without a timeout, the calling thread is parked; with a timeout, once the deadline has
 *     passed its node is removed from the waiters list and the current state is returned,
 *     otherwise the thread is parked for the remaining time.
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
```
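The timed branch of awaitDone is what backs Future.get(long, TimeUnit): if the task is still unfinished when the deadline passes, get throws TimeoutException instead of blocking forever. A minimal sketch; the task durations and timeouts are arbitrary values chosen for illustration:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {
    // Returns true if get() gave up before the task finished
    static boolean timesOut(long taskMillis, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(taskMillis); // simulate a slow external call
                return "done";
            });
            try {
                // Timed wait: FutureTask.get(timeout, unit) calls awaitDone(true, nanos)
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return false;
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the task that is still running
                return true;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(2000, 100)); // slow task, short timeout
        System.out.println(timesOut(10, 2000));  // fast task, generous timeout
    }
}
```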

Limitations of Future:

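① Although the Future interface lets you build asynchronous applications, getting the result is inconvenient: it can only be obtained by blocking or by polling. Blocking clearly defeats the purpose of asynchronous programming, while polling wastes CPU time and still does not deliver the result as soon as it is ready.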

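② It is hard to express dependencies between the results of several Futures. In practice you often need to merge the results of several asynchronous computations into one, wait for every task in a collection of Futures to finish, or trigger an action when a task completes.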
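For example, to add up the results of two independent tasks using only the Future API, the calling thread has no way to register "run this when both are done"; it must poll isDone() or block in get() before it can combine them. A minimal sketch with two trivial tasks:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CombineWithPlainFuture {
    static int sumOfTwoTasks() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> a = executor.submit(() -> 20);
            Future<Integer> b = executor.submit(() -> 22);
            // Option 1: poll isDone(); this busy-waits and burns CPU while waiting
            while (!a.isDone() || !b.isDone()) {
                Thread.yield();
            }
            // Option 2: get() blocks the caller until each result is ready
            return a.get() + b.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfTwoTasks()); // prints 42
    }
}
```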

2.2 CompletableFuture

JDK 1.8 introduced CompletableFuture, which is also an implementation of Future. It adds new features that can do work Future cannot.

```java
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    // ...
}
```

Looking at the class declaration, CompletableFuture also implements the CompletionStage interface, which is where all of the new features are defined.

CompletableFuture provides four static methods for running asynchronous tasks:

```java
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
```

??supply開頭的帶有返回值,run開頭的無返回值。如果我們指定線程池,則會(huì)使用我么指定的線程池;如果沒有指定線程池,默認(rèn)使用ForkJoinPool.commonPool()作為線程池。

```java
import java.util.concurrent.*;

public class FutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Core pool size 5, maximum pool size 10, idle threads kept alive for 60 s, bounded work queue of 10
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            return "hello";
        }, executor);
        System.out.println(future.get());
        executor.shutdown();
    }
}
```
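The difference between the supply and run variants shows up in the type parameter: runAsync yields a CompletableFuture<Void>, whose result is always null. A small sketch that uses a caller-supplied executor for one call and the default async executor for the other:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class SupplyVsRun {
    // supplyAsync with an explicit executor: the task produces a value
    static String supplied(ExecutorService executor) {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "hello", executor);
        return f.join();
    }

    // runAsync without an executor: runs on the default async executor
    // (normally ForkJoinPool.commonPool()) and produces no value
    static Void ran(AtomicBoolean flag) {
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> flag.set(true));
        return f.join(); // always null
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        System.out.println(supplied(executor)); // hello
        AtomicBoolean flag = new AtomicBoolean(false);
        System.out.println(ran(flag) + " " + flag.get()); // null true
        executor.shutdown();
    }
}
```

join() is used instead of get() here because it throws the unchecked CompletionException, so the methods need no throws clause.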

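The code above only runs an asynchronous task. To process its result further, that is, to transform it, use ① thenApply (synchronous) or ② thenApplyAsync (asynchronous). thenApply runs the function in the thread that completes the previous stage, or directly in the calling thread if that stage has already completed; thenApplyAsync always hands the function to an executor (the default pool or the one you pass).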

```java
// Synchronous transformation
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
// Asynchronous transformation, using the default thread pool
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// Asynchronous transformation, using the given thread pool
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
```

```java
package com.quinto.flink;

import java.util.concurrent.*;

public class FutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // Core pool size 5, maximum pool size 10, idle threads kept alive for 60 s, bounded work queue of 10
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CompletableFuture<Long> future = CompletableFuture
                // Run the asynchronous task
                .supplyAsync(() -> {
                    return System.currentTimeMillis();
                }, executor)
                // Process the previous result
                .thenApply(n -> {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Long time = System.currentTimeMillis();
                    System.out.println("If this step is synchronous, this message is printed first");
                    return time - n;
                });
        System.out.println("Waiting 2 seconds");
        System.out.println(future.get());
        executor.shutdown();
    }
}
```

Output:

```
If this step is synchronous, this message is printed first
Waiting 2 seconds
2017
```

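This order is not guaranteed: in this run the supplyAsync task had already finished when thenApply was called, so the function ran (and slept for 2 seconds) in the main thread before "Waiting 2 seconds" was printed. If thenApply is replaced with thenApplyAsync, the function runs on the default pool and the main thread continues immediately, giving: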

```
Waiting 2 seconds
If this step is synchronous, this message is printed first
2008
```
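Because the thread that runs a thenApply function depends on timing, the first output above is racy. The sketch below removes the race by starting from an already-completed future, and gives the executor's thread a recognizable name (the name "my-pool" is arbitrary) so the hand-off made by thenApplyAsync is visible:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ApplyThreads {
    // Executor whose single thread has a recognizable name
    static ExecutorService namedPool() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "my-pool"));
    }

    // The source stage is already complete, so thenApply runs in the calling thread
    static String syncThread() {
        return CompletableFuture.completedFuture("x")
                .thenApply(s -> Thread.currentThread().getName())
                .join();
    }

    // thenApplyAsync always submits the function to the given executor
    static String asyncThread(ExecutorService pool) {
        return CompletableFuture.completedFuture("x")
                .thenApplyAsync(s -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = namedPool();
        System.out.println(syncThread());      // main
        System.out.println(asyncThread(pool)); // my-pool
        pool.shutdown();
    }
}
```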

Once the task has run and its result has been processed, the result can be consumed with: ① thenAccept (receives and can use the result); ② thenRun (cannot see the result; it simply runs another action); ③ thenAcceptBoth (takes another stage and consumes that stage's result together with the current stage's result).

```java
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
```
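A minimal sketch of the three consumer methods. It uses already-completed futures (the price and quantity values are made up), so each callback runs immediately in the calling thread and the log order is predictable:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ConsumeDemo {
    static List<String> consume() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Integer> price = CompletableFuture.completedFuture(10);
        CompletableFuture<Integer> qty = CompletableFuture.completedFuture(3);

        // thenAccept: receives the result, returns nothing
        price.thenAccept(p -> log.add("price=" + p)).join();
        // thenRun: does not see the result, just runs afterwards
        price.thenRun(() -> log.add("price stage finished")).join();
        // thenAcceptBoth: consumes this stage's result together with another stage's result
        price.thenAcceptBoth(qty, (p, q) -> log.add("total=" + (p * q))).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(consume()); // [price=10, price stage finished, total=30]
    }
}
```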

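To combine two tasks, use: ① thenCombine (takes at least two arguments: another stage and a user-defined function whose return value becomes the result); ② thenCompose (takes at least one argument, a function that itself returns a stage; the nested stage is flattened into the result).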

```java
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
```
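A short sketch contrasting the two. findUserName and findScore are hypothetical async lookups invented for illustration: thenCombine merges two independent stages, while thenCompose chains a second call that needs the first call's result.

```java
import java.util.concurrent.CompletableFuture;

class CombineComposeDemo {
    // Hypothetical async lookups, used only for illustration
    static CompletableFuture<String> findUserName(int id) {
        return CompletableFuture.supplyAsync(() -> "user" + id);
    }

    static CompletableFuture<Integer> findScore(String name) {
        return CompletableFuture.supplyAsync(() -> name.length() * 10);
    }

    // thenCombine: two independent stages merged with a BiFunction
    static String combined() {
        CompletableFuture<String> name = findUserName(7);
        CompletableFuture<Integer> score = CompletableFuture.supplyAsync(() -> 95);
        return name.thenCombine(score, (n, s) -> n + ":" + s).join();
    }

    // thenCompose: the second call depends on the first result and returns a stage itself;
    // thenCompose flattens it (thenApply would give CompletableFuture<CompletableFuture<Integer>>)
    static int composed() {
        return findUserName(7).thenCompose(CombineComposeDemo::findScore).join();
    }

    public static void main(String[] args) {
        System.out.println(combined()); // user7:95
        System.out.println(composed()); // 50
    }
}
```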

When several channels can accomplish the same task and you want whichever is fastest, use: ① applyToEither (returns a value); ② acceptEither (returns no value).

```java
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)

public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
```
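To illustrate picking the fastest channel, the sketch below pits a future that is never completed (standing in for a slow channel) against one that already holds a value; the channel names and values are invented:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class EitherDemo {
    // applyToEither: transforms whichever result arrives first
    static String applyFirst() {
        CompletableFuture<String> slowChannel = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fastChannel = CompletableFuture.completedFuture("from-replica");
        return slowChannel.applyToEither(fastChannel, r -> "winner=" + r).join();
    }

    // acceptEither: consumes whichever result arrives first; the returned stage carries no value
    static String acceptFirst() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> slowChannel = new CompletableFuture<>();
        CompletableFuture<String> fastChannel = CompletableFuture.completedFuture("from-cache");
        slowChannel.acceptEither(fastChannel, seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(applyFirst());  // winner=from-replica
        System.out.println(acceptFirst()); // from-cache
    }
}
```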

Future vs. CompletableFuture:

Future: the result can only be obtained through get() or by looping on isDone(), and exceptions are awkward to handle.

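CompletableFuture: you only need to register callback functions: ① as soon as the task completes, the registered function runs, without you having to track when that happens; ② if an exception occurs, an exception-handling function runs; ③ complex workflows, such as dependent or combined tasks, can likewise be expressed as handler functions.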
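The exception-handling callbacks are not shown above; two that CompletableFuture provides are exceptionally (replace a failure with a fallback value) and handle (always called, with either the result or the exception). A minimal sketch with a simulated "db down" failure:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionDemo {
    // exceptionally: replaces a failure with a fallback value
    static String withFallback() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("db down");
                })
                .exceptionally(ex -> "fallback")
                .join();
    }

    // handle: always runs, receiving either the result or the exception (the other is null)
    static String describe(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("db down");
                    }
                    return "row";
                })
                .handle((result, ex) -> {
                    if (ex == null) {
                        return "ok:" + result;
                    }
                    // Failures from supplyAsync arrive wrapped in a CompletionException
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    return "error:" + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(withFallback());  // fallback
        System.out.println(describe(false)); // ok:row
        System.out.println(describe(true));  // error:db down
    }
}
```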

3 Prerequisites for Using Async I/O

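(1) A client API that can access the external system with asynchronous I/O, for example Vert.x. At the time of writing this only supported Scala 2.12; Java client libraries can be used instead.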

??(2)沒有這樣的客戶端,可以通過創(chuàng)建多個(gè)客戶端并使用線程池處理同步調(diào)用來嘗試將同步客戶端轉(zhuǎn)變?yōu)橛邢薜牟l(fā)客戶端,如可以寫ExecutorService來實(shí)現(xiàn)。但是這種方法通常比適當(dāng)?shù)漠惒娇蛻舳诵实汀?/p>

4 Async I/O Examples

4.1 When the external system has a client API with asynchronous I/O

// 這個(gè)例子實(shí)現(xiàn)了異步請(qǐng)求和回調(diào)的Futures,具有Java8的Futures接口(與Flink的Futures相同)class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {// 定義連接客戶端,并且不參與序列化private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {// 創(chuàng)建連接client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 用連接進(jìn)行查詢,查詢之后返回的是future,有可能有,有可能沒有final Future<String> result = client.query(key);// 如果有結(jié)果返回的話會(huì)通知你(有個(gè)回調(diào)方法),這里可以設(shè)置超時(shí)時(shí)間,如果超過了一定的時(shí)間還沒有返回相當(dāng)于從這里取一取就會(huì)拋異常,結(jié)果就會(huì)返回nullCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// Normally handled explicitly.return null;}}//如果它已經(jīng)執(zhí)行完了,就會(huì)把結(jié)果放到Collections里面}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});} }// create the original stream DataStream<String> stream = ...;// unorderedWait這個(gè)是不在乎請(qǐng)求返回的順序的,里面用到的是阻塞隊(duì)列,隊(duì)列滿了會(huì)阻塞,隊(duì)列里面一次最多可以有100個(gè)異步請(qǐng)求,超時(shí)時(shí)間是1000毫秒 DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

4.2 沒有外部系統(tǒng)進(jìn)行異步IO訪問的客戶端API的方式

```java
package com.quinto.flink;

import com.alibaba.druid.pool.DruidDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncDatabaseRequest extends RichAsyncFunction<String, String> {
    // A connection pool is used here. Previously each query blocked, so the next query could
    // reuse the same connection. Now several requests are in flight at once and each returns
    // its own result, so they cannot share one connection: we need a connection pool as well
    // as a thread pool.
    private transient DruidDataSource druidDataSource;
    private transient ExecutorService executorService;

    @Override
    public void open(Configuration parameters) throws Exception {
        executorService = Executors.newFixedThreadPool(20);
        druidDataSource = new DruidDataSource();
        druidDataSource.setDriverClassName("com.mysql.jdbc.Driver");
        druidDataSource.setUsername("root");
        druidDataSource.setPassword("root");
        druidDataSource.setUrl("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8");
        druidDataSource.setInitialSize(5);
        druidDataSource.setMinIdle(10);
        druidDataSource.setMaxActive(20);
    }

    @Override
    public void close() throws Exception {
        druidDataSource.close();
        executorService.shutdown();
    }

    @Override
    public void asyncInvoke(String input, final ResultFuture<String> resultFuture) {
        // Hand the blocking query to the thread pool
        Future<String> future = executorService.submit(() -> {
            // "table" is a placeholder table name (TABLE is a reserved word in MySQL)
            String sql = "SELECT id, name FROM table WHERE id = ?";
            String result = null;
            // try-with-resources closes the connection, statement and result set
            try (Connection connection = druidDataSource.getConnection();
                 PreparedStatement stmt = connection.prepareStatement(sql)) {
                stmt.setString(1, input); // bind the lookup key to the ? placeholder
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        result = rs.getString("name");
                    }
                }
            }
            return result;
        });
        // Receive the task's result and consume it; nothing is returned
        CompletableFuture
                .supplyAsync(() -> {
                    try {
                        // Take the result out of the future; if that throws, return null
                        return future.get();
                    } catch (Exception e) {
                        return null;
                    }
                })
                // Take the result of the previous step and process it
                .thenAccept((String result) -> {
                    // complete() expects a collection, so wrap the single value
                    resultFuture.complete(Collections.singleton(result));
                });
    }
}
```

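This way the MySQL API itself stays unchanged; the query is simply handed off to a thread pool. Previously each query took a long time to return and blocked the caller; now each incoming query is dropped into the thread pool without waiting, and its result lands in a Future. In effect, instead of a single thread running queries one after another, several threads query at the same time, and each result is picked up from its Future once it is available.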
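The code above uses two threads per lookup: one from executorService runs the JDBC query, and a second from ForkJoinPool.commonPool() blocks in future.get() only to forward the result. A leaner variant, sketched here as an alternative rather than the author's code, passes the blocking call straight to CompletableFuture.supplyAsync(supplier, executor) and attaches the callback to that stage. blockingQuery, ResultCallback and RecordingCallback are hypothetical stand-ins (ResultCallback mimics Flink's ResultFuture), so the sketch runs with only the JDK.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for Flink's ResultFuture<OUT>
interface ResultCallback<T> {
    void complete(Collection<T> result);
    void completeExceptionally(Throwable error);
}

class PooledBlockingLookup implements AutoCloseable {
    // Bounded pool: at most 20 blocking queries run at the same time
    private final ExecutorService executor = Executors.newFixedThreadPool(20);

    // Hypothetical blocking call standing in for the JDBC query
    static String blockingQuery(String id) {
        try {
            Thread.sleep(50); // simulate network and database latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (id.isEmpty()) {
            throw new IllegalArgumentException("empty id");
        }
        return "name-of-" + id;
    }

    void asyncInvoke(String id, ResultCallback<String> callback) {
        // One pool thread per in-flight query; no second thread waits on get()
        CompletableFuture
                .supplyAsync(() -> blockingQuery(id), executor)
                .whenComplete((name, error) -> {
                    if (error != null) {
                        callback.completeExceptionally(error);
                    } else {
                        callback.complete(Collections.singleton(name));
                    }
                });
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

// Records the outcome so the pattern can be tried without Flink
class RecordingCallback implements ResultCallback<String> {
    final CompletableFuture<Collection<String>> outcome = new CompletableFuture<>();

    public void complete(Collection<String> result) {
        outcome.complete(result);
    }

    public void completeExceptionally(Throwable error) {
        outcome.completeExceptionally(error);
    }
}
```

In a real RichAsyncFunction the executor would be created in open() and shut down in close(), as the code above does.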
