Java实现生产消费模型的5种方式
**
前言
**
生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞。
以下這些解法,其實本質上都是實現了一個阻塞隊列。為空,則消費者阻塞,滿了,則生產者阻塞。
**
1.使用wait()和notify()實現
**
這也是最簡單最基礎的實現,緩沖區滿和為空時都調用wait()方法等待,當生產者生產了一個產品或者消費者消費了一個產品之后會喚醒所有線程。
public static void testProductConsumeByWaitAndNotify() {final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);final Object lock = new Object();Runnable producer = new Runnable() {public void run() {for(int i=0;i<30;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;//隊列未滿,一直往里放消息synchronized (lock) {while (size == queue.size()) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(msg);lock.notifyAll();}System.out.println(msg+" 已發送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}synchronized (lock) {while (queue.size() == 0) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}String msg = queue.poll();System.out.println(msg+"已消費");lock.notifyAll();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}**
2.可重入鎖ReentrantLock的實現
**
java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,通過對lock的lock()方法和unlock()方法實現了對鎖的顯示控制,而synchronize()則是對鎖的隱性控制。
可重入鎖,也叫做遞歸鎖,指的是同一線程 外層函數獲得鎖之后 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響,簡單來說,該鎖維護這一個與獲取鎖相關的計數器,如果擁有鎖的某個線程再次得到鎖,那么獲取計數器就加1,函數調用結束計數器就減1,然后鎖需要被釋放兩次才能獲得真正釋放。已經獲取鎖的線程進入其他需要相同鎖的同步代碼塊不會被阻塞。
ReentrantLock的Condition:
//阻塞當前線程,直到收到通知或者被中斷(將當前線程加入到當前Condition對象的等待隊列里) //Block until signalled or interrupted public final void await() throws InterruptedException;/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * 把在當前Condition對象的等待隊列里的等待最久的線程,轉移到當前Lock的等待隊列里 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() ;ReentrantLock實現生產消費模型:
public static void testProductConsumeByLock() {final Lock lock = new ReentrantLock();final Condition empty = lock.newCondition();final Condition full = lock.newCondition();final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);Runnable producer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for(int i=0;i<20;i++) {lock.lock();try {if(queue.size() == size) {try {full.await();} catch (InterruptedException e) {e.printStackTrace();}}String msg = "生產消息:"+i;queue.add(msg);System.out.println(msg);empty.signal();} finally {lock.unlock();}}}};Runnable consumer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}while (true) {lock.lock();try {if(queue.isEmpty()) {try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}else {String msg = queue.remove();System.out.println(msg + "已消費");full.signal();}} finally {lock.unlock();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}**
3.阻塞隊列BlockingQueue實現
**
BlockingQueue即阻塞隊列,從阻塞這個詞可以看出,在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:
因此,當一個線程對已經滿了的阻塞隊列進行入隊操作時會阻塞,除非有另外一個線程進行了出隊操作,當一個線程對一個空的阻塞隊列進行出隊操作時也會阻塞,除非有另外一個線程進行了入隊操作。
從上可知,阻塞隊列是線程安全的。
下面是BlockingQueue接口的一些方法:
| 插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
| 移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
| 檢查 | element(o) | peek(o) |
這四類方法分別對應的是:
下面來看由阻塞隊列實現的生產消費模型,這里我們使用take()和put()方法,這里生產者和生產者,消費者和消費者之間不存在同步,所以會出現連續生成和連續消費的現象
/*** 生產者消費者* 使用阻塞隊列實現* @throws InterruptedException */public static void testProductConsumeByBlockingQueue() throws InterruptedException {//因為SynchronousQueue沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經準備好參與到交付過程中。僅當有足夠多的消費者,并且總是有一個消費者準備好獲取交付的工作時,才適合使用同步隊列。 // final BlockingQueue<String> queue = new SynchronousQueue<String>(true);//使用有界阻塞隊列final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);Runnable producer = new Runnable() {public void run() {for(int i=0;i<100;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;try {queue.put(msg);} catch (InterruptedException e1) {e1.printStackTrace();}System.out.println(msg+" 已發送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}String msg = null;try {msg = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg+"已消費");}}};new Thread(producer).start();new Thread(consumer).start();}**
4.信號量Semaphore的實現
**
信號量可以控制訪問相應資源的線程的數量,從而實現生產消費模型
**
5.使用消息隊列
**
這個是取巧的辦法,直接使用現成的消息中間件服務(如RocketMq、RabbitMq、Kafka等),分分鐘搞定。手動微笑~~
總結
以上是生活随笔為你收集整理的Java实现生产消费模型的5种方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java ThreadLocal
- 下一篇: JVM内存结构 VS Java内存模型