日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java中生产者和消费者总结

發布時間:2024/3/12 java 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java中生产者和消费者总结 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

? ? ? ? 生產者和消費者問題是線程模型中的經典問題,生產者和消費者在同一時間段共用同一個存儲空間,這個存儲空間是一個緩沖區的倉庫,生產者可以將產品放入倉庫,消費者可以從倉庫中取出產品。

? ? ? ?

?生產者/消費者模型是基于等待/通知機制,主要關注以下幾點:

  • 生產者生產的時候消費者不能消費
  • 消費者消費的時候生產者不能生產
  • 緩沖區空時消費者不能消費
  • 緩沖區滿時生產者不能生產
  • 主要優點:

    • 解耦。因為多了一個緩沖區,所以生產者和消費者并不直接相互調用,這一點很容易想到,這樣生產者和消費者的代碼發生變化,都不會對對方產生影響,這樣其實就把生產者和消費者之間的強耦合解開,變為了生產者和緩沖區/消費者和緩沖區之間的弱耦合
    • 通過平衡生產者和消費者的處理能力來提高整體處理數據的速度,這是生產者/消費者模型最重要的一個優點。如果消費者直接從生產者這里拿數據,如果生產者生產的速度很慢,但消費者消費的速度很快,那消費者就得占用CPU的時間片白白等在那邊。有了生產者/消費者模型,生產者和消費者就是兩個獨立的并發體,生產者把生產出來的數據往緩沖區一丟就好了,不必管消費者;消費者也是,從緩沖區去拿數據就好了,也不必管生產者,緩沖區滿了就不生產,緩沖區空了就不消費,使生產者/消費者的處理能力達到一個動態的平衡

    實現生產者和消費者的5種方式?

    wait()和notify()方法的實現:

    這也是最簡單最基礎的實現,緩沖區滿和為空時都調用wait()方法等待,當生產者生產了一個產品或者消費者消費了一個產品之后會喚醒所有線程。

    package org.example;import java.sql.SQLOutput;public class Test {private static Integer count = 0;private static final Integer Total = 10;private static String flag = "agree";public static void main(String[] args) {Test test = new Test();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();new Thread(test.new Producer()).start();new Thread(test.new Consumer()).start();}public class Producer implements Runnable{@Overridepublic void run() {for (int i = 0;i < 5; i++){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (flag) {while (count == Total) {try {flag.wait();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName()+"生產者共有" + count);flag.notifyAll();}}}}public class Consumer implements Runnable{@Overridepublic void run() {for(int i = 0; i < 5; i++){try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (flag){while(count == 0){try {flag.wait();} catch (InterruptedException e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName() + "消費者消費,目前總共有" + count);flag.notifyAll();}}}} }

    運行結果:

    Thread-6生產者共有1 Thread-1消費者消費,目前總共有0 Thread-0生產者共有1 Thread-4生產者共有2 Thread-5消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-2生產者共有1 Thread-3消費者消費,目前總共有0 Thread-4生產者共有1 Thread-6生產者共有2 Thread-7消費者消費,目前總共有1 Thread-0生產者共有2 Thread-1消費者消費,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-2生產者共有1 Thread-3消費者消費,目前總共有0 Thread-4生產者共有1 Thread-5消費者消費,目前總共有0 Thread-2生產者共有1 Thread-3消費者消費,目前總共有0 Thread-6生產者共有1 Thread-7消費者消費,目前總共有0 Thread-0生產者共有1 Thread-1消費者消費,目前總共有0 Thread-6生產者共有1 Thread-3消費者消費,目前總共有0 Thread-4生產者共有1 Thread-0生產者共有2 Thread-1消費者消費,目前總共有1 Thread-2生產者共有2 Thread-5消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-0生產者共有1 Thread-3消費者消費,目前總共有0 Thread-6生產者共有1 Thread-7消費者消費,目前總共有0 Thread-4生產者共有1 Thread-2生產者共有2 Thread-5消費者消費,目前總共有1 Thread-1消費者消費,目前總共有0Process finished with exit code 0

    可重入鎖ReentrantLock的實現

    java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,通過對lock的lock()方法和unlock()方法實現了對鎖的顯示控制,而synchronize()則是對鎖的隱性控制。
    可重入鎖,也叫做遞歸鎖,指的是同一線程 外層函數獲得鎖之后 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響,簡單來說,該鎖維護這一個與獲取鎖相關的計數器,如果擁有鎖的某個線程再次得到鎖,那么獲取計數器就加1,函數調用結束計數器就減1,然后鎖需要被釋放兩次才能獲得真正釋放。已經獲取鎖的線程進入其他需要相同鎖的同步代碼塊不會被阻塞。

    import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /*** 生產者和消費者,ReentrantLock的實現*/ public class Test{private static Integer count = 0;private static final Integer FULL = 10;//創建一個鎖對象private Lock lock = new ReentrantLock();//創建兩個條件變量,一個為緩沖區非滿,一個為緩沖區非空private final Condition notFull = lock.newCondition();private final Condition notEmpty = lock.newCondition();public static void main(String[] args) {Test test2 = new Test();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();new Thread(test2.new Producer()).start();new Thread(test2.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}//獲取鎖lock.lock();try {while (count == FULL) {try {notFull.await();} catch (InterruptedException e) {e.printStackTrace();}}count++;System.out.println(Thread.currentThread().getName()+ "生產者生產,目前總共有" + count);//喚醒消費者notEmpty.signal();} finally {//釋放鎖lock.unlock();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}lock.lock();try {while (count == 0) {try {notEmpty.await();} catch (Exception e) {e.printStackTrace();}}count--;System.out.println(Thread.currentThread().getName()+ "消費者消費,目前總共有" + count);notFull.signal();} finally {lock.unlock();}}}} }

    ?運行結果

    Thread-2生產者生產,目前總共有1 Thread-6生產者生產,目前總共有2 Thread-4生產者生產,目前總共有3 Thread-5消費者消費,目前總共有2 Thread-1消費者消費,目前總共有1 Thread-0生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-3消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-2生產者生產,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-6生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-1消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-3消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-0生產者生產,目前總共有2 Thread-2生產者生產,目前總共有3 Thread-1消費者消費,目前總共有2 Thread-7消費者消費,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-6生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-7消費者消費,目前總共有1 Thread-0生產者生產,目前總共有2 Thread-5消費者消費,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-4生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-5消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-4生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-5消費者消費,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-6生產者生產,目前總共有1 Thread-3消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-1消費者消費,目前總共有0 Thread-0生產者生產,目前總共有1 Thread-5消費者消費,目前總共有0 Thread-2生產者生產,目前總共有1 Thread-7消費者消費,目前總共有0 Thread-4生產者生產,目前總共有1 Thread-6生產者生產,目前總共有2 Thread-3消費者消費,目前總共有1 Thread-2生產者生產,目前總共有2 Thread-0生產者生產,目前總共有3 Thread-5消費者消費,目前總共有2 Thread-7消費者消費,目前總共有1 Thread-1消費者消費,目前總共有0Process finished with exit code 0

    阻塞隊列BlockingQueue的實現

    BlockingQueue即阻塞隊列,從阻塞這個詞可以看出,在某些情況下對阻塞隊列的訪問可能會造成阻塞。被阻塞的情況主要有如下兩種:

  • 當隊列滿了的時候進行入隊列操作
  • 當隊列空了的時候進行出隊列操作
    因此,當一個線程對已經滿了的阻塞隊列進行入隊操作時會阻塞,除非有另外一個線程進行了出隊操作,當一個線程對一個空的阻塞隊列進行出隊操作時也會阻塞,除非有另外一個線程進行了入隊操作。
    從上可知,阻塞隊列是線程安全的。
    下面是BlockingQueue接口的一些方法:
  • import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /*** 使用BlockingQueue實現生產者消費者模型*/ public class Test3 {private static Integer count = 0;//創建一個阻塞隊列final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);public static void main(String[] args) {Test3 test3 = new Test3();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();new Thread(test3.new Producer()).start();new Thread(test3.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (Exception e) {e.printStackTrace();}try {blockingQueue.put(1);count++;System.out.println(Thread.currentThread().getName()+ "生產者生產,目前總共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {blockingQueue.take();count--;System.out.println(Thread.currentThread().getName()+ "消費者消費,目前總共有" + count);} catch (InterruptedException e) {e.printStackTrace();}}}} }

    ?

    信號量Semaphore的實現?

    Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源,在操作系統中是一個非常重要的問題,可以用來解決哲學家就餐問題。Java中的Semaphore維護了一個許可集,一開始先設定這個許可集的數量,可以使用acquire()方法獲得一個許可,當許可不足時會被阻塞,release()添加一個許可。在下列代碼中,還加入了另外一個mutex信號量,維護生產者消費者之間的同步關系,保證生產者和消費者之間的交替進行

    import java.util.concurrent.Semaphore; /*** 使用semaphore信號量實現*/ public class Test4 {private static Integer count = 0;//創建三個信號量final Semaphore notFull = new Semaphore(10);final Semaphore notEmpty = new Semaphore(0);final Semaphore mutex = new Semaphore(1);public static void main(String[] args) {Test4 test4 = new Test4();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();new Thread(test4.new Producer()).start();new Thread(test4.new Consumer()).start();}class Producer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}try {notFull.acquire();mutex.acquire();count++;System.out.println(Thread.currentThread().getName()+ "生產者生產,目前總共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notEmpty.release();}}}}class Consumer implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; i++) {try {Thread.sleep(3000);} catch (InterruptedException e1) {e1.printStackTrace();}try {notEmpty.acquire();mutex.acquire();count--;System.out.println(Thread.currentThread().getName()+ "消費者消費,目前總共有" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {mutex.release();notFull.release();}}}} }

    管道輸入輸出流PipedInputStream和PipedOutputStream實現

    ?

    在java的io包下,PipedOutputStream和PipedInputStream分別是管道輸出流和管道輸入流。
    它們的作用是讓多線程可以通過管道進行線程間的通訊。在使用管道通信時,必須將PipedOutputStream和PipedInputStream配套使用。
    使用方法:先創建一個管道輸入流和管道輸出流,然后將輸入流和輸出流進行連接,用生產者線程往管道輸出流中寫入數據,消費者在管道輸入流中讀取數據,這樣就可以實現了不同線程間的相互通訊,但是這種方式在生產者和生產者、消費者和消費者之間不能保證同步,也就是說在一個生產者和一個消費者的情況下是可以生產者和消費者之間交替運行的,多個生成者和多個消費者者之間則不行

    /*** 使用管道實現生產者消費者模型*/ public class Test5 {final PipedInputStream pis = new PipedInputStream();final PipedOutputStream pos = new PipedOutputStream();{try {pis.connect(pos);} catch (IOException e) {e.printStackTrace();}}class Producer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = (int) (Math.random() * 255);System.out.println(Thread.currentThread().getName() + "生產者生產了一個數字,該數字為: " + num);pos.write(num);pos.flush();} } catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}class Consumer implements Runnable {@Overridepublic void run() {try {while(true) {Thread.sleep(1000);int num = pis.read();System.out.println("消費者消費了一個數字,該數字為:" + num);}} catch (Exception e) {e.printStackTrace();} finally {try {pos.close();pis.close();} catch (IOException e) {e.printStackTrace();}}}}public static void main(String[] args) {Test5 test5 = new Test5();new Thread(test5.new Producer()).start();new Thread(test5.new Consumer()).start();} }

    參考

    《Thinking In Java》

    Java實現生產者和消費者的5種方式

    總結

    以上是生活随笔為你收集整理的Java中生产者和消费者总结的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。