日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

reactor官方文档译文(2)Reactor-core模块

發(fā)布時(shí)間:2025/4/5 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 reactor官方文档译文(2)Reactor-core模块 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
You should never do your asynchronous work alone. — Jon Brisbin 完成Reactor 1后寫到 You should never do your asynchronous work alone. — Stephane Maldini 完成Reactor 2后寫到

名稱解釋:back pressure:背壓。在交換機(jī)在阻止外來數(shù)據(jù)包發(fā)送到堵塞端口的時(shí)候可能會(huì)發(fā)生丟包。而背壓就是考驗(yàn)交換機(jī)在這個(gè)時(shí)候避免丟包的能力。很多的交換機(jī)當(dāng)發(fā)送或接收緩沖區(qū)溢出的時(shí)候通過將阻塞信號發(fā)送回源地址來實(shí)現(xiàn)背壓。交換機(jī)在全雙工時(shí)使用IEEE802.3x流控制達(dá)到同樣目的。

首先,我們使用groovy示例來展示core模塊的功能:

//Initialize context and get default dispatcher Environment.initialize()//RingBufferDispatcher with 8192 slots by default def dispatcher = Environment.sharedDispatcher()//Create a callback Consumer<Integer> c = { data ->println "some data arrived: $data" }//Create an error callback Consumer<Throwable> errorHandler = { it.printStackTrace }//Dispatch data asynchronously r.dispatch(1234, c, errorHandler)Environment.terminate()

下面,我們使用Stream reactive實(shí)現(xiàn)來看:

//standalone async processor def processor = RingBufferProcessor.<Integer>create()//send data, will be kept safe until a subscriber attaches to the processor processor.onNext(1234) processor.onNext(5678)//consume integer data processor.subscribe(new Subscriber<Integer>(){void onSubscribe(Subscription s){//unbounded subscriber s.request Long.MAX}void onNext(Integer data){println data}void onError(Throwable err){err.printStackTrace()}void onComplete(){println 'done!'} }//Shutdown internal thread and call complete processor.onComplete()

Core模塊概覽

Reactor core模塊的子單元:

Common IO和功能類型,一些是直接從java8 功能接口回遷的。Function、Supplier、consumer、Predicate、BiConsumer、BiFunctionTuplesResource、Pausable、TimerBuffer,Codec和一組預(yù)定義的Codec。Environment 上下文Dispatcher 協(xié)議和一組預(yù)定義的Dispatcher。預(yù)定義的Reactive Stream Processor

reactor-core可以用來逐漸替代另外的消息傳遞策略、調(diào)度時(shí)間任務(wù)或者以小的功能塊組織代碼。這種突破使開發(fā)者與其它Reactive基礎(chǔ)庫更好的合作,特別是對于沒有耐心的開發(fā)者,沒有了對RingBuffer的理解負(fù)擔(dān)。

注意:Reactor-core隱藏了LMAX disruptor,因此不會(huì)出現(xiàn)也不會(huì)和現(xiàn)有的Disruptor依賴沖突。

功能模塊

?功能模塊重用是核心,通常情況下在你使用Reactor時(shí)就需要的功能。因此,功能編程酷在哪里?其中一個(gè)核心理念是將可執(zhí)行代碼看做別的數(shù)據(jù)。另一點(diǎn),類似于Closure或者匿名函數(shù),此時(shí)業(yè)務(wù)邏輯由最初的調(diào)用者決定。它同樣避免了過量的If/SWITCH模塊,并且這種分離是概念更清晰:每個(gè)模塊完成一個(gè)功能且不需要共享任何東西。

?組織功能模塊

  每個(gè)功能組件都給出它的一般任務(wù)的明確意圖:

Consumer:簡單回調(diào)--一勞永逸的

BiCounsumer:兩個(gè)參數(shù)的簡單回調(diào),通常用在序列比較,例如:前一個(gè)和下一個(gè)參數(shù)。

Function:轉(zhuǎn)換邏輯--請求/應(yīng)答

BiFunction:兩個(gè)參數(shù)的轉(zhuǎn)換,通常用在累加器,比較前一個(gè)和下一個(gè)參數(shù),返回一個(gè)新的值。

Supplier:工廠邏輯--輪詢

