日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

DelayQueue源码

發布時間:2024/4/13 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 DelayQueue源码 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

介紹

一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)

數據結構

public interface Delayed extends Comparable<Delayed> {/*** 返回與此對象相關的剩余延遲時間,以給定的時間單位表示*/long getDelay(TimeUnit unit); }

?getDelay方法一般用內部存儲的事件,減去當前事件,即為剩余延遲事件

屬性

private final transient ReentrantLock lock = new ReentrantLock();private final PriorityQueue<E> q = new PriorityQueue<E>();/***用于優化內部阻塞通知的線程*/private Thread leader = null;private final Condition available = lock.newCondition();

以支持優先級的PriorityQueue無界隊列作為一個容器,因為元素都必須實現Delayed接口,可以根據元素的過期時間來對元素進行排列,因此,先過期的元素會在隊首,每次從隊列里取出來都是最先要過期的元素。

leader是一個Thread元素,它在offer和take中都有使用,它代表當前獲取到鎖的消費者線程,

DelayQueue實現Leader-Folloer pattern
1、當存在多個take線程時,同時只生效一個,即,leader線程
2、當leader存在時,其它的take線程均為follower,其等待是通過condition實現的
3、當leader不存在時,當前線程即成為leader,在delay之后,將leader角色釋放還原
4、最后如果隊列還有內容,且leader空缺,則調用一次condition的signal,喚醒掛起的take線程,其中之一將成為新的leader
5、最后在finally中釋放鎖

方法實現

offer,poll,peek

public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);//如果插入元素是第一個元素if (q.peek() == e) {//leader設置為nullleader = null;//喚醒available.signal();}return true;} finally {lock.unlock();}}public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);}public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {E first = q.peek();//如果未到期,則返回null,否則刪除if (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll();} finally {lock.unlock();}}public E poll(long timeout, TimeUnit unit) throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {E first = q.peek();if (first == null) {if (nanos <= 0)return null;elsenanos = available.awaitNanos(nanos);} else {long delay = first.getDelay(NANOSECONDS);//到期,則pollif (delay <= 0)return q.poll();if (nanos <= 0)return null;first = null; // don't retain ref while waitingif (nanos < delay || leader != null)//nanos<delay,表示超時剩余時間小于到期時間,nanos = available.awaitNanos(nanos);else {Thread thisThread = Thread.currentThread();//設置當前線程為leaderleader = thisThread;try {//等待條件long timeLeft = available.awaitNanos(delay);//剩余超時時間nanos -= delay - timeLeft;} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && q.peek() != null)available.signal();lock.unlock();}}public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {return q.peek();} finally {lock.unlock();}}

put,take

/*** Retrieves and removes the head of this queue, waiting if necessary* until an element with an expired delay is available on this queue.** @return the head of this queue* @throws InterruptedException {@inheritDoc}*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 獲取可中斷鎖。lock.lockInterruptibly();try {for (;;) {// 從優先級隊列中獲取隊列頭元素E first = q.peek();if (first == null)// 無元素,當前線程加入等待隊列,并阻塞available.await();else {// 通過getDelay 方法獲取延遲時間long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 延遲時間到期,獲取并刪除頭部元素。return q.poll();first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {// 線程節點進入等待隊列 x 納秒。available.awaitNanos(delay);} finally {if (leader == thisThread)leader = null;}}}}} finally {// leader == null且還存在元素的話,喚醒一個消費線程。if (leader == null && q.peek() != null)available.signal();lock.unlock();}}public void put(E e) {offer(e);}

take()方法邏輯:

1.獲取鎖
2.取出優先級隊列q的首元素
3.如果元素q的隊首/隊列為空,阻塞
3.如果元素q的隊首(first)不為空,獲得這個元素的delay時間值,如果first的延遲delay時間值為0的話,說明該元素已經到了可以使用的時間,調用poll方法彈出該元素,跳出方法
4.如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免內存泄露
5.循環以上操作,直到return

leader作用

如果leader不為null,說明已經有消費者線程拿到鎖,直接阻塞當前線程,如果leader為null,把當前線程賦值給leader,并等待剩余的到期時間,最后釋放leader,這里我們想象著我們有個多個消費者線程用take方法去取,如果沒有leader!=null的判斷,這些線程都會無限循環,直到返回第一個元素,很顯然很浪費資源。所以leader的作用是設置一個標記,來避免消費者的無腦競爭。

參考

  • ReentrantLock源碼
  • Java并發包--阻塞隊列(BlockingQueue)
  • PriorityBlockingQueue源碼

總結

以上是生活随笔為你收集整理的DelayQueue源码的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。