从JDK9的Flow接口说起
一、JDK9響應(yīng)式編程
Java是一個(gè)“古老”并且廣泛應(yīng)用的編程語(yǔ)言,但Java9中引入了一些新鮮有趣的特性。這篇文章主要介紹FlowAPI這個(gè)新特性,通過(guò)FlowAPI我們僅僅使用JDK就能夠搭建響應(yīng)式應(yīng)用程序,而不需要其他額外的類(lèi)庫(kù),如RxJava或Project Reactor。
盡管如此,當(dāng)你看到過(guò)接口文檔后你就會(huì)明白到正如字面所說(shuō),這只是一個(gè)API而已。她僅僅包含了一些Interface和一個(gè)實(shí)現(xiàn)類(lèi):
1.Interface Flow.Publisher<T>定義了生產(chǎn)數(shù)據(jù)和控制事件的方法。 2.Interface Flow.Subscriber<T>定義了消費(fèi)數(shù)據(jù)和事件的方法。 3.Interface Flow.Subscription 定義了鏈接Publisher和Subscriber的方法。 4.Interface Flow.Processor<T,R>定義了轉(zhuǎn)換Publisher到Subscriber的方法 5.最后,class SubmissionPublisher<T>是Flow.Publisher<T>的實(shí)現(xiàn),她可以靈活的生產(chǎn)數(shù)據(jù),同時(shí)與Reactive Stream兼容。雖然Java9中沒(méi)有很多FlowAPI的實(shí)現(xiàn)類(lèi)可供我們使用,但是依靠這些接口第三方可以提供的響應(yīng)式編程得到了規(guī)范和統(tǒng)一,比如從JDBC driver到RabbitMQ的響應(yīng)式實(shí)現(xiàn)。
其中Publisher為數(shù)據(jù)發(fā)布者,Subscriber為數(shù)據(jù)訂閱者,Subscription為發(fā)布者和訂閱者之間的訂閱關(guān)系,Processor為數(shù)據(jù)處理器。
- 關(guān)系圖:
二、Pull,Push,Pull-Push
我對(duì)響應(yīng)式編程的理解是, 這是一種數(shù)據(jù)消費(fèi)者控制數(shù)據(jù)流的編程方式。需要指出是,當(dāng)消費(fèi)速度低于生產(chǎn)速度時(shí),消費(fèi)者要求生產(chǎn)者降低速度以完全消費(fèi)數(shù)據(jù)(這個(gè)現(xiàn)象稱(chēng)作back-pressure(背壓))。這種處理方式不是在制造混亂,你可能已經(jīng)使用過(guò)這種模式,只是最近因?yàn)樵谥饕蚣芎推脚_(tái)上使用才變得更流行,比如Java9,Spring5。另外在分布式系統(tǒng)中處理大規(guī)模數(shù)據(jù)傳輸時(shí)也使用到了這種模式。
回顧過(guò)去可以幫我們更好的理解這種模式。
- pull模式
幾年前,最常見(jiàn)的消費(fèi)數(shù)據(jù)模式是pull-based。client端不斷輪詢(xún)服務(wù)端以獲取數(shù)據(jù)。這種模式的優(yōu)點(diǎn)是當(dāng)client端資源有限時(shí)可以更好的控制數(shù)據(jù)流(停止輪詢(xún)),而缺點(diǎn)是當(dāng)服務(wù)端沒(méi)有數(shù)據(jù)時(shí)輪詢(xún)是對(duì)計(jì)算資源和網(wǎng)絡(luò)資源的浪費(fèi)。
- push模式
隨著時(shí)間推移,處理數(shù)據(jù)的模式轉(zhuǎn)變?yōu)閜ush-based,生產(chǎn)者不關(guān)心消費(fèi)者的消費(fèi)能力,直接推送數(shù)據(jù)。這種模式的缺點(diǎn)是當(dāng)消費(fèi)資源低于生產(chǎn)資源時(shí)會(huì)造成緩沖區(qū)溢出從而數(shù)據(jù)丟失,當(dāng)丟失率維持在較小的數(shù)值時(shí)還可以接受,但是當(dāng)這個(gè)比率變大時(shí)我們會(huì)希望生產(chǎn)者降速以避免大規(guī)模數(shù)據(jù)丟失。
- pull-push模式
響應(yīng)式編程是一種pull-push混合模式以綜合他們的優(yōu)點(diǎn),這種模式下消費(fèi)者負(fù)責(zé)請(qǐng)求數(shù)據(jù)以控制生產(chǎn)者數(shù)據(jù)流,同時(shí)當(dāng)處理資源不足時(shí)也可以選擇阻斷或者丟棄數(shù)據(jù),接下來(lái)我們會(huì)看到一個(gè)典型案例。
三、Flow與Stream
響應(yīng)式編程并不是為了替換傳統(tǒng)編程,其實(shí)兩者相互兼容而且可以互相協(xié)作完成任務(wù)。Java8中引入的StreamAPI通過(guò)map,reduce以及其他操作可以完美的處理數(shù)據(jù)集,而FlowAPI則專(zhuān)注于處理數(shù)據(jù)的流通,比如對(duì)數(shù)據(jù)的請(qǐng)求,減速,丟棄,阻塞等。同時(shí)你可以使用Streams作為數(shù)據(jù)源(publisher),當(dāng)必要時(shí)阻塞丟棄其中的數(shù)據(jù)。你也可以在Subscriber中使用Streams以進(jìn)行數(shù)據(jù)的歸并操作。更值得一提的是:reactive streams不僅兼容傳統(tǒng)編程方式,而且還支持函數(shù)式編程以極大的提高可讀性和可維護(hù)性。有一點(diǎn)可能會(huì)使我們感到困惑:如果你需要在兩個(gè)系統(tǒng)間傳輸數(shù)據(jù),同時(shí)進(jìn)行轉(zhuǎn)形操作,如何使用Flows和Streams來(lái)完成?這種情況下,我們使用Java8的Function來(lái)做數(shù)據(jù)轉(zhuǎn)換,但是如何在Publisher和Subscriber之間使用StreamAPI呢?答案是我們可以在Publisher和Subscriber之間再加一個(gè)subscriber,她可以從最初的publisher獲取數(shù)據(jù),轉(zhuǎn)換,然后再作為一個(gè)新的publisher,而使最初的subscriber訂閱這個(gè)新的publisher,也是Java9中的接口Flow.Processor<T,R>,我們只需要實(shí)現(xiàn)這個(gè)接口并編寫(xiě)轉(zhuǎn)換數(shù)據(jù)的functions。從技術(shù)上講,我們完全可以使用Flows來(lái)替換Streams,但任何時(shí)候都這么做就顯得過(guò)于偏激。比如,我們創(chuàng)建一個(gè)Publisher來(lái)作為int數(shù)組的數(shù)據(jù)源,然后在Processor中轉(zhuǎn)換Integer為String,最后創(chuàng)建一個(gè)Subscriber來(lái)歸并到一個(gè)String中。這個(gè)時(shí)候就完全沒(méi)有必要使用Flows,因?yàn)檫@不是在控制兩個(gè)模塊或兩個(gè)線(xiàn)程間的數(shù)據(jù)通信,這個(gè)時(shí)候使用Streams更為合理。
四、例子
Publisher部分的源碼如下所示:
它是一個(gè)函數(shù)式接口,只包含一個(gè)subscribe方法,通過(guò)這個(gè)方法將數(shù)據(jù)發(fā)布出去。
Subscriber部分的源碼如下所示:
該接口包含了四個(gè)方法:
Subscription部分的源碼如下所示:
Processor部分的代碼如下所示:
它是一個(gè)空接口,但是它繼承了Publisher和Subscriber,所以它既能發(fā)布數(shù)據(jù)也能訂閱數(shù)據(jù)。基于這個(gè)特性,它可以充當(dāng)數(shù)據(jù)轉(zhuǎn)換的角色,先從數(shù)據(jù)發(fā)布者那接收數(shù)據(jù)項(xiàng),然后經(jīng)過(guò)處理后再發(fā)布給最終的數(shù)據(jù)訂閱者。
接下來(lái)我們舉個(gè)數(shù)據(jù)發(fā)布和數(shù)據(jù)訂閱的簡(jiǎn)單示例,以此了解Java 9 Flow API的使用。先入為主,直接貼出整個(gè)示例代碼:
public class FlowApiTest {public static void main(String[] args) throws InterruptedException {// 1. 定義 String 類(lèi)型的數(shù)據(jù)發(fā)布者,JDK 9自帶的// SubmissionPublisher 實(shí)現(xiàn)了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 創(chuàng)建一個(gè)訂閱者,用于接收發(fā)布者的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通過(guò) Subscription 和發(fā)布者保持訂閱關(guān)系,并用它來(lái)給發(fā)布者反饋this.subscription = subscription;// 請(qǐng)求一個(gè)數(shù)據(jù)this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收發(fā)布者發(fā)布的消息System.out.println("【訂閱者】接收消息 <------ " + item);// 接收后再次請(qǐng)求一個(gè)數(shù)據(jù)this.subscription.request(1);// 如果不想再接收數(shù)據(jù),也可以直接調(diào)用 cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 過(guò)程中出現(xiàn)異常會(huì)回調(diào)這個(gè)方法System.out.println("【訂閱者】數(shù)據(jù)接收出現(xiàn)異常," + throwable);// 出現(xiàn)異常,取消訂閱,告訴發(fā)布者我不再接收數(shù)據(jù)了// 實(shí)際測(cè)試發(fā)現(xiàn),只要訂閱者接收消息出現(xiàn)異常,進(jìn)入了這個(gè)回調(diào)// 訂閱者就不會(huì)再繼續(xù)接收消息了this.subscription.cancel();}@Overridepublic void onComplete() {// 當(dāng)發(fā)布者發(fā)出的數(shù)據(jù)都被接收了,// 并且發(fā)布者關(guān)閉后,會(huì)回調(diào)這個(gè)方法System.out.println("【訂閱者】數(shù)據(jù)接收完畢");}};// 3. 發(fā)布者和訂閱者需要建立關(guān)系publisher.subscribe(subscriber);// 4. 發(fā)布者開(kāi)始發(fā)布數(shù)據(jù)for (int i = 0; i < 10; i++) {String message = "hello flow api " + i;System.out.println("【發(fā)布者】發(fā)布消息 ------> " + message);publisher.submit(message);}// 5. 發(fā)布結(jié)束后,關(guān)閉發(fā)布者publisher.close();// main線(xiàn)程延遲關(guān)閉,不然訂閱者還沒(méi)接收完消息,線(xiàn)程就被關(guān)閉了Thread.currentThread().join(2000);} }上面使用JDK 自帶的Publisher實(shí)現(xiàn)類(lèi)SubmissionPublisher來(lái)發(fā)布 String類(lèi)型的數(shù)據(jù),然后用匿名實(shí)現(xiàn)類(lèi)的方式創(chuàng)建了一個(gè)Subscriber實(shí)現(xiàn)類(lèi)。接著使用SubmissionPublisher的subscribe方法來(lái)為發(fā)布者和訂閱者建立關(guān)系。建立關(guān)系后,發(fā)布者就可以發(fā)布數(shù)據(jù),接收者也開(kāi)始接收數(shù)據(jù)。詳細(xì)的說(shuō)明注釋里都寫(xiě)了,這里就不再贅述代碼的邏輯了。
所謂的背壓(Backpressure)通俗的講就是數(shù)據(jù)接收者的壓力,傳統(tǒng)模式下,發(fā)布者只關(guān)心數(shù)據(jù)的創(chuàng)造與發(fā)布,而當(dāng)數(shù)據(jù)發(fā)布速率遠(yuǎn)高于數(shù)據(jù)接收速率的時(shí)候,數(shù)據(jù)接收者緩沖區(qū)將被填滿(mǎn),無(wú)法再接收數(shù)據(jù)。發(fā)布者并不關(guān)心這些,依舊不斷地發(fā)送數(shù)據(jù),所以就造成了IO阻塞。基于響應(yīng)式模型實(shí)現(xiàn)的Flow API可以很好地解決這個(gè)問(wèn)題。在Java 9的Flow API定義中,Subscriber會(huì)將Publisher發(fā)布的數(shù)據(jù)緩沖在Subscription中,其長(zhǎng)度默認(rèn)為256:
假如當(dāng)這個(gè)緩沖區(qū)都被填滿(mǎn)后,Publisher將會(huì)停止發(fā)送數(shù)據(jù),直到Subscriber接收了數(shù)據(jù)Subscription有空閑位置的時(shí)候,Publisher才會(huì)繼續(xù)發(fā)布數(shù)據(jù),而非一味地發(fā)個(gè)不停。下面用代碼來(lái)演示這個(gè)情況:
上面代碼中,我們?cè)赟ubscriber的onNext方法中用下面的代碼模擬延遲,讓數(shù)據(jù)處理過(guò)程維持在2秒左右:
try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }然后數(shù)據(jù)發(fā)布量調(diào)整到了500,當(dāng)程序啟動(dòng)的時(shí)候,由于數(shù)據(jù)發(fā)布的速度非常快(普通for循環(huán)),所以數(shù)據(jù)訂閱者的數(shù)據(jù)緩沖區(qū)瞬間被填滿(mǎn),于是你會(huì)看到下面這個(gè)情況,只有當(dāng)數(shù)據(jù)訂閱者處理了一個(gè)數(shù)據(jù)的時(shí)候,數(shù)據(jù)發(fā)布者才會(huì)相應(yīng)地再次發(fā)布一個(gè)新數(shù)據(jù):
Processor的使用也很簡(jiǎn)單,其實(shí)它就是Publisher和Subscriber的結(jié)合體,充當(dāng)數(shù)據(jù)處理的角色,通常的做法是用它來(lái)接收發(fā)布者發(fā)布的消息,然后進(jìn)行相應(yīng)的處理,再將數(shù)據(jù)發(fā)布出去,供消息訂閱者接收。下面是一個(gè)Processor用法的簡(jiǎn)單示例:
public class ProcessorTest {static class MyProcessor extends SubmissionPublisher<String> implements Processor<String, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通過(guò) Subscription 和發(fā)布者保持訂閱關(guān)系,并用它來(lái)給發(fā)布者反饋this.subscription = subscription;// 請(qǐng)求一個(gè)數(shù)據(jù)this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收發(fā)布者發(fā)布的消息System.out.println("【處理器】接收消息 <------ " + item);// 處理器將消息進(jìn)行轉(zhuǎn)換String newItem = "【處理器加工后的數(shù)據(jù): " + item + "】";this.submit(newItem);// 接收后再次請(qǐng)求一個(gè)數(shù)據(jù),表示我已經(jīng)處理完了,你可以再發(fā)數(shù)據(jù)過(guò)來(lái)了this.subscription.request(1);// 如果不想再接收數(shù)據(jù),也可以直接調(diào)用cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 過(guò)程中出現(xiàn)異常會(huì)回調(diào)這個(gè)方法System.out.println("【處理器】數(shù)據(jù)接收出現(xiàn)異常," + throwable);// 出現(xiàn)異常,取消訂閱,告訴發(fā)布者我不再接收數(shù)據(jù)了this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("【處理器】數(shù)據(jù)處理完畢");// 處理器處理完數(shù)據(jù)后關(guān)閉this.close();}}public static void main(String[] args) throws InterruptedException {// 1. 定義String類(lèi)型的數(shù)據(jù)發(fā)布者,JDK 9自帶的// SubmissionPublisher實(shí)現(xiàn)了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 創(chuàng)建處理器,用于接收發(fā)布者發(fā)布的消息,// 轉(zhuǎn)換后再發(fā)送給訂閱者MyProcessor processor = new MyProcessor();// 3. 發(fā)布者和處理器建立訂閱的關(guān)系publisher.subscribe(processor);// 4.創(chuàng)建一個(gè)訂閱者,用于接收處理器的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("【訂閱者】接收消息 <------ " + item + "");this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("【訂閱者】數(shù)據(jù)接收出現(xiàn)異常," + throwable);this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("【訂閱者】數(shù)據(jù)接收完畢");}};// 5. 處理器和訂閱者建立訂閱關(guān)系processor.subscribe(subscriber);// 6. 發(fā)布者開(kāi)始發(fā)布數(shù)據(jù)for (int i = 0; i < 10; i++) {String message = "hello flow api " + i;System.out.println("【發(fā)布者】發(fā)布消息 ------> " + message);publisher.submit(message);}// 7. 發(fā)布結(jié)束后,關(guān)閉發(fā)布者publisher.close();// main線(xiàn)程延遲關(guān)閉,不然訂閱者還沒(méi)接收完消息,線(xiàn)程就被關(guān)閉了Thread.currentThread().join(2000);} }
參考文章
參考文章
總結(jié)
以上是生活随笔為你收集整理的从JDK9的Flow接口说起的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 一道京东的面试题
- 下一篇: 互联网日报 | 新东方二次上市通过港交所