Predicate:測試路徑--過濾

注意:我們也將Publisher和Subscriber視作功能塊,敢于稱之為Reactive功能塊。盡管如此,它們作為基礎(chǔ)組件,廣泛應(yīng)用到Reactor及其其它地方。Stream API接收reactor.fn參數(shù),為你創(chuàng)建合適的Subscriber。

好消息是在功能模塊中包裝可執(zhí)行指令可以向磚塊一樣進(jìn)行復(fù)用。

Consumer<String> consumer = new Consumer<String>(){@Overridevoid accept(String value){System.out.println(value);} };//Now in Java 8 style for brievety Function<Integer, String> transformation = integer -> ""+integer;Supplier<Integer> supplier = () -> 123;BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> {for(int i = 0; i < 10; i++){//lazy evaluate the final logic to run callback.accept(value);} };//note how the execution flows from supplier to biconsumer biConsumer.accept(consumer,transformation.apply(supplier.get()) );

?最初聽起來,這可能不是一個(gè)引人注目的革命性變革。但是這種基本思維模式的改變,將揭示我們使異步代碼變的穩(wěn)健和可組合性的使命是多么可貴。Dispatcher分發(fā)器將輸入數(shù)據(jù)和錯(cuò)誤回調(diào)分發(fā)給consumer來處理。Reactor Stream模塊將更好的使用這些組件。

  當(dāng)使用Ioc容器如spring時(shí),一個(gè)好的開發(fā)者將利用Java的配置屬性來返回一個(gè)無狀態(tài)的功能bean。然后可以優(yōu)美的注入到stream Pipeline或者分發(fā)他們的執(zhí)行代碼中的block中。

  元組

  你可以注意到這些接口,它們對輸入?yún)?shù)和比較少的固定數(shù)量的參數(shù)的泛型有很好的支持。你怎么傳遞超過1個(gè)或者超過2個(gè)的參數(shù)呢?答案是使用元組Tuple,Tuple類似于csv中一個(gè)單獨(dú)實(shí)例的一樣,可以在在功能性編程中保證它們的類型安全和支持多個(gè)數(shù)量的參數(shù)。

  以前面的例子為例,我們嘗試提供兩個(gè)參數(shù)的BiConsumer而使用單個(gè)參數(shù)的Consumer

Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> {for(int i = 0; i < 10; i++){//Correct typing, compiler happy tuple.getT1().accept(tuple.getT2());} };biConsumer.accept(Tuple.of(consumer,transformation.apply(supplier.get())) );

注意:Tuple需要分配更多的空間,因此在比較或者鍵值信號等一般使用場景中更多直接使用Bi***組件。

Environment和Dispatcher

  功能性構(gòu)建塊已經(jīng)準(zhǔn)備就緒,讓我們使用它們來進(jìn)行異步編程。第一步是到Dispatcher分區(qū)。

  在我們啟動(dòng)任意Dispatcher前,需要保證可以有效的創(chuàng)建它們。通常,創(chuàng)建它們的代價(jià)比較高,原因是需要預(yù)分配一個(gè)內(nèi)存分區(qū)來保持分配的信號,這就是前言中介紹的著名的運(yùn)行時(shí)分配和啟動(dòng)時(shí)預(yù)分配的不同對比。因此提出了一個(gè)名為"Environment"共享上下文概念,使用它來管理這些不同類型的Dispatcher,從而避免不必要的創(chuàng)建開銷。

  Environment

  reactor的使用者(或者可用的擴(kuò)展庫如@Spring)創(chuàng)建或者停止Environment。它們自動(dòng)從META_INF/reactor/reactor-environment.properties處讀取配置文件。

  注意,屬性文件可以改變,通過在classpath下的META-INFO/reactor目錄下一個(gè)新的屬性配置可以改變屬性文件。

? ? ? ?通過傳遞下面的環(huán)境變量reactor.profiles.active來在運(yùn)行時(shí)段改變默認(rèn)的配置文件。

java - jar reactor-app.jar -Dreactor.profiles.active=turbo

  啟動(dòng)和停止Environment

Environment env = Environment.initialize();//Current registered environment is the same than the one initialized Assert.isTrue(Environment.get() == env);//Find a dispatcher named "shared" Dispatcher d = Environment.dispatcher("shared");//get the Timer bound to this environment Timer timer = Environment.timer();//Shutdown registered Dispatchers and Timers that might run non-daemon threads Environment.terminate(); //An option could be to register a shutdownHook to automatically invoke terminate.

注意:在一個(gè)給定的Jvm應(yīng)用中,最好只維護(hù)一個(gè)Enviroment.在大多數(shù)情況下,使用Environment.initializeIfEmpty()就完全ok。

Dispacher分發(fā)器

  從Reactor 1開始,Dispatcher就存在了。Dispatcher通常抽象消息傳遞的方法,和Java Executor有類似的通用約定。事實(shí)上Dispatcher繼承自Executor。

  Dispatcher對有數(shù)據(jù)信號的傳送方式及消費(fèi)者同步或異步執(zhí)行的錯(cuò)誤信息有一套比較嚴(yán)格的類型限制約定。這種方式在面對經(jīng)典的Executors時(shí)解決了第一個(gè)問題--錯(cuò)誤隔離。效果如下:

錯(cuò)誤消費(fèi)者的調(diào)用不需要終端當(dāng)前分配的資源。如果沒有指定,它默認(rèn)從當(dāng)前存在的Environment中去尋找,并使用指定給它的errorJournalConsumer。

  異步Dispatche提供的第二個(gè)獨(dú)特的特征是運(yùn)行使用尾部遞歸策略來再次調(diào)度。尾部遞歸的應(yīng)用場景是分發(fā)器發(fā)現(xiàn)Dispatcher的classLoader已經(jīng)分配到正在運(yùn)行的線程,這時(shí),當(dāng)當(dāng)前消費(fèi)者返回時(shí)將要執(zhí)行的task放入到隊(duì)列中。

  使用一個(gè)類似于?Groovy Spock test的異步的多線程分發(fā)器:

import reactor.core.dispatch.*//... given:def sameThread = new SynchronousDispatcher()def diffThread = new ThreadPoolExecutorDispatcher(1, 128)def currentThread = Thread.currentThread()Thread taskThread = nulldef consumer = { ev ->taskThread = Thread.currentThread()}def errorConsumer = { error ->error.printStackTrace()}when: "a task is submitted"sameThread.dispatch('test', consumer, errorConsumer)then: "the task thread should be the current thread"currentThread == taskThreadwhen: "a task is submitted to the thread pool dispatcher"def latch = new CountDownLatch(1)diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer)latch.await(5, TimeUnit.SECONDS) // Wait for task to execute then: "the task thread should be different when the current thread"taskThread != currentThread

注意:

  如Java Executor一樣,它們?nèi)鄙倭宋覀儗⒓尤氲絉eactor 2.x的一個(gè)特點(diǎn):Reactive stream協(xié)議。這時(shí)在Reactor中僅有幾個(gè)未完成事項(xiàng)中的一個(gè)未完成事項(xiàng)--沒有將Reactive stream標(biāo)準(zhǔn)直接綁定到Reactor中。然后,你可以在Stream章節(jié)部分找到快速結(jié)合Reactor stream的方法。

表3 Dispatcher家族介紹

DispatcherFrom EnvironmentDescriptionStrengthsWeaknesses

RingBuffer

sharedDispatcher()

An LMAX DisruptorRingBuffer based Dispatcher.

Small latency peaks tolerated

Fastest Async Dispatcher, 10-15M+ dispatch/sec on commodity hardware

Support ordering

'Spin' Loop when getting the next slot on full capcity

Single Threaded, no concurrent dispatch

Mpsc

sharedDispatcher() if Unsafe not available

Alternative optimized message-passing structure.

Latency peaks tolerated

5-10M+ dispatch/sec on commodity hardware

Support ordering

Unbounded and possibly using as much available heap memory as possible

Single Threaded, no concurrent dispatch

WorkQueue

workDispatcher()

An LMAX DisruptorRingBuffer based Dispatcher.

Latency Peak tolerated for a limited time

Fastest Multi-Threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware

'Spin' Loop when getting the next slot on full capcity

Concurrent dispatch

Doesn’t support ordering

Synchronous

dispatcher("sync") or SynchronousDispatcher. INSTANCE

Runs on the current thread.

Upstream and Consumer executions are colocated

Useful for Test support

Support ordering if the reentrant dispatch is on the current thread

No Tail Recursion support

Blocking

TailRecurse

tailRecurse() or TailRecurse Dispatcher. INSTANCE

Synchronous Reentrant Dispatcher that enqueue dispatches when currently dispatching.

Upstream and Consumer executions are colocated

Reduce execution stack, greatly expanded by functional call chains

Unbounded Tail Recurse depth

Blocking

Support ordering (Thread Stealing)

ThreadPoolExecutor

newDispatcher(int, int, DispatcherType. THREAD_POOL_EXECUTOR)

Use underlying ThreadPoolExecutor message-passing

Multi-Threaded

Blocking Consumers, permanent latency tolerated

1-5M+ dispatch/sec on commodity hardware

Concurrent run on a given consumer executed twice or more

Unbounded by default

Doesn’t support ordering

Traceable Delegating

N/A

Decorate an existing dispatcher with TRACE level logs.

Dispatch tapping

Runs slower than the delegated dispatcher alone

Log overhead (runtime, disk)

?

?

DispatcherSupplier

?  你可能已經(jīng)注意到了,一些Dispatcher事單線程的,特別是RingBufferDispatcher和MpsDispatcher。更進(jìn)一步,根據(jù)Reactive Stream規(guī)范,Subscriber/Processor的實(shí)現(xiàn)是不允許并發(fā)通知的。這一點(diǎn)尤其對Reactor Streams產(chǎn)生了影響,使用Stream.dispachOn(Dispatcher)和一個(gè)Dispatcher來給并發(fā)信號的顯示失敗留后門。

  然后,有一個(gè)方法來避免這個(gè)缺點(diǎn),使用Dispatcher池DispatcherSupplier。實(shí)際上,作為Supplier的工廠,Supplier.get()方法根據(jù)有趣的共享策略:輪詢、最少使用。。等間接提供一個(gè)Dispatcher。

  Enviroment提供了一個(gè)靜態(tài)方法去創(chuàng)建、并注冊到當(dāng)前活躍Environment的Dispatcher池:一組輪詢的返回Dispatcher。一旦就緒,Supplier提供對Dispatcher數(shù)目的控制。

  不同于一般的Dispatcher,Environment提供了一站式的管理服務(wù):

Environment.initialize(); //....//Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...) DispatcherSupplier supplier = Environment.newCachedDispatchers(2);Dispatcher d1 = supplier.get(); Dispatcher d2 = supplier.get(); Dispatcher d3 = supplier.get(); Dispatcher d4 = supplier.get();Assert.isTrue( d1 == d3 && d2 == d4); supplier.shutdown();//Create and register a new pool of 3 dispatchers DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool"); DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");Assert.isTrue( supplier1 == supplier2 ); supplier1.shutdown();

Timer定時(shí)器

  Dispatcher盡可能快的計(jì)算接收的任務(wù),然而,Timer定時(shí)器提供一次性或者周期性的調(diào)度API。Reactor Core模塊默認(rèn)提供了一個(gè)HashWheelTimer定時(shí)器,它自動(dòng)綁定到任意的新的Environment中。HashWheelTimer對處理大量的、并發(fā)的、內(nèi)存調(diào)度任務(wù)有巨大的優(yōu)勢,它是替換java TaskScheduler的一個(gè)強(qiáng)大的選項(xiàng)。

  注意:它不是一個(gè)持久化的調(diào)度器,應(yīng)用關(guān)閉時(shí)task將會(huì)丟失。下個(gè)正式版本Timer定時(shí)器將會(huì)有一些改變,例如使用redis增加持久化/共享,請關(guān)注。

?  創(chuàng)建一個(gè)簡單的定時(shí)器:

import reactor.fn.timer.Timer//... given: "a new timer"Environment.initializeIfEmpty()Timer timer = Environment.timer()def latch = new CountDownLatch(10)when: "a task is submitted"timer.schedule({ Long now -> latch.countDown() } as Consumer<Long>,period,TimeUnit.MILLISECONDS)then: "the latch was counted down"latch.await(1, TimeUnit.SECONDS)timer.cancel()Environment.terminate()

核心Processor

核心Processor用來做比Dispatcher更集中的job:支持背壓計(jì)算異步task。

提供了org.reactivestreams.Processor接口的直接實(shí)現(xiàn),因此可以很好的和別的Reactive Stream廠商一起工作。

記住:Processor即是Subscriber也是Publisher,因此你可以在想要的地方(source,processing,sink)將一個(gè)Processor插入到Reactive stream chain中。

注意:規(guī)范不推薦直接使用Processor.onNext(d)。

?

RingBuffer Processors

?基于RingBuffer的Reactive Stream Processor的優(yōu)點(diǎn)如下:

  高吞吐量

  重啟時(shí)不會(huì)丟掉沒有消費(fèi)的數(shù)據(jù),且從最近的沒有消費(fèi)的數(shù)據(jù)開始執(zhí)行

    若沒有Subscriber監(jiān)聽,數(shù)據(jù)不會(huì)丟失(不想Reactor-stream的Broadcaster會(huì)丟掉數(shù)據(jù))

    若在消息處理過程中取消Subscriber,信號將會(huì)安全的重新執(zhí)行,實(shí)際上它能在RingBufferProcessor上很好的工作。

  靈活的背壓,它允許任意時(shí)間內(nèi)有限數(shù)量的背壓,Subscriber會(huì)消費(fèi)掉并且請求更多的數(shù)據(jù)。

  傳播的背壓,因?yàn)樗且粋€(gè)Processor,它可以通過訂閱方式傳遞消息。

  多線程的出/入Processor。

