日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Disruptor并发框架-2

發(fā)布時間:2024/4/13 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Disruptor并发框架-2 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
import java.util.concurrent.atomic.AtomicInteger;public class Trade { private String id;//ID private String name;private double price;//金額 private AtomicInteger count = new AtomicInteger(0);public String getId() {return id;}public void setId(String id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public double getPrice() {return price;}public void setPrice(double price) {this.price = price;}public AtomicInteger getCount() {return count;}public void setCount(AtomicInteger count) {this.count = count;} } import java.util.UUID;import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler;public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //這里做具體的消費邏輯 event.setId(UUID.randomUUID().toString());//簡單生成下ID System.out.println(event.getId()); } } import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy;public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer創(chuàng)建一個單生產(chǎn)者的RingBuffer, * 第一個參數(shù)叫EventFactory,從名字上理解就是"事件工廠",其實它的職責(zé)就是產(chǎn)生數(shù)據(jù)填充RingBuffer的區(qū)塊。 * 第二個參數(shù)是RingBuffer的大小,它必須是2的指數(shù)倍 目的是為了將求模運算轉(zhuǎn)為&運算提高效率 * 第三個參數(shù)是RingBuffer的生產(chǎn)都在沒有可用區(qū)塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //創(chuàng)建線程池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //創(chuàng)建SequenceBarrier SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //創(chuàng)建消息處理器 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //這一步的目的就是把消費者的位置信息引用注入到生產(chǎn)者 如果只有一個消費者的情況可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //把消息處理器提交到線程池 executors.submit(transProcessor); //如果存在多個消費者 那重復(fù)執(zhí)行上面3行代碼 把TradeHandler換成其它消費者類 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//占個坑 --ringBuffer一個可用區(qū)塊 ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區(qū)塊放入 數(shù)據(jù) ringBuffer.publish(seq);//發(fā)布這個區(qū)塊的數(shù)據(jù)使handler(consumer)可見 } return null; } }); future.get();//等待生產(chǎn)者結(jié)束 Thread.sleep(1000);//等上1秒,等消費都處理完成 transProcessor.halt();//通知事件(或者說消息)處理器 可以結(jié)束了(并不是馬上結(jié)束!!!) executors.shutdown();//終止線程 } }

?

import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool;public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; EventFactory<Trade> eventFactory = new EventFactory<Trade>() { public Trade newInstance() { return new Trade(); } }; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandler<Trade> handler = new TradeHandler(); WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); workerPool.start(executor); //下面這個生產(chǎn)8個數(shù)據(jù)for(int i=0;i<8;i++){ long seq=ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); } }

?

超強干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達旦的技術(shù)人生

總結(jié)

以上是生活随笔為你收集整理的Disruptor并发框架-2的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。