Disruptor并发框架-1
生活随笔
收集整理的這篇文章主要介紹了
Disruptor并发框架-1
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
//http://ifeve.com/disruptor-getting-started/
public class LongEvent { private long value;public long getValue() { return value; } public void setValue(long value) { this.value = value; }
}
import com.lmax.disruptor.EventFactory;
// 需要讓disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory { @Override public Object newInstance() { return new LongEvent(); }
}
import com.lmax.disruptor.EventHandler;//我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
public class LongEventHandler implements EventHandler<LongEvent> {@Overridepublic void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {System.out.println(longEvent.getValue()); }}
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;public class LongEventMain {public static void main(String[] args) throws Exception {//創建緩沖池ExecutorService executor = Executors.newCachedThreadPool();//創建工廠LongEventFactory factory = new LongEventFactory();//創建bufferSize ,也就是RingBuffer大小,必須是2的N次方int ringBufferSize = 1024 * 1024; // /**//BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一致的性能表現WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();//SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者線程的影響最小,適合用于異步日志類似的場景WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();//YieldingWaitStrategy 的性能是最好的,適合用于低延遲的系統。在要求極高性能且事件處理線數小于CPU邏輯核心數的場景中,推薦使用此策略;例如,CPU開啟超線程的特性WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();*///創建disruptor// 1.第一個參數為工廠類對象,用于創建一個個的LongEvent,LongEvent是實際的消費數據// 2.第二個參數為緩沖區大小// 3.第三個參數線程池 進行 Disruptor 內部的數據接收處理調度// 4.第四個參數ProducerType.SINGLE 和 ProducerType.MULTI// 5.第五個參數是一種策略:WaitStrategyDisruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());// 連接消費事件方法disruptor.handleEventsWith(new LongEventHandler());// 啟動disruptor.start();//Disruptor 的事件發布過程是一個兩階段提交的過程://發布事件// 使用該方法獲取具體存放數據的容器ringBuffer(環形結構)RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);ByteBuffer byteBuffer = ByteBuffer.allocate(8);for(long l = 0; l<100; l++){byteBuffer.putLong(0, l);producer.onData(byteBuffer);//Thread.sleep(1000);}disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉; }
}
import java.nio.ByteBuffer;import com.lmax.disruptor.RingBuffer;
/*** 很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是因為事件對象還需要預先創建。* 發布事件最少需要兩步:獲取下一個事件槽并發布事件(發布事件的時候要使用try/finnally保證事件一定會被發布)。* 如果我們使用RingBuffer.next()獲取一個事件槽,那么一定要發布對應的事件。* 如果不能發布事件,那么就會引起Disruptor狀態的混亂。* 尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢復。* @since 2015年11月23日*/
public class LongEventProducer {private final RingBuffer<LongEvent> ringBuffer;public LongEventProducer(RingBuffer<LongEvent> ringBuffer){this.ringBuffer = ringBuffer;}/*** onData用來發布事件,每調用一次就發布一次事件* 它的參數會用過事件傳遞給消費者*/public void onData(ByteBuffer bb){//1.可以把ringBuffer看做一個事件隊列,那么next就是得到下面一個事件槽long sequence = ringBuffer.next();try {//2.用上面的索引取出一個空的事件用于填充(獲取該序號對應的事件對象)LongEvent event = ringBuffer.get(sequence);//3.獲取要通過事件傳遞的業務數據event.setValue(bb.getLong(0));} finally {//4.發布事件//注意,最后的 ringBuffer.publish 方法必須包含在 finally 中以確保必須得到調用;如果某個請求的 sequence 未被提交,將會堵塞后續的發布操作或者其它的 producer。ringBuffer.publish(sequence);}}}
import java.nio.ByteBuffer;import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;/*** Disruptor 3.0提供了lambda式的API。這樣可以把一些復雜的操作放在Ring Buffer,* 所以在Disruptor3.0以后的版本最好使用Event Publisher或者Event Translator來發布事件* @since 2015年11月23日*/
public class LongEventProducerWithTranslator {//一個translator可以看做一個事件初始化器,publicEvent方法會調用它//填充Eventprivate static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {@Overridepublic void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) {event.setValue(buffer.getLong(0));}};private final RingBuffer<LongEvent> ringBuffer;public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {this.ringBuffer = ringBuffer;}public void onData(ByteBuffer buffer){ringBuffer.publishEvent(TRANSLATOR, buffer);}}
?
?
?
?
?
總結
以上是生活随笔為你收集整理的Disruptor并发框架-1的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 线程池 Executors2
- 下一篇: Disruptor并发框架-2