事實(shí)上,RingBuffer*Process類似于典型的MicroMessageBroker!

?  它們的唯一缺點(diǎn)是它們在運(yùn)行時(shí)創(chuàng)建它們會(huì)消耗大量的資源,原因是它們不像它們的兄弟RingBufferDispatcher可以很容易的共享,這種特性使它們更適應(yīng)于高吞吐量的預(yù)定義數(shù)據(jù)管道。

RingBufferProcessor

?Reactor的RingBufferProcessor組件本質(zhì)上是Disruptor的RingBuffer,設(shè)計(jì)的目的是盡可能的和原生的效率一樣。使用場景是:你需要分發(fā)task到另外一個(gè)線程,且該線程具有低耗、高吞吐量還在你的工作流中管理背壓。

我使用RingBufferProcessor來計(jì)算遠(yuǎn)程異步調(diào)用的各種輸出:AMQP, SSD存儲(chǔ)和內(nèi)存存儲(chǔ),Process完全處理掉易變的延遲,每秒百萬級別的消息的數(shù)據(jù)源從來沒有阻塞過。 — 友好的Reactor使用者 RingBufferProcessor的使用場景

?圖7 在跟定時(shí)間T內(nèi),一個(gè)ringbufferprocessor,2個(gè)消費(fèi)同一個(gè)sequence的Subscriber。

