日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java实现生产消费模型的5种方式

發布時間:2023/12/4 java 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java实现生产消费模型的5种方式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

**

前言

**

生產者和消費者問題是線程模型中的經典問題:生產者和消費者在同一時間段內共用同一個存儲空間,生產者往存儲空間中添加產品,消費者從存儲空間中取走產品,當存儲空間為空時,消費者阻塞,當存儲空間滿時,生產者阻塞。


以下這些解法,其實本質上都是實現了一個阻塞隊列。為空,則消費者阻塞,滿了,則生產者阻塞。

**

1.使用wait()和notify()實現

**

這也是最簡單最基礎的實現,緩沖區滿和為空時都調用wait()方法等待,當生產者生產了一個產品或者消費者消費了一個產品之后會喚醒所有線程。

public static void testProductConsumeByWaitAndNotify() {final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);final Object lock = new Object();Runnable producer = new Runnable() {public void run() {for(int i=0;i<30;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;//隊列未滿,一直往里放消息synchronized (lock) {while (size == queue.size()) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}queue.offer(msg);lock.notifyAll();}System.out.println(msg+" 已發送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}synchronized (lock) {while (queue.size() == 0) {try {lock.wait();} catch (InterruptedException e) {e.printStackTrace();}}String msg = queue.poll();System.out.println(msg+"已消費");lock.notifyAll();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

2.可重入鎖ReentrantLock的實現

**

java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,通過對lock的lock()方法和unlock()方法實現了對鎖的顯示控制,而synchronize()則是對鎖的隱性控制。
可重入鎖,也叫做遞歸鎖,指的是同一線程 外層函數獲得鎖之后 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響,簡單來說,該鎖維護這一個與獲取鎖相關的計數器,如果擁有鎖的某個線程再次得到鎖,那么獲取計數器就加1,函數調用結束計數器就減1,然后鎖需要被釋放兩次才能獲得真正釋放。已經獲取鎖的線程進入其他需要相同鎖的同步代碼塊不會被阻塞。

ReentrantLock的Condition:

//阻塞當前線程,直到收到通知或者被中斷(將當前線程加入到當前Condition對象的等待隊列里) //Block until signalled or interrupted public final void await() throws InterruptedException;/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * 把在當前Condition對象的等待隊列里的等待最久的線程,轉移到當前Lock的等待隊列里 * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() ;

ReentrantLock實現生產消費模型:

public static void testProductConsumeByLock() {final Lock lock = new ReentrantLock();final Condition empty = lock.newCondition();final Condition full = lock.newCondition();final int size = 10;final Queue<String> queue = new ArrayDeque<String>(size);Runnable producer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}for(int i=0;i<20;i++) {lock.lock();try {if(queue.size() == size) {try {full.await();} catch (InterruptedException e) {e.printStackTrace();}}String msg = "生產消息:"+i;queue.add(msg);System.out.println(msg);empty.signal();} finally {lock.unlock();}}}};Runnable consumer = new Runnable() {public void run() {try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}while (true) {lock.lock();try {if(queue.isEmpty()) {try {empty.await();} catch (InterruptedException e) {e.printStackTrace();}}else {String msg = queue.remove();System.out.println(msg + "已消費");full.signal();}} finally {lock.unlock();}}}};new Thread(producer).start();new Thread(producer).start();new Thread(producer).start();new Thread(consumer).start();new Thread(consumer).start();}

**

3.阻塞隊列BlockingQueue實現

**

BlockingQueue即阻塞隊列,從阻塞這個詞可以看出,在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:

  • 當隊列滿了的時候進行入隊列操作
  • 當隊列空了的時候進行出隊列操作
  • 因此,當一個線程對已經滿了的阻塞隊列進行入隊操作時會阻塞,除非有另外一個線程進行了出隊操作,當一個線程對一個空的阻塞隊列進行出隊操作時也會阻塞,除非有另外一個線程進行了入隊操作。
    從上可知,阻塞隊列是線程安全的。

    下面是BlockingQueue接口的一些方法:

    操作拋異常特定值阻塞超時
    插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
    移除remove(o)poll(o)take(o)poll(timeout, timeunit)
    檢查element(o)peek(o)

    這四類方法分別對應的是:

  • ThrowsException:如果操作不能馬上進行,則拋出異常
  • SpecialValue:如果操作不能馬上進行,將會返回一個特殊的值,一般是true或者false
  • Blocks:如果操作不能馬上進行,操作會被阻塞
  • TimesOut:如果操作不能馬上進行,操作會被阻塞指定的時間,如果指定時間沒執行,則返回一個特殊值,一般是true或者false
  • 下面來看由阻塞隊列實現的生產消費模型,這里我們使用take()和put()方法,這里生產者和生產者,消費者和消費者之間不存在同步,所以會出現連續生成和連續消費的現象

    /*** 生產者消費者* 使用阻塞隊列實現* @throws InterruptedException */public static void testProductConsumeByBlockingQueue() throws InterruptedException {//因為SynchronousQueue沒有存儲功能,因此put和take會一直阻塞,直到有另一個線程已經準備好參與到交付過程中。僅當有足夠多的消費者,并且總是有一個消費者準備好獲取交付的工作時,才適合使用同步隊列。 // final BlockingQueue<String> queue = new SynchronousQueue<String>(true);//使用有界阻塞隊列final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);Runnable producer = new Runnable() {public void run() {for(int i=0;i<100;i++) {try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}String msg = "消息:"+i;try {queue.put(msg);} catch (InterruptedException e1) {e1.printStackTrace();}System.out.println(msg+" 已發送");}}};Runnable consumer = new Runnable() {public void run() {while (true) {try {Thread.sleep(200);} catch (InterruptedException e1) {e1.printStackTrace();}String msg = null;try {msg = queue.take();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(msg+"已消費");}}};new Thread(producer).start();new Thread(consumer).start();}

    **

    4.信號量Semaphore的實現

    **
    信號量可以控制訪問相應資源的線程的數量,從而實現生產消費模型

    import java.util.concurrent.Semaphore;public class BySemaphore {int count = 0;final Semaphore put = new Semaphore(5);// 初始令牌個數final Semaphore get = new Semaphore(0);final Semaphore mutex = new Semaphore(1); //該信號量相當于鎖public static void main(String[] args) {BySemaphore bySemaphore = new BySemaphore();new Thread(bySemaphore.new Producer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Consumer()).start();new Thread(bySemaphore.new Producer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}try {put.acquire();// 注意順序mutex.acquire();count++;System.out.println("生產者" + Thread.currentThread().getName()+ "已生產完成,商品數量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();get.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 5; i++) {try {Thread.sleep(1000);} catch (InterruptedException e1) {e1.printStackTrace();}try {get.acquire();// 注意順序mutex.acquire();count--;System.out.println("消費者" + Thread.currentThread().getName()+ "已消費,剩余商品數量:" + count);} catch (Exception e) {e.printStackTrace();} finally {mutex.release();put.release();}}}} }

    **

    5.使用消息隊列

    **

    這個是取巧的辦法,直接使用現成的消息中間件服務(如RocketMq、RabbitMq、Kafka等),分分鐘搞定。手動微笑~~

    總結

    以上是生活随笔為你收集整理的Java实现生产消费模型的5种方式的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。