日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

disruptor入门

發布時間:2024/1/17 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 disruptor入门 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、Disruptor的簡介

  Disruptor是由LAMX(歐洲頂級金融公司)設計和開源的大規模、高并發、低延遲的異步處理框架,也可以說他
是最快的消息框架(JMS)。整個業務邏輯處理器完全運行在內存中,其LMAX架構可以達到一個線程里每秒處理6百萬
流水,用1微秒的延遲可以獲得100K+吞吐量的爆炸性能。非常適合那種實時性高、延遲率低、業務流水量大的應用場
景,比如銀行的實時交易處理、讀寫操作分離、數據緩存等。
  Disruptor是基于生產者-消費者模型,實現了"隊列“功能的無鎖高并發框架。他可以做到一個生產者對應多個消
費者且消費者之間可以并行的處理,也可以形成先后順序的處理。Disruptor本質上解決的就是在兩個獨立的處理過
程之間交換數據。Disruptor框架的一些核心類有:
 1.Disruptor:用于控制整個消費者-生產者模型的處理器
   2.RingBuffer:用于存放數據
   3.EventHandler:一個用于處理事件的接口(可以當做生產者,也可以當做消費者)。
   4.EventFactory:事件工廠類。
   5.WaitStrategy:用于實現事件處理等待RingBuffer游標策略的接口。
   6.SequeueBarrier:隊列屏障,用于處理訪問RingBuffer的序列。
   7.用于運行disruptor的線程或者線程池。

二、Disruptor的入門

  Disruptor的編寫一般可以分為以下幾步:
    (1)定義事件;
    (2)定義事件工廠;
    (3)消費者–定義事件處理的具體實現;
    (4)定義用于事件處理(消費者)的線程池;
    (5)指定等待策略:
      Disruptor 提供了多個WaitStrategy的實現,例如:BlockingWaitStrategy、SleepingWaitStrategy、
YieldingWaitStrategy等:
      BlockingWaitStrategy是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環境中能提供
更加一致的性能表現;
      SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其
對生產者線程的影響最小,適合用于異步日志類似的場景;
      YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統。在要求極高性能且事件處理線
數小于 CPU 邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性。
      WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
      WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
      WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
    (6)生產(發布)消息;
    (7)關閉disruptor業務邏輯處理器;
  Disruptor的一些核心概念有:
  ? - Ring Buffer(環形緩沖區) :
    曾經RingBuffer是Disruptor中的最主要的對象,但從3.0版本開始,其職責被簡化為僅僅負責對通過
Disruptor進行交換的數據(事件)進行存儲和更新。在一些更高級的應用場景中,Ring Buffer 可以由用戶的自定
義實現來完全替代。
  ? - Sequence Disruptor :
    通過順序遞增的序號來編號管理。通過其進行交換的數據(事件),對數據(事件)的處理過程總是沿著序
號逐個遞增處理。一個Sequence用于跟蹤標識某個特定的事件處理者( RingBuffer/Consumer )的處理進度。雖然一
個AtomicLong也可以用于標識進度,但定義Sequence來負責該問題還有另一個目的,那就是防止不同的 Sequence之間
的CPU緩存偽共享(Flase Sharing)問題。
  ? - Sequencer :
    Sequencer是Disruptor的真正核心。此接口有兩個實現類SingleProducerSequencer、MultiProducerSequencer
,它們定義在生產者和消費者之間快速、正確地傳遞數據的并發算法。
  ? - Sequence Barrier
    用于保持對RingBuffer的 main published Sequence 和Consumer依賴的其它Consumer的 Sequence 的引用。
Sequence Barrier 還定義了決定Consumer是否還有可處理的事件的邏輯。
  ? - Wait Strategy
    定義 Consumer 如何進行等待下一個事件的策略。(注:Disruptor定義了多種不同的策略,針對不同的場
景,提供了不一樣的性能表現)
  ? - Event
  在Disruptor的語義中,生產者和消費者之間進行交換的數據被稱為事件(Event)。它不是一個被Disruptor
定義的特定類型,而是由 Disruptor 的使用者定義并指定。
  ? - EventProcessor
    EventProcessor持有特定消費者的Sequence,并提供用于調用事件處理實現的事件循環(Event Loop)。
  ? - EventHandler
    Disruptor 定義的事件處理接口,由用戶實現,用于處理事件,是 Consumer 的真正實現。
  ? - Producer
    即生產者,只是泛指調用 Disruptor 發布事件的用戶代碼,Disruptor 沒有定義特定接口或類型。
? 栗子:

  Event:

/*** 事件(Event)就是通過 Disruptor 進行交換的數據類型。* @author lcy**/ public class TransactionEvent {private long seq;private double amount;private long callNumber;public long getCallNumber() {return callNumber;}@Overridepublic String toString() {return "TransactionEvent [seq=" + seq + ", amount=" + amount + ", callNumber=" + callNumber + "]";}public void setCallNumber(long callNumber) {this.callNumber = callNumber;}public long getSeq() {return seq;}public void setSeq(long seq) {this.seq = seq;}public double getAmount() {return amount;}public void setAmount(double amount) {this.amount = amount;} }

  Factory:

