每日一博 - DelayQueue阻塞队列源码解读
文章目錄
- Pre
- DelayQueue特征
- Leader/Followers模式
- DelayQueue源碼分析
- 類繼承關系
- 核心方法
- 成員變量
- 構造函數
- 入隊方法
- offer(E e)
- 出隊方法
- poll()
- poll(long timeout, TimeUnit unit)
- take()
- peek
- 小結
Pre
每日一博 - 延時任務的多種實現方式解讀
DelayQueue 由優先級支持的、基于時間的調度隊列,內部使用非線程安全的優先隊列(PriorityQueue)實現,而無界隊列基于數組的擴容實現。
在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。
創建隊列
-
BlockingQueue<String> blockingQueue = new DelayQueue();入隊的對象必須要實現 Delayed接口,而Delayed集成自 Comparable 接口。
Delayed 接口使對象成為延遲對象,它使存放在DelayQueue類中的對象具有了激活日期。該接口強制實現下列兩個方法。
- compareTo(Delayed o):Delayed接口繼承了Comparable接口,因此有了這個方法。讓元素按激活日期排隊
- getDelay(TimeUnit unit):這個方法返回到激活日期的剩余時間,時間單位由單位參數指定。
DelayQueue特征
- DelayQueue的泛型參數需要實現Delayed接口
Delayed接口繼承了Comparable接口,DelayQueue內部使用非線程安全的優先隊列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待時間。
- DelayQueue不允許包含null元素。
Leader/Followers模式
-
有若干個線程(一般組成線程池)用來處理大量的事件
-
有一個線程作為領導者,等待事件的發生;其他的線程作為追隨者,僅僅是睡眠
-
假如有事件需要處理,領導者會從追隨者中指定一個新的領導者,自己去處理事件
-
喚醒的追隨者作為新的領導者等待事件的發生
-
處理事件的線程處理完畢以后,就會成為追隨者的一員,直到被喚醒成為領導者
-
假如需要處理的事件太多,而線程數量不夠(能夠動態創建線程處理另當別論),則有的事件可能會得不到處理。
所以線程會有三種身份中的一種:leader 和 follower,以及一個干活中的狀態:processser。
基本原則就 永遠最多只有一個 leader。
而所有 follower 都在等待成為 leader。
線程池啟動時會自動產生一個 Leader 負責等待網絡 IO 事件,當有一個事件產生時,Leader 線程首先通知一個 Follower 線程將被其提拔為新的 Leader ,然后自己就去干活了,去處理這個網絡事件,處理完畢后加入 Follower 線程等待隊列,等待下次成為 Leader。
這種方法可以增強 CPU高速緩存相似性,以及消除動態內存分配和線程間的數據交換。
DelayQueue源碼分析
類繼承關系
核心方法
成員變量
DelayQueue 通過組合一個PriorityQueue 來實現元素的存儲以及優先級維護,通過ReentrantLock 來保證線程安全,通過Condition 來判斷是否可以取數據,對于leader 后面再來分析它的作用
// 可重入鎖 private final transient ReentrantLock lock = new ReentrantLock(); // 存儲元素的優先級隊列 private final PriorityQueue<E> q = new PriorityQueue<E>();// 獲取數據 等待線程標識 private Thread leader = null;// 條件控制,表示是否可以從隊列中取數據 private final Condition available = lock.newCondition();構造函數
DelayQueue 內部組合PriorityQueue,對元素的操作都是通過PriorityQueue 來實現的,DelayQueue 的構造方法很簡單,對于PriorityQueue 都是使用的默認參數,不能通過DelayQueue 來指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要實現Comparable ,因此不需要指定的Comparator。
入隊方法
雖然提供入隊的接口方式很多,實際都是調用的offer 方法,通過PriorityQueue 來進行入隊操作,入隊超時方法并沒有其超時功能。
-
add(E e),將指定的元素插入到此隊列中,在成功時返回 true
-
put(E e),將指定的元素插入此隊列中,隊列達到最大值,則拋oom異常
-
offer(E e),將指定的元素插入到此隊列中,在成功時返回 true
-
offer(E e, long timeout, TimeUnit unit),指定一個等待時間將元素放入隊列中并沒有意義
offer(E e)
/*** Inserts the specified element into this delay queue.** @param e the element to add* @return {@code true}* @throws NullPointerException if the specified element is null*/public boolean offer(E e) {final ReentrantLock lock = this.lock;lock.lock();try {q.offer(e);if (q.peek() == e) {leader = null;available.signal();}return true;} finally {lock.unlock();}}將指定的元素插入到此隊列中,在成功時返回 true,其他幾個方法內部都調用了offer 方法,我們也可以直接調用offer 方法來完成入隊操作。
peek并不一定是當前添加的元素,隊頭是當前添加元素,說明當前元素e的優先級最小也就即將過期的,這時候激活avaliable變量條件隊列里面的一個線程,通知他們隊列里面有元素了。
public boolean offer(E e) {final ReentrantLock lock = this.lock;// 獲取鎖lock.lock();try {//通過PriorityQueue 來將元素入隊q.offer(e);//peek 是獲取的隊頭元素,喚醒阻塞在available 條件上的一個線程,表示可以從隊列中取數據了if (q.peek() == e) {leader = null;// 喚醒通知available.signal();}return true;} finally {// 解鎖lock.unlock();} }出隊方法
- poll(),獲取并移除此隊列的頭,如果此隊列為空,則返回 null
- poll(long timeout, TimeUnit unit),獲取并移除此隊列的頭部,在指定的等待時間前等待
- take(),獲取并移除此隊列的頭部,在元素變得可用之前一直等待
- peek(),調用此方法,可以返回隊頭元素,但是元素并不出隊
poll()
獲取并移除此隊列的頭,如果此隊列為空,則返回 null
public E poll() {final ReentrantLock lock = this.lock;// 獲取同步鎖lock.lock();try {// 獲取隊頭元素E first = q.peek();// 如果對頭為null 或者 延時還沒有到,則返回 nullif (first == null || first.getDelay(NANOSECONDS) > 0)return null;elsereturn q.poll(); // 否則元素出隊} finally {lock.unlock();} }poll(long timeout, TimeUnit unit)
獲取并移除此隊列的頭部,在指定的等待時間前等待。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 超時等待時間long nanos = unit.toNanos(timeout);final ReentrantLock lock = this.lock;// 獲取可中斷鎖lock.lockInterruptibly();try // 無限循環for (;;) {// 獲取隊頭元素E first = q.peek// 隊頭為空,也就是隊列為空if (first == null) {// 達到超時指定時間,返回nullif (nanos <= 0)return null;else// 如果還沒有超時,那么在available條件上進行等待nanos時間nanos = available.awaitNanos(nanos);} else {// 獲取元素延遲時間long delay = first.getDelay(NANOSECONDS);// 延時到期if (delay <= 0)return q.poll(); // 返回出隊元素// 延時未到期,超時到期,返回nullif (nanos <= 0)return null;first = null; // don't retain ref while waiting// 超時等待時間 < 延遲時間 或者 有其他線程再取數據if (nanos < delay || leader != null)// 在available條件上進行等待nanos時間nanos = available.awaitNanos(nanos);else {// 超時等待時間 > 延遲時間 // 并且沒有其他線程在等待,那么當前元素成為leader,表示 leader線程最早 正在等待獲取元素Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待 延遲時間 超時long timeLeft = available.awaitNanos(delay);// 還需要繼續等待 nanosnanos -= delay - timeLeft;} finally {// 清除 leaderif (leader == thisThread)leader = null;}}}}} finally {// 喚醒阻塞在 available 的一個線程,表示可以取數據了if (leader == null && q.peek() != null)available.signal// 釋放鎖lock.unlock();} }梳理一下
需要注意的時Condition 條件在阻塞時會釋放鎖,在被喚醒時會再次獲取鎖,獲取成功才會返回。 當進行超時等待時,阻塞在Condition 上后會釋放鎖,一旦釋放了鎖,那么其它線程就有可能參與競爭,某一個線程就可能會成為leader(參與競爭的時間早,并且能在等待時間內能獲取到隊頭元素那么就可能成為leader) leader是用來減少不必要的競爭,如果leader不為空說明已經有線程在取了,設置當前線程等待即可。
leader 就是一個信號,告訴其它線程:你們不要再去獲取元素了,它們延遲時間還沒到期,我都還沒有取到數據呢,你們要取數據,等我取了再說
take()
獲取并移除此隊列的頭部,在元素變得可用之前一直等待
public E take() throws InterruptedException {final ReentrantLock lock = this.lock;// 獲取可中斷鎖lock.lockInterruptibly();try {for (;;) {// 獲取隊頭元素E first = q.peek();// 隊頭元素為空,則阻塞等待if (first == null)available.await();else {// 獲取元素延遲時間long delay = first.getDelay(NANOSECONDS);// 延時到期if (delay <= 0)return q.poll(); // 返回出隊元素first = null; // don't retain ref while waiting// 如果有其它線程在等待獲取元素,則當前線程不用去競爭,直接等待if (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {// 等待延遲時間到期available.awaitNanos(delay);} finally {//清除 leaderif (leader == thisThread)leader = null;}}}}} finally {//喚醒阻塞在available 的一個線程,表示可以取數據了if (leader == null && q.peek() != null)available.signal();// 釋放鎖lock.unlock();} }該方法就是相當于在前面的超時等待中,把超時時間設置為無限大,那么這樣只要隊列中有元素,要是元素延遲時間要求,那么就可以取出元素,否則就直接等待元素延遲時間到期,再取出元素,最先參與等待的線程會成為leader。
peek
調用此方法,可以返回隊頭元素,但是元素并不出隊。
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {//返回隊列頭部元素,元素不出隊return q.peek();} finally {lock.unlock();} }小結
- DelayQueue 內部通過組合PriorityQueue 來實現存儲和維護元素順序的;
- DelayQueue 存儲元素必須實現Delayed 接口,通過實現Delayed 接口,可以獲取到元素延遲時間,以及可以比較元素大小(Delayed 繼承Comparable);
- DelayQueue 通過一個可重入鎖來控制元素的入隊出隊行為;
- DelayQueue 中leader 標識 用于減少線程的競爭,表示當前有其它線程正在獲取隊頭元素;
- PriorityQueue 只是負責存儲數據以及維護元素的順序,對于延遲時間取數據則是在DelayQueue 中進行判斷控制的;
- DelayQueue 沒有實現序列化接口
總結
以上是生活随笔為你收集整理的每日一博 - DelayQueue阻塞队列源码解读的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 每日一博 - Spring Boot A
- 下一篇: Docker Review - Dock