你可以使用靜態(tài)工具方法去創(chuàng)建一個(gè)ringbufferprocessor:

?

Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); //1 Stream<Integer> s = Streams.wrap(p); //2s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //3 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //4 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //5input.subscribe(p); //5

1.創(chuàng)建一個(gè)Processor,讓它具有32個(gè)slot的內(nèi)部RingBuffer。

2. 從Reactive Streams Processor創(chuàng)建一個(gè)Reactor。

3. 每個(gè)請求調(diào)用consume方法在自己的線程內(nèi)創(chuàng)建一個(gè)Disruptor的EventProcessor。

4. 每個(gè)請求調(diào)用consume方法在自己的線程內(nèi)創(chuàng)建一個(gè)Disruptor的EventProcessor。

5. 每個(gè)請求調(diào)用consume方法在自己的線程內(nèi)創(chuàng)建一個(gè)Disruptor的EventProcessor。

6. 向一個(gè)Reactive Streams Publisher訂閱這個(gè)Processor。

傳遞到Processor的Subscribe.onNext(Buffer)方法的每個(gè)數(shù)據(jù)元素將廣播給所有的消費(fèi)者。這個(gè)Processor沒有使用輪詢分發(fā),因?yàn)樗赗ingBufferWorkProcess中,RingBufferWorkProcess下面將要討論。若傳遞1、2、3三個(gè)整數(shù)到Processor,可以看到控制臺(tái)輸出結(jié)果如下:

Thread[test-2,5,main] data=1 Thread[test-1,5,main] data=1 Thread[test-3,5,main] data=1 Thread[test-1,5,main] data=2 Thread[test-2,5,main] data=2 Thread[test-1,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=3

每個(gè)線程接收到傳給Process的所有數(shù)據(jù),每個(gè)線程順序獲得數(shù)據(jù),因?yàn)閮?nèi)部使用RingBuffer管理

slot來發(fā)布數(shù)據(jù)。

RingBufferWorkProcessor

?不像標(biāo)準(zhǔn)的RingBufferProcessor只廣播它的值給所有的消費(fèi)者,RingBufferWorkProcessor基于消費(fèi)者的多少來分發(fā)請求值。Processor接收信息,然后輪詢發(fā)送到不同的線程中(因?yàn)槊總€(gè)消費(fèi)者有自己獨(dú)立的線程),然而使用內(nèi)部RingBuffer來有效管理消息的發(fā)布。

我們構(gòu)造了一個(gè)可擴(kuò)展的、多種htp微服務(wù)器請求負(fù)載均衡的RingBufferWorkProcessor.說它看起來快過光速可能是我錯(cuò)了,另外gc的壓力完全可控。
— 使用RingBufferWorkProcessor的Reactor友好者

使用RingBufferWorkProcessor非常簡單,你只要改變上面示例代碼的引用到靜態(tài)的create方法創(chuàng)建。使用RingBufferWorkProcessor如下,其它的代碼時(shí)一樣的。