/*** Event Factory 定義了如何實例化前面第1步中定義的事件(Event)* Disruptor 通過 EventFactory 在 RingBuffer 中預創建 Event 的實例。 一個 Event 實例實際上被用作一個“數據槽”,發布者發布前,先從 RingBuffer 獲得一個 Event 的實例,然后往 Event 實例中填充數據,之后再發布到 RingBuffer中,之后由 Consumer 獲得該 Event 實例并從中讀取數據。* @author lcy**/ public class TransactionEventFactory implements EventFactory<TransactionEvent>{@Overridepublic TransactionEvent newInstance() {// TODO Auto-generated method stubreturn new TransactionEvent();} }

  Customer:

/*** 事件處理類-交易流水初始化* @author lcy**/ public class AmountTrasfer implements EventTranslator<TransactionEvent>{@Overridepublic void translateTo(TransactionEvent arg0, long arg1) {arg0.setAmount(Math.random()*99);arg0.setCallNumber(17088888888L);arg0.setSeq(System.currentTimeMillis());System.out.println("設置交易流水:"+arg0.getSeq());} } /*** 消費者–定義事件處理的具體實現* 攔截交易流水* @author lcy**/ public class TransHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{@Overridepublic void onEvent(TransactionEvent transactionEvent) throws Exception {System.out.println("交易流水號為:"+transactionEvent.getSeq()+"||交易金額為:"+transactionEvent.getAmount());}@Overridepublic void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {// TODO Auto-generated method stubthis.onEvent(arg0);} } /*** 發送驗證短信* @author lcy**/ public class SendMsgHandler implements EventHandler<TransactionEvent>{@Overridepublic void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {// TODO Auto-generated method stubSystem.out.println("向手機號:"+arg0.getCallNumber()+"發送驗證短信......");} } /*** 交易流水入庫操作* @author lcy**/ public class InnerDBHandler implements EventHandler<TransactionEvent>,WorkHandler<TransactionEvent>{@Overridepublic void onEvent(TransactionEvent arg0, long arg1, boolean arg2) throws Exception {// TODO Auto-generated method stubthis.onEvent(arg0);}@Overridepublic void onEvent(TransactionEvent arg0) throws Exception {arg0.setSeq(arg0.getSeq()*10000);System.out.println("攔截入庫流水號------------ "+arg0.getSeq());} }

  Producer:

/*** 生產者、發布事件* @author lcy**/ public class TransactionEventProducer implements Runnable {// 線程同步輔助類 - 允許一個或多個線程一直等待 CountDownLatch cdl;Disruptor disruptor;public TransactionEventProducer(CountDownLatch cdl, Disruptor disruptor) {super();this.cdl = cdl;this.disruptor = disruptor;}public TransactionEventProducer() {super();// TODO Auto-generated constructor stub }@Overridepublic void run() {AmountTrasfer th;try {//Event對象初始化類th = new AmountTrasfer();//發布事件 disruptor.publishEvent(th);} finally {// 遞減鎖存器的計數 -如果計數到達零,則釋放所有等待的線程。 cdl.countDown();}}// 定義環大小,2的倍數private static final int BUFFER_SIZE = 1024;// 定義處理事件的線程或線程池ExecutorService pool = Executors.newFixedThreadPool(7);/*** 批處理模式* @throws Exception*/public void BatchDeal() throws Exception {//創建一個單生產者的ringBufferfinal RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() {@Overridepublic TransactionEvent newInstance() {return new TransactionEvent();}//設置等待策略,YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統。}, BUFFER_SIZE,new YieldingWaitStrategy());//創建SequenceBarrierSequenceBarrier barrier = ringBuffer.newBarrier();//創建消息處理器BatchEventProcessor<TransactionEvent> eventProcessor = new BatchEventProcessor<TransactionEvent>(ringBuffer,barrier,new InnerDBHandler());//構造反向依賴,eventProcessor之間沒有依賴關系則可以將Sequence直接加入 ringBuffer.addGatingSequences(eventProcessor.getSequence());//提交消息處理器 pool.submit(eventProcessor);//提交一個有返回值的任務用于執行,返回一個表示任務的未決結果的 Future。Future<Void> submit = pool.submit(new Callable<Void>() {//計算結果,如果無法計算結果則拋出異常 @Overridepublic Void call() throws Exception {long seq;for (int i=0;i<7000;i++) {System.out.println("生產者:"+i);//環里一個可用的區塊seq=ringBuffer.next();//為環里的對象賦值ringBuffer.get(seq).setAmount(Math.random()*10);System.out.println("TransactionEvent: "+ringBuffer.get(seq).toString());//發布這個區塊的數據, ringBuffer.publish(seq);}return null;}});//等待計算完成,然后獲取其結果。 submit.get();Thread.sleep(1000);//關閉消息處理器 eventProcessor.halt();//關閉線程池 pool.shutdown();}/*** 工作池模式* @throws Exception*/@SuppressWarnings("unchecked")public void poolDeal() throws Exception {RingBuffer<TransactionEvent> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TransactionEvent>() {@Overridepublic TransactionEvent newInstance() {return new TransactionEvent();}}, BUFFER_SIZE, new YieldingWaitStrategy());SequenceBarrier barrier = ringBuffer.newBarrier();//創建一個定長的線程池ExecutorService pool2 = Executors.newFixedThreadPool(5);//交易流水入庫操作WorkHandler<TransactionEvent> innerDBHandler = new InnerDBHandler();ExceptionHandler arg2;WorkerPool<TransactionEvent> workerPool = new WorkerPool<TransactionEvent>(ringBuffer, barrier, new IgnoreExceptionHandler(), innerDBHandler);workerPool.start(pool2);long seq;for(int i =0;i<7;i++){seq = ringBuffer.next();ringBuffer.get(seq).setAmount(Math.random()*99);ringBuffer.publish(seq);}Thread.sleep(1000);workerPool.halt();pool2.shutdown();}/*** disruptor處理器用來組裝生產者和消費者* @throws Exception */@SuppressWarnings("unchecked")public void disruptorManage() throws Exception{//創建用于處理事件的線程池ExecutorService pool2 = Executors.newFixedThreadPool(7);//創建disruptor對象/*** 用來指定數據生成者有一個還是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI* BusySpinWaitStrategy是一種延遲最低,最耗CPU的策略。通常用于消費線程數小于CPU數的場景*/Disruptor<TransactionEvent> disruptor2 = new Disruptor<TransactionEvent>(new EventFactory<TransactionEvent>() {@Overridepublic TransactionEvent newInstance() {return new TransactionEvent();} },BUFFER_SIZE,pool2,ProducerType.SINGLE,new BusySpinWaitStrategy());//創建消費者組,先執行攔截交易流水和入庫操作EventHandlerGroup<TransactionEvent> eventsWith = disruptor2.handleEventsWith(new InnerDBHandler(),new TransHandler());//在進行風險交易的2次驗證操作eventsWith.then(new SendMsgHandler());//啟動disruptor disruptor2.start();//在線程能通過 await()之前,必須調用 countDown() 的次數CountDownLatch latch = new CountDownLatch(1);//將封裝好的TransactionEventProducer類提交pool2.submit(new TransactionEventProducer(latch,disruptor2));//使當前線程在鎖存器倒計數至零之前一直等待,以保證生產者任務完全消費掉 latch.await();//關閉disruptor業務邏輯處理器 disruptor2.shutdown();//銷毀線程池 pool2.shutdown();} }

  Test:

/*** 測試類* @author lcy**/ public class Test {public static void main(String[] args) throws Exception {TransactionEventProducer producer = new TransactionEventProducer();for (int i = 0; i < 100; i++)producer.disruptorManage();System.out.println("--------------------------------------------------");} }

三、記一次生產上的BUG

  前段時間升級的時候出現了這樣一個BUG,導致了近萬用戶的交易失敗。首先確認了我們在生產上并沒有部署攔
截交易的規則,所有的交易流水都是放行的不會加入我們的風險名單庫。那么內存庫里的幾萬灰名單是怎么來的呢?
  我們在上線成功后需使用真實的用戶進行一波生產上的測試,而在測試的過程中為了配合測試那邊的需求,
需將特定的幾個測試賬號配置成加入灰名單并進行2次認證的攔截規則。測試通過后便將那幾條測試規則給撤回了。但
是我們忽略了一個問題,因為Disruptor框架在初始化環的時候,只會new一次這個對象。這就導致了插入環里“槽”的對
象始終都是第一次進入“灰名單”對象,等到環被塞滿后下條流水進來的時候就會使用“槽”里的“灰名單”對象。即使這筆
交易不是風險交易也會加入到灰名單中,導致了大量的用戶交易失敗。
  上線后的第二天,我們頭兒就意識到了這個問題,想通過重啟服務、清空環來暫時解決這個問題(服務器有負載均
衡),因為環被清空后,之前在環里的“灰名單”對象也就不存在了,而且生產上沒有部署將用戶加入“灰名單”的規則,環
里的對象就一定是“干凈的”,這個問題也就得到了解決。但是、可是、可但是、萬萬沒想到啊,當晚生產上還是出現了問題。
灰名單里的用戶數量已經逼近2萬了,大量用戶不能進行電子交易。
  為什么項目已經被重啟了,環也被清空了,也沒有規則會產生新的灰名單,那2萬的灰名單用戶是從哪來的?事后
通過查看代碼發現,雖然環被清空了,但是在清空之前已經有部分用戶被存到了灰名單里。這些用戶在某一時間再次
進行交易時,會重新將這條交易的狀態設置為“灰名單”(其他業務需要),這就導致了接待這條交易流水的“槽”會被重
新賦值為“灰名單”的狀態,然后環里的“灰名單”槽就會越滾越多。
  Event在使用的時候一定不要忘記將關鍵的屬性進行初始化,這樣才能保證從環里取出的對象是初始狀態的,不會被上次處理的數據所影響。

轉載于:https://www.cnblogs.com/0813lichenyu/p/9244410.html

總結

以上是生活随笔為你收集整理的disruptor入门的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。