reactor官方文档译文(2)Reactor-core模块
名稱解釋:back pressure:背壓。在交換機(jī)在阻止外來數(shù)據(jù)包發(fā)送到堵塞端口的時(shí)候可能會(huì)發(fā)生丟包。而背壓就是考驗(yàn)交換機(jī)在這個(gè)時(shí)候避免丟包的能力。很多的交換機(jī)當(dāng)發(fā)送或接收緩沖區(qū)溢出的時(shí)候通過將阻塞信號發(fā)送回源地址來實(shí)現(xiàn)背壓。交換機(jī)在全雙工時(shí)使用IEEE802.3x流控制達(dá)到同樣目的。
首先,我們使用groovy示例來展示core模塊的功能:
//Initialize context and get default dispatcher Environment.initialize()//RingBufferDispatcher with 8192 slots by default def dispatcher = Environment.sharedDispatcher()//Create a callback Consumer<Integer> c = { data ->println "some data arrived: $data" }//Create an error callback Consumer<Throwable> errorHandler = { it.printStackTrace }//Dispatch data asynchronously r.dispatch(1234, c, errorHandler)Environment.terminate()下面,我們使用Stream reactive實(shí)現(xiàn)來看:
//standalone async processor def processor = RingBufferProcessor.<Integer>create()//send data, will be kept safe until a subscriber attaches to the processor processor.onNext(1234) processor.onNext(5678)//consume integer data processor.subscribe(new Subscriber<Integer>(){void onSubscribe(Subscription s){//unbounded subscriber s.request Long.MAX}void onNext(Integer data){println data}void onError(Throwable err){err.printStackTrace()}void onComplete(){println 'done!'} }//Shutdown internal thread and call complete processor.onComplete()Core模塊概覽
Reactor core模塊的子單元:
Common IO和功能類型,一些是直接從java8 功能接口回遷的。Function、Supplier、consumer、Predicate、BiConsumer、BiFunctionTuplesResource、Pausable、TimerBuffer,Codec和一組預(yù)定義的Codec。Environment 上下文Dispatcher 協(xié)議和一組預(yù)定義的Dispatcher。預(yù)定義的Reactive Stream Processorreactor-core可以用來逐漸替代另外的消息傳遞策略、調(diào)度時(shí)間任務(wù)或者以小的功能塊組織代碼。這種突破使開發(fā)者與其它Reactive基礎(chǔ)庫更好的合作,特別是對于沒有耐心的開發(fā)者,沒有了對RingBuffer的理解負(fù)擔(dān)。
注意:Reactor-core隱藏了LMAX disruptor,因此不會(huì)出現(xiàn)也不會(huì)和現(xiàn)有的Disruptor依賴沖突。
功能模塊
?功能模塊重用是核心,通常情況下在你使用Reactor時(shí)就需要的功能。因此,功能編程酷在哪里?其中一個(gè)核心理念是將可執(zhí)行代碼看做別的數(shù)據(jù)。另一點(diǎn),類似于Closure或者匿名函數(shù),此時(shí)業(yè)務(wù)邏輯由最初的調(diào)用者決定。它同樣避免了過量的If/SWITCH模塊,并且這種分離是概念更清晰:每個(gè)模塊完成一個(gè)功能且不需要共享任何東西。
?組織功能模塊
每個(gè)功能組件都給出它的一般任務(wù)的明確意圖:
Consumer:簡單回調(diào)--一勞永逸的
BiCounsumer:兩個(gè)參數(shù)的簡單回調(diào),通常用在序列比較,例如:前一個(gè)和下一個(gè)參數(shù)。
Function:轉(zhuǎn)換邏輯--請求/應(yīng)答
BiFunction:兩個(gè)參數(shù)的轉(zhuǎn)換,通常用在累加器,比較前一個(gè)和下一個(gè)參數(shù),返回一個(gè)新的值。
Supplier:工廠邏輯--輪詢
Predicate:測試路徑--過濾
注意:我們也將Publisher和Subscriber視作功能塊,敢于稱之為Reactive功能塊。盡管如此,它們作為基礎(chǔ)組件,廣泛應(yīng)用到Reactor及其其它地方。Stream API接收reactor.fn參數(shù),為你創(chuàng)建合適的Subscriber。
好消息是在功能模塊中包裝可執(zhí)行指令可以向磚塊一樣進(jìn)行復(fù)用。
Consumer<String> consumer = new Consumer<String>(){@Overridevoid accept(String value){System.out.println(value);} };//Now in Java 8 style for brievety Function<Integer, String> transformation = integer -> ""+integer;Supplier<Integer> supplier = () -> 123;BiConsumer<Consumer<String>, String> biConsumer = (callback, value) -> {for(int i = 0; i < 10; i++){//lazy evaluate the final logic to run callback.accept(value);} };//note how the execution flows from supplier to biconsumer biConsumer.accept(consumer,transformation.apply(supplier.get()) );?最初聽起來,這可能不是一個(gè)引人注目的革命性變革。但是這種基本思維模式的改變,將揭示我們使異步代碼變的穩(wěn)健和可組合性的使命是多么可貴。Dispatcher分發(fā)器將輸入數(shù)據(jù)和錯(cuò)誤回調(diào)分發(fā)給consumer來處理。Reactor Stream模塊將更好的使用這些組件。
當(dāng)使用Ioc容器如spring時(shí),一個(gè)好的開發(fā)者將利用Java的配置屬性來返回一個(gè)無狀態(tài)的功能bean。然后可以優(yōu)美的注入到stream Pipeline或者分發(fā)他們的執(zhí)行代碼中的block中。
元組
你可以注意到這些接口,它們對輸入?yún)?shù)和比較少的固定數(shù)量的參數(shù)的泛型有很好的支持。你怎么傳遞超過1個(gè)或者超過2個(gè)的參數(shù)呢?答案是使用元組Tuple,Tuple類似于csv中一個(gè)單獨(dú)實(shí)例的一樣,可以在在功能性編程中保證它們的類型安全和支持多個(gè)數(shù)量的參數(shù)。
以前面的例子為例,我們嘗試提供兩個(gè)參數(shù)的BiConsumer而使用單個(gè)參數(shù)的Consumer
Consumer<Tuple2<Consumer<String>, String>> biConsumer = tuple -> {for(int i = 0; i < 10; i++){//Correct typing, compiler happy tuple.getT1().accept(tuple.getT2());} };biConsumer.accept(Tuple.of(consumer,transformation.apply(supplier.get())) );注意:Tuple需要分配更多的空間,因此在比較或者鍵值信號等一般使用場景中更多直接使用Bi***組件。
Environment和Dispatcher
功能性構(gòu)建塊已經(jīng)準(zhǔn)備就緒,讓我們使用它們來進(jìn)行異步編程。第一步是到Dispatcher分區(qū)。
在我們啟動(dòng)任意Dispatcher前,需要保證可以有效的創(chuàng)建它們。通常,創(chuàng)建它們的代價(jià)比較高,原因是需要預(yù)分配一個(gè)內(nèi)存分區(qū)來保持分配的信號,這就是前言中介紹的著名的運(yùn)行時(shí)分配和啟動(dòng)時(shí)預(yù)分配的不同對比。因此提出了一個(gè)名為"Environment"共享上下文概念,使用它來管理這些不同類型的Dispatcher,從而避免不必要的創(chuàng)建開銷。
Environment
reactor的使用者(或者可用的擴(kuò)展庫如@Spring)創(chuàng)建或者停止Environment。它們自動(dòng)從META_INF/reactor/reactor-environment.properties處讀取配置文件。
注意,屬性文件可以改變,通過在classpath下的META-INFO/reactor目錄下一個(gè)新的屬性配置可以改變屬性文件。
? ? ? ?通過傳遞下面的環(huán)境變量reactor.profiles.active來在運(yùn)行時(shí)段改變默認(rèn)的配置文件。
java - jar reactor-app.jar -Dreactor.profiles.active=turbo啟動(dòng)和停止Environment
Environment env = Environment.initialize();//Current registered environment is the same than the one initialized Assert.isTrue(Environment.get() == env);//Find a dispatcher named "shared" Dispatcher d = Environment.dispatcher("shared");//get the Timer bound to this environment Timer timer = Environment.timer();//Shutdown registered Dispatchers and Timers that might run non-daemon threads Environment.terminate(); //An option could be to register a shutdownHook to automatically invoke terminate.注意:在一個(gè)給定的Jvm應(yīng)用中,最好只維護(hù)一個(gè)Enviroment.在大多數(shù)情況下,使用Environment.initializeIfEmpty()就完全ok。
Dispacher分發(fā)器
從Reactor 1開始,Dispatcher就存在了。Dispatcher通常抽象消息傳遞的方法,和Java Executor有類似的通用約定。事實(shí)上Dispatcher繼承自Executor。
Dispatcher對有數(shù)據(jù)信號的傳送方式及消費(fèi)者同步或異步執(zhí)行的錯(cuò)誤信息有一套比較嚴(yán)格的類型限制約定。這種方式在面對經(jīng)典的Executors時(shí)解決了第一個(gè)問題--錯(cuò)誤隔離。效果如下:
錯(cuò)誤消費(fèi)者的調(diào)用不需要終端當(dāng)前分配的資源。如果沒有指定,它默認(rèn)從當(dāng)前存在的Environment中去尋找,并使用指定給它的errorJournalConsumer。
異步Dispatche提供的第二個(gè)獨(dú)特的特征是運(yùn)行使用尾部遞歸策略來再次調(diào)度。尾部遞歸的應(yīng)用場景是分發(fā)器發(fā)現(xiàn)Dispatcher的classLoader已經(jīng)分配到正在運(yùn)行的線程,這時(shí),當(dāng)當(dāng)前消費(fèi)者返回時(shí)將要執(zhí)行的task放入到隊(duì)列中。
使用一個(gè)類似于?Groovy Spock test的異步的多線程分發(fā)器:
import reactor.core.dispatch.*//... given:def sameThread = new SynchronousDispatcher()def diffThread = new ThreadPoolExecutorDispatcher(1, 128)def currentThread = Thread.currentThread()Thread taskThread = nulldef consumer = { ev ->taskThread = Thread.currentThread()}def errorConsumer = { error ->error.printStackTrace()}when: "a task is submitted"sameThread.dispatch('test', consumer, errorConsumer)then: "the task thread should be the current thread"currentThread == taskThreadwhen: "a task is submitted to the thread pool dispatcher"def latch = new CountDownLatch(1)diffThread.dispatch('test', { ev -> consumer(ev); latch.countDown() }, errorConsumer)latch.await(5, TimeUnit.SECONDS) // Wait for task to execute then: "the task thread should be different when the current thread"taskThread != currentThread注意:
如Java Executor一樣,它們?nèi)鄙倭宋覀儗⒓尤氲絉eactor 2.x的一個(gè)特點(diǎn):Reactive stream協(xié)議。這時(shí)在Reactor中僅有幾個(gè)未完成事項(xiàng)中的一個(gè)未完成事項(xiàng)--沒有將Reactive stream標(biāo)準(zhǔn)直接綁定到Reactor中。然后,你可以在Stream章節(jié)部分找到快速結(jié)合Reactor stream的方法。
表3 Dispatcher家族介紹
| RingBuffer | sharedDispatcher() | An LMAX DisruptorRingBuffer based Dispatcher. | Small latency peaks tolerated Fastest Async Dispatcher, 10-15M+ dispatch/sec on commodity hardware Support ordering | 'Spin' Loop when getting the next slot on full capcity Single Threaded, no concurrent dispatch |
| Mpsc | sharedDispatcher() if Unsafe not available | Alternative optimized message-passing structure. | Latency peaks tolerated 5-10M+ dispatch/sec on commodity hardware Support ordering | Unbounded and possibly using as much available heap memory as possible Single Threaded, no concurrent dispatch |
| WorkQueue | workDispatcher() | An LMAX DisruptorRingBuffer based Dispatcher. | Latency Peak tolerated for a limited time Fastest Multi-Threaded Dispatcher, 5-10M+ dispatch/sec on commodity hardware | 'Spin' Loop when getting the next slot on full capcity Concurrent dispatch Doesn’t support ordering |
| Synchronous | dispatcher("sync") or SynchronousDispatcher. INSTANCE | Runs on the current thread. | Upstream and Consumer executions are colocated Useful for Test support Support ordering if the reentrant dispatch is on the current thread | No Tail Recursion support Blocking |
| TailRecurse | tailRecurse() or TailRecurse Dispatcher. INSTANCE | Synchronous Reentrant Dispatcher that enqueue dispatches when currently dispatching. | Upstream and Consumer executions are colocated Reduce execution stack, greatly expanded by functional call chains | Unbounded Tail Recurse depth Blocking Support ordering (Thread Stealing) |
| ThreadPoolExecutor | newDispatcher(int, int, DispatcherType. THREAD_POOL_EXECUTOR) | Use underlying ThreadPoolExecutor message-passing | Multi-Threaded Blocking Consumers, permanent latency tolerated 1-5M+ dispatch/sec on commodity hardware | Concurrent run on a given consumer executed twice or more Unbounded by default Doesn’t support ordering |
| Traceable Delegating | N/A | Decorate an existing dispatcher with TRACE level logs. | Dispatch tapping Runs slower than the delegated dispatcher alone | Log overhead (runtime, disk) |
?
?
DispatcherSupplier
? 你可能已經(jīng)注意到了,一些Dispatcher事單線程的,特別是RingBufferDispatcher和MpsDispatcher。更進(jìn)一步,根據(jù)Reactive Stream規(guī)范,Subscriber/Processor的實(shí)現(xiàn)是不允許并發(fā)通知的。這一點(diǎn)尤其對Reactor Streams產(chǎn)生了影響,使用Stream.dispachOn(Dispatcher)和一個(gè)Dispatcher來給并發(fā)信號的顯示失敗留后門。
然后,有一個(gè)方法來避免這個(gè)缺點(diǎn),使用Dispatcher池DispatcherSupplier。實(shí)際上,作為Supplier的工廠,Supplier.get()方法根據(jù)有趣的共享策略:輪詢、最少使用。。等間接提供一個(gè)Dispatcher。
Enviroment提供了一個(gè)靜態(tài)方法去創(chuàng)建、并注冊到當(dāng)前活躍Environment的Dispatcher池:一組輪詢的返回Dispatcher。一旦就緒,Supplier提供對Dispatcher數(shù)目的控制。
不同于一般的Dispatcher,Environment提供了一站式的管理服務(wù):
Environment.initialize(); //....//Create an anonymous pool of 2 dispatchers with automatic default settings (same type than default dispatcher, default backlog size...) DispatcherSupplier supplier = Environment.newCachedDispatchers(2);Dispatcher d1 = supplier.get(); Dispatcher d2 = supplier.get(); Dispatcher d3 = supplier.get(); Dispatcher d4 = supplier.get();Assert.isTrue( d1 == d3 && d2 == d4); supplier.shutdown();//Create and register a new pool of 3 dispatchers DispatcherSupplier supplier1 = Environment.newCachedDispatchers(3, "myPool"); DispatcherSupplier supplier2 = Environment.cachedDispatchers("myPool");Assert.isTrue( supplier1 == supplier2 ); supplier1.shutdown();Timer定時(shí)器
Dispatcher盡可能快的計(jì)算接收的任務(wù),然而,Timer定時(shí)器提供一次性或者周期性的調(diào)度API。Reactor Core模塊默認(rèn)提供了一個(gè)HashWheelTimer定時(shí)器,它自動(dòng)綁定到任意的新的Environment中。HashWheelTimer對處理大量的、并發(fā)的、內(nèi)存調(diào)度任務(wù)有巨大的優(yōu)勢,它是替換java TaskScheduler的一個(gè)強(qiáng)大的選項(xiàng)。
注意:它不是一個(gè)持久化的調(diào)度器,應(yīng)用關(guān)閉時(shí)task將會(huì)丟失。下個(gè)正式版本Timer定時(shí)器將會(huì)有一些改變,例如使用redis增加持久化/共享,請關(guān)注。
? 創(chuàng)建一個(gè)簡單的定時(shí)器:
import reactor.fn.timer.Timer//... given: "a new timer"Environment.initializeIfEmpty()Timer timer = Environment.timer()def latch = new CountDownLatch(10)when: "a task is submitted"timer.schedule({ Long now -> latch.countDown() } as Consumer<Long>,period,TimeUnit.MILLISECONDS)then: "the latch was counted down"latch.await(1, TimeUnit.SECONDS)timer.cancel()Environment.terminate()核心Processor
核心Processor用來做比Dispatcher更集中的job:支持背壓計(jì)算異步task。
提供了org.reactivestreams.Processor接口的直接實(shí)現(xiàn),因此可以很好的和別的Reactive Stream廠商一起工作。
記住:Processor即是Subscriber也是Publisher,因此你可以在想要的地方(source,processing,sink)將一個(gè)Processor插入到Reactive stream chain中。
注意:規(guī)范不推薦直接使用Processor.onNext(d)。
?
RingBuffer Processors
?基于RingBuffer的Reactive Stream Processor的優(yōu)點(diǎn)如下:
高吞吐量
重啟時(shí)不會(huì)丟掉沒有消費(fèi)的數(shù)據(jù),且從最近的沒有消費(fèi)的數(shù)據(jù)開始執(zhí)行
若沒有Subscriber監(jiān)聽,數(shù)據(jù)不會(huì)丟失(不想Reactor-stream的Broadcaster會(huì)丟掉數(shù)據(jù))
若在消息處理過程中取消Subscriber,信號將會(huì)安全的重新執(zhí)行,實(shí)際上它能在RingBufferProcessor上很好的工作。
靈活的背壓,它允許任意時(shí)間內(nèi)有限數(shù)量的背壓,Subscriber會(huì)消費(fèi)掉并且請求更多的數(shù)據(jù)。
傳播的背壓,因?yàn)樗且粋€(gè)Processor,它可以通過訂閱方式傳遞消息。
多線程的出/入Processor。
事實(shí)上,RingBuffer*Process類似于典型的MicroMessageBroker!
? 它們的唯一缺點(diǎn)是它們在運(yùn)行時(shí)創(chuàng)建它們會(huì)消耗大量的資源,原因是它們不像它們的兄弟RingBufferDispatcher可以很容易的共享,這種特性使它們更適應(yīng)于高吞吐量的預(yù)定義數(shù)據(jù)管道。
RingBufferProcessor
?Reactor的RingBufferProcessor組件本質(zhì)上是Disruptor的RingBuffer,設(shè)計(jì)的目的是盡可能的和原生的效率一樣。使用場景是:你需要分發(fā)task到另外一個(gè)線程,且該線程具有低耗、高吞吐量還在你的工作流中管理背壓。
我使用RingBufferProcessor來計(jì)算遠(yuǎn)程異步調(diào)用的各種輸出:AMQP, SSD存儲(chǔ)和內(nèi)存存儲(chǔ),Process完全處理掉易變的延遲,每秒百萬級別的消息的數(shù)據(jù)源從來沒有阻塞過。 — 友好的Reactor使用者 RingBufferProcessor的使用場景?圖7 在跟定時(shí)間T內(nèi),一個(gè)ringbufferprocessor,2個(gè)消費(fèi)同一個(gè)sequence的Subscriber。
你可以使用靜態(tài)工具方法去創(chuàng)建一個(gè)ringbufferprocessor:
?
Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32); //1 Stream<Integer> s = Streams.wrap(p); //2s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //3 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //4 s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); //5input.subscribe(p); //51.創(chuàng)建一個(gè)Processor,讓它具有32個(gè)slot的內(nèi)部RingBuffer。
2. 從Reactive Streams Processor創(chuàng)建一個(gè)Reactor。
3. 每個(gè)請求調(diào)用consume方法在自己的線程內(nèi)創(chuàng)建一個(gè)Disruptor的EventProcessor。
4. 每個(gè)請求調(diào)用consume方法在自己的線程內(nèi)創(chuàng)建一個(gè)Disruptor的EventProcessor。
5. 每個(gè)請求調(diào)用consume方法在自己的線程內(nèi)創(chuàng)建一個(gè)Disruptor的EventProcessor。
6. 向一個(gè)Reactive Streams Publisher訂閱這個(gè)Processor。
傳遞到Processor的Subscribe.onNext(Buffer)方法的每個(gè)數(shù)據(jù)元素將廣播給所有的消費(fèi)者。這個(gè)Processor沒有使用輪詢分發(fā),因?yàn)樗赗ingBufferWorkProcess中,RingBufferWorkProcess下面將要討論。若傳遞1、2、3三個(gè)整數(shù)到Processor,可以看到控制臺(tái)輸出結(jié)果如下:
Thread[test-2,5,main] data=1 Thread[test-1,5,main] data=1 Thread[test-3,5,main] data=1 Thread[test-1,5,main] data=2 Thread[test-2,5,main] data=2 Thread[test-1,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=3每個(gè)線程接收到傳給Process的所有數(shù)據(jù),每個(gè)線程順序獲得數(shù)據(jù),因?yàn)閮?nèi)部使用RingBuffer管理
slot來發(fā)布數(shù)據(jù)。
RingBufferWorkProcessor
?不像標(biāo)準(zhǔn)的RingBufferProcessor只廣播它的值給所有的消費(fèi)者,RingBufferWorkProcessor基于消費(fèi)者的多少來分發(fā)請求值。Processor接收信息,然后輪詢發(fā)送到不同的線程中(因?yàn)槊總€(gè)消費(fèi)者有自己獨(dú)立的線程),然而使用內(nèi)部RingBuffer來有效管理消息的發(fā)布。
我們構(gòu)造了一個(gè)可擴(kuò)展的、多種htp微服務(wù)器請求負(fù)載均衡的RingBufferWorkProcessor.說它看起來快過光速可能是我錯(cuò)了,另外gc的壓力完全可控。— 使用RingBufferWorkProcessor的Reactor友好者
使用RingBufferWorkProcessor非常簡單,你只要改變上面示例代碼的引用到靜態(tài)的create方法創(chuàng)建。使用RingBufferWorkProcessor如下,其它的代碼時(shí)一樣的。
Processor<Integer, Integer> p = RingBufferWorkProcessor.create("test", 32);
創(chuàng)建一個(gè)具有32個(gè)slot的內(nèi)部RingBuffer的Processor。
現(xiàn)在,發(fā)布消息到Processor時(shí),將不會(huì)廣播給每一個(gè)consumer,會(huì)根據(jù)消費(fèi)者的數(shù)目分發(fā)給不同的消費(fèi)者。運(yùn)行示例,結(jié)果如下:
Thread[test-2,5,main] data=3 Thread[test-3,5,main] data=2 Thread[test-1,5,main] data=1注意,RingBufferWorkProcessor會(huì)重復(fù)終端的信號、檢測正在停止工作的Subscriber的取消異常,最終會(huì)被別的Subscriber執(zhí)行一次。我們保證適合事件至少發(fā)送一次。若你理解這個(gè)語義,你可能會(huì)立即說“等等,RingBufferWorkProcessor怎么作為一個(gè)消息代理工作啦?” 答案是肯定的。
Codecs和Buffer
字節(jié)碼操作對大量數(shù)據(jù)管道配置的應(yīng)用是一個(gè)核心關(guān)注點(diǎn)。reactor-net廣泛使用字節(jié)碼操作來對接收的字節(jié)碼進(jìn)行編組和分組或者通過IO發(fā)送。
reactor.io.buffer.Buffer是java byteBuffer處理的一個(gè)裝飾器,增加了一些列的操作。目的是通過使用ByteBuffer的limit和讀取/覆蓋預(yù)先分配的字節(jié)來減少字節(jié)的復(fù)制。追蹤ByteBuffer的位置是開發(fā)人員口頭的問題,Buffer簡化了這些,我們只需要關(guān)注這個(gè)簡單的工具就可以了。
下面是一個(gè)簡單的Buffer操作示例:
import reactor.io.buffer.Buffer//... given: "an empty Buffer and a full Buffer"def buff = new Buffer()def fullBuff = Buffer.wrap("Hello World!")when: "a Buffer is appended"buff.append(fullBuff)then: "the Buffer was added"buff.position() == 12buff.flip().asString() == "Hello World!"Buffer的一個(gè)有用的應(yīng)用是Buffer.View,多個(gè)操作例如split都會(huì)返回Buffer.View。它提供了一個(gè)無需拷貝的方式去掃描和檢索ByteBuffer的字節(jié)碼。Buffer.View同樣也是一種Buffer。
使用一個(gè)分隔符和Buffer.view使塊數(shù)據(jù)讀取可以復(fù)用同樣的字節(jié)碼
byte delimiter = (byte) ';'; byte innerDelimiter = (byte) ',';Buffer buffer = Buffer.wrap("a;b-1,b-2;c;d");List<Buffer.View> views = buffer.split(delimiter);int viewCount = views.size(); Assert.isTrue(viewCount == 4);for (Buffer.View view : views) {System.out.println(view.asString()); //prints "a" then "b-1,b-2", then "c" and finally "d"if(view.indexOf(innerDelimiter) != -1){for(Buffer.View innerView : view.split(innerDelimiter)){System.out.println(innerView.asString()); //prints "b-1" and "b-2" }} }使用Buffer應(yīng)用到普通的分組和編組對開發(fā)者來說可能顯得不夠高級,Reactor提供了一系列名稱為Codec的預(yù)定義的轉(zhuǎn)換器。一些Codec需要在classpath路徑下添加一些額外的依賴,如json操作的Jackson依賴。
codec以兩種方式工作:第一,繼承Function去直接編碼并返回編碼好的數(shù)據(jù),通常以Buffer的形式返回。這非常棒,但僅限于與無狀態(tài)的Codec才能起效,另外一個(gè)可選的方法是使用Codec.encoder來返回編碼函數(shù)。
Codec.encoder()對比Codec.apply(Source) Codec.encoder() 返回一個(gè)唯一的編碼函數(shù),這個(gè)編碼函數(shù)不能被不同線程共享。Codec.apply(Source) 直接編碼(并保存分配的編碼器), 但Codec本身可以在線程間共享。對大部分實(shí)現(xiàn)了Buffer的codec來說,Codec同樣也可以根據(jù)source類型去解碼數(shù)據(jù)。
解碼數(shù)據(jù)源,需要使用Codec.decoder()獲取解碼函數(shù)。和編碼不同的是,沒有為編碼目的而重寫的快捷方法。和編碼相同的是,解碼函數(shù)不能在線程間共享。
有兩種形式的Code.decoder()函數(shù),Codec.decoder()是一個(gè)阻塞的解碼函數(shù),它直接從傳遞源數(shù)據(jù)解碼返回解碼后的數(shù)據(jù)。Codec.decoder(Consumer)用作非阻塞的解碼,它返回null,一旦解碼只觸發(fā)的Consumer,它可以和其它異步工具結(jié)合使用。
使用一個(gè)預(yù)定義的codec示例如下:
import reactor.io.json.JsonCodec//... given: 'A JSON codec'def codec = new JsonCodec<Map<String, Object>, Object>(Map);def latch = new CountDownLatch(1)when: 'The decoder is passed some JSON'Map<String, Object> decoded;def callbackDecoder = codec.decoder{decoded = itlatch.countDown()}def blockingDecoder = codec.decoder()//yes this is real simple async strategy, but that's not the point here :) Thread.start{callbackDecoder.apply(Buffer.wrap("{\"a\": \"alpha\"}"))}def decodedMap = blockingDecoder.apply(Buffer.wrap("{\"a\": \"beta\"}")then: 'The decoded maps have the expected entries'latch.await()decoded.size() == 1decoded['a'] == 'alpha'decodedMap['a'] == 'beta'可用的核心Codec
| ByteArrayCodec | Wrap/unwrap byte arrays from/to Buffer. | N/A |
| DelimitedCodec | Split/Aggregate Buffer and delegate to the passed Codec for unit marshalling. | N/A |
| FrameCodec | Split/Aggregate Buffer into?Frame?Buffers according to successive prefix lengths. | N/A |
| JavaSerializationCodec | Deserialize/Serialize Buffers using Java Serialization. | N/A |
| PassThroughCodec | Leave the Buffers untouched. | N/A |
| StringCodec | Convert String to/from Buffer | N/A |
| LengthFieldCodec | Find the length and decode/encode the appropriate number of bytes into/from Buffer | N/A |
| KryoCodec | Convert Buffer into Java objects using Kryo with Buffers | com.esotericsoftware.kryo:kryo |
| JsonCodec,JacksonJsonCodec | Convert Buffer into Java objects using Jackson with Buffers | com.fasterxml.jackson.core:jackson-databind |
| SnappyCodec | A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer | org.xerial.snappy:snappy-java |
| GZipCodec | A Compression Codec which applies a delegate Codec after unpacking/before packing Buffer | N/A |
?
參考文獻(xiàn):
1.?http://baike.baidu.com/link?url=kXnm3flViIx-4E7PxZtYVgb3xY5tlwovUqog2u_TgCCiN7FSFkxt7ze-Qio5j1FXPmIz2DGV2_lbOBoLeyXdaa
2.?http://projectreactor.io/docs/reference/
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4599644.html
總結(jié)
以上是生活随笔為你收集整理的reactor官方文档译文(2)Reactor-core模块的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: reactor官方文档译文(1)Reac
- 下一篇: spring jdbcTemplate使