Processor<Integer, Integer> p = RingBufferWorkProcessor.create("test", 32);

創(chuàng)建一個(gè)具有32個(gè)slot的內(nèi)部RingBuffer的Processor。

  現(xiàn)在,發(fā)布消息到Processor時(shí),將不會(huì)廣播給每一個(gè)consumer,會(huì)根據(jù)消費(fèi)者的數(shù)目分發(fā)給不同的消費(fèi)者。運(yùn)行示例,結(jié)果如下:

Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-1,5,main] data=1

  注意,RingBufferWorkProcessor會(huì)重復(fù)終端的信號、檢測正在停止工作的Subscriber的取消異常,最終會(huì)被別的Subscriber執(zhí)行一次。我們保證適合事件至少發(fā)送一次。若你理解這個(gè)語義,你可能會(huì)立即說“等等,RingBufferWorkProcessor怎么作為一個(gè)消息代理工作啦?” 答案是肯定的。

Codecs和Buffer

字節(jié)碼操作對大量數(shù)據(jù)管道配置的應(yīng)用是一個(gè)核心關(guān)注點(diǎn)。reactor-net廣泛使用字節(jié)碼操作來對接收的字節(jié)碼進(jìn)行編組和分組或者通過IO發(fā)送。

reactor.io.buffer.Buffer是java byteBuffer處理的一個(gè)裝飾器,增加了一些列的操作。目的是通過使用ByteBuffer的limit和讀取/覆蓋預(yù)先分配的字節(jié)來減少字節(jié)的復(fù)制。追蹤ByteBuffer的位置是開發(fā)人員口頭的問題,Buffer簡化了這些,我們只需要關(guān)注這個(gè)簡單的工具就可以了。

下面是一個(gè)簡單的Buffer操作示例:

import reactor.io.buffer.Buffer//... given: "an empty Buffer and a full Buffer"def buff = new Buffer()def fullBuff = Buffer.wrap("Hello World!")when: "a Buffer is appended"buff.append(fullBuff)then: "the Buffer was added"buff.position() == 12buff.flip().asString() == "Hello World!"

Buffer的一個(gè)有用的應(yīng)用是Buffer.View,多個(gè)操作例如split都會(huì)返回Buffer.View。它提供了一個(gè)無需拷貝的方式去掃描和檢索ByteBuffer的字節(jié)碼。Buffer.View同樣也是一種Buffer。

使用一個(gè)分隔符和Buffer.view使塊數(shù)據(jù)讀取可以復(fù)用同樣的字節(jié)碼

