3.集--LinkedTransferQueue得知
近期在閱讀開源項目里,發現有幾個project都不盡同樣地使用LinkedTransferQueue這個數據結構。比方netty,grizzly,xmemcache,Bonecp。
Bonecp還擴展出一個BoundTransferQueue。
LinkedTransferQueue最早出如今JSR66R(一個輕量級并行運行框架)包中。眼下已合并到JDK7中。
JSR66的負責人正是大名頂頂的Doug Lea.
盡管LinkedTransferQueue被集成在JDK7中,但眼下主流的JDK平臺仍然是JDK6。以致開源項目開發人員都不迫及地把他集成在自已的項目中。
Doug Lea說LinkedTransferQueue是一個聰明的隊列。他是ConcurrentLinkedQueue,?
SynchronousQueue (in “fair” mode), and unbounded LinkedBlockingQueue的超集。
有一篇論文討論了其算法與性能:地址:http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf
LinkedTransferQueue實現了一個重要的接口TransferQueue,該接口含有以下幾個重要方法:
1. transfer(E e)
?? 若當前存在一個正在等待獲取的消費者線程。即立馬移交之;否則,會插入當前元素e到隊列尾部,而且等待進入堵塞狀態。到有消費者線程取走該元素。
2. tryTransfer(E e)
?? 若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數)。使用該方法會即刻轉移/傳輸對象元素e;
?? 若不存在,則返回false,而且不進入隊列。這是一個不堵塞的操作。
3. tryTransfer(E e, long timeout, TimeUnit unit)
?? 若當前存在一個正在等待獲取的消費者線程,會馬上傳輸給它; 否則將插入元素e到隊列尾部,而且等待被消費者線程獲取消費掉,
?? 若在指定的時間內元素e無法被消費者線程獲取。則返回false,同一時候該元素被移除。
4. hasWaitingConsumer()
?? 推斷是否存在消費者線程
5. getWaitingConsumerCount()
?? 獲取全部等待獲取元素的消費線程數量
事實上transfer方法在SynchronousQueue的實現中就已存在了,僅僅是沒有做為API暴露出來。SynchronousQueue有一個特性:它本身不存在容量,僅僅能進行線程之間的
元素傳送。SynchronousQueue在運行offer操作時。假設沒有其它線程運行poll,則直接返回false.線程之間元素傳送正是通過transfer方法完畢的。
有一個使用案例。我們知道ThreadPoolExecutor調節線程的原則是:先調整到最小線程,最小線程用完后,他會將優先將任務放入緩存隊列(offer(task)),等緩沖隊列用完了,才會向最大線程數調節。這似乎與我們所理解的線程池模型有點不同。我們一般採用添加到最大線程后,才會放入緩沖隊列中,以達到最大性能。
ThreadPoolExecutor代碼段:
? public void execute(Runnable command) {
??????? if (command == null)
??????????? throw new NullPointerException();
??????? if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
??????????? if (runState == RUNNING && workQueue.offer(command)) {
??????????????? if (runState != RUNNING || poolSize == 0)
??????????????????? ensureQueuedTaskHandled(command);
??????????? }
??????????? else if (!addIfUnderMaximumPoolSize(command))
??????????????? reject(command); // is shutdown or saturated
??????? }
??? }
假設我們採用SynchronousQueue作為ThreadPoolExecuto的緩沖隊列時,在沒有線程運行poll時(即存在等待線程)。則workQueue.offer(command)返回false,這時ThreadPoolExecutor就會添加線程,最快地達到最大線程數。
但也僅此而已,也由于SynchronousQueue本身不存在容量,也決定了我們一般無法採用SynchronousQueue作為ThreadPoolExecutor的緩存隊列。而一般採用LinkedBlockingQueue的offer方法來實現。
最新的LinkedTransferQueue或許能夠幫我們解決問題,后面再說。
transfer算法比較復雜,實現非常難看明確。大致的理解是採用所謂雙重數據結構(dual data structures)。之所以叫雙重,其原因是方法都是通過兩個步驟完畢:
保留與完畢。比方消費者線程從一個隊列中取元素,發現隊列為空。他就生成一個空元素放入隊列,所謂空元素就是數據項字段為空。
然后消費者線程在這個字段上旅轉等待。這叫保留。直到一個生產者線程意欲向隊例中放入一個元素,這里他發現最前面的元素的數據項字段為NULL,他就直接把自已數據填充到這個元素中。即完畢了元素的傳送。大體是這個意思。這樣的方式優美了完畢了線程之間的高效協作。
對于LinkedTransferQueue,Doug Lea進行了盡乎極致的優化。Grizzly的採用了PaddedAtomicReference:
?? public LinkedTransferQueue() {
??????? QNode dummy = new QNode(null, false);
??????? head = new PaddedAtomicReference<QNode>(dummy);
??????? tail = new PaddedAtomicReference<QNode>(dummy);
??????? cleanMe = new PaddedAtomicReference<QNode>(null);
??? }
?? static final class PaddedAtomicReference<T> extends AtomicReference<T> {??????? // enough padding for 64bytes with 4byte refs
??????? Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
??????? PaddedAtomicReference(T r) { super(r); }
??? }
PaddedAtomicReference相對于父類AtomicReference僅僅做了一件事情,就將共享變量追加到64字節。我們能夠來計算下。一個對象的引用占4個字節,
它追加了15個變量共占60個字節,再加上父類的Value變量,一共64個字節。這么做的原因。
請參考http://www.infoq.com/cn/articles/ftf-java-volatile
http://rdc.taobao.com/team/jm/archives/1719 這兩文章。做JAVA。假設想成為Doug Lea這種大師,也要懂體系結構(待續)
原文地址:http://guojuanjun.blog.51cto.com/277646/948298/
版權聲明:本文博主原創文章。博客,未經同意不得轉載。
總結
以上是生活随笔為你收集整理的3.集--LinkedTransferQueue得知的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 品牌推广不得不注意的几个误区
- 下一篇: 黑马程序员——iOS学习——启动App界