byte delimiter = (byte) ';'; byte innerDelimiter = (byte) ',';Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d");List<Buffer.View> views = buffer.split(delimiter);int viewCount = views.size(); Assert.isTrue(viewCount == 4);for (Buffer.View view : views) {System.out.println(view.asString()); //prints "a" then "b-1,b-2", then "c" and finally "d"if(view.indexOf(innerDelimiter) != -1){for(Buffer.View innerView : view.split(innerDelimiter)){System.out.println(innerView.asString()); //prints "b-1" and "b-2" }} }

使用Buffer應(yīng)用到普通的分組和編組對開發(fā)者來說可能顯得不夠高級,Reactor提供了一系列名稱為Codec的預(yù)定義的轉(zhuǎn)換器。一些Codec需要在classpath路徑下添加一些額外的依賴,如json操作的Jackson依賴。

codec以兩種方式工作:第一,繼承Function去直接編碼并返回編碼好的數(shù)據(jù),通常以Buffer的形式返回。這非常棒,但僅限于與無狀態(tài)的Codec才能起效,另外一個(gè)可選的方法是使用Codec.encoder來返回編碼函數(shù)。

Codec.encoder()對比Codec.apply(Source) Codec.encoder() 返回一個(gè)唯一的編碼函數(shù),這個(gè)編碼函數(shù)不能被不同線程共享。Codec.apply(Source) 直接編碼(并保存分配的編碼器), 但Codec本身可以在線程間共享。

對大部分實(shí)現(xiàn)了Buffer的codec來說,Codec同樣也可以根據(jù)source類型去解碼數(shù)據(jù)。

解碼數(shù)據(jù)源,需要使用Codec.decoder()獲取解碼函數(shù)。和編碼不同的是,沒有為編碼目的而重寫的快捷方法。和編碼相同的是,解碼函數(shù)不能在線程間共享。

有兩種形式的Code.decoder()函數(shù),Codec.decoder()是一個(gè)阻塞的解碼函數(shù),它直接從傳遞源數(shù)據(jù)解碼返回解碼后的數(shù)據(jù)。Codec.decoder(Consumer)用作非阻塞的解碼,它返回null,一旦解碼只觸發(fā)的Consumer,它可以和其它異步工具結(jié)合使用。

使用一個(gè)預(yù)定義的codec示例如下:

import reactor.io.json.JsonCodec//... given: 'A JSON codec'def codec = new JsonCodec<Map<String, Object>, Object>(Map);def latch = new CountDownLatch(1)when: 'The decoder is passed some JSON'Map<String, Object> decoded;def callbackDecoder = codec.decoder{decoded = itlatch.countDown()}def blockingDecoder = codec.decoder()//yes this is real simple async strategy, but that's not the point here :) Thread.start{callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}"))}def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}")then: 'The decoded maps have the expected entries'latch.await()decoded.size() == 1decoded['a'] == 'alpha'decodedMap['a'] == 'beta'

可用的核心Codec

名稱描述需要的依賴

ByteArrayCodec

Wrap/unwrap byte arrays from/to Buffer.

N/A

DelimitedCodec

Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling.

N/A

FrameCodec

Split/Aggregate Buffer into?Frame?Buffers according to successive prefix lengths.

N/A

JavaSerializationCodec

Deserialize/Serialize Buffers using Java Serialization.

N/A

PassThroughCodec

Leave the Buffers untouched.

N/A

StringCodec

Convert String to/from Buffer

N/A

LengthFieldCodec

Find the length and decode/encode the appropriate number of bytes into/from Buffer

N/A

KryoCodec

Convert Buffer into Java objects using Kryo with Buffers

com.esotericsoftware.kryo:kryo

JsonCodec,JacksonJsonCodec

Convert Buffer into Java objects using Jackson with Buffers

com.fasterxml.jackson.core:jackson-databind

SnappyCodec

A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer

org.xerial.snappy:snappy-java

GZipCodec

A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer

N/A

?

參考文獻(xiàn):

1.?http://baike.baidu.com/link?url=kXnm3flViIx-4E7PxZtYVgb3xY5tlwovUqog2u_TgCCiN7FSFkxt7ze-Qio5j1FXPmIz2DGV2_lbOBoLeyXdaa

2.?http://projectreactor.io/docs/reference/

轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4599644.html

總結(jié)

以上是生活随笔為你收集整理的reactor官方文档译文(2)Reactor-core模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。