日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

并发模式:生产者和消费者

發布時間:2023/12/3 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 并发模式:生产者和消费者 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
在我15年的職業生涯中,生產者和消費者的問題是我僅遇到過幾次。 在大多數編程情況下,我們正在做的事情是以同步方式執行功能,其中JVM或Web容器自行處理多線程的復雜性。 但是,在編寫某些需要的用例時。 上周,我遇到了一個這樣的用例,使我在上一次這樣做的時候回溯了三年。 但是,上次完成的方式卻大不相同。

當我第一次聽到問題陳述時,我立即知道需要什么。 但是,這次我的做法與上次有所不同。 這與我今天如何看待技術有關。 我不會涉足任何非技術方面,并且會直接跳入問題及其解決方案。 我開始研究市場上存在的東西,并發現了幾篇文章,這些文章幫助我以正確的方式傳達了我的想法。

問題陳述

我們需要一個用于批量遷移的解決方案。 我們正在將數據從系統1遷移到系統2,在此過程中,我們需要執行以下三個任務:

  • 根據組從數據庫加載數據
  • 處理數據
  • 通過修改來更新在步驟1中加載的記錄

我們必須處理100個小組,每個小組大約有4萬條記錄。 您可以想象如果我們以同步方式執行此練習將花費多少時間。 這里的圖像有效地解釋了這個問題。

生產者消費者:問題

生產者和消費者模式

首先讓我們看一下生產者消費者模式。 如果您參考上面的問題說明并查看圖片,我們會看到有太多實體準備使用其部分數據。 但是,沒有足夠的工人可以處理所有數據。 因此,隨著生產者繼續排隊,它只會繼續增長。 我們看到系統開始占用線程并花費大量時間。

中級解決方案

生產者消費者:中級方法

我們確實有一個中間解決方案。 參考該圖像,您將立即注意到,生產者將他們的工作堆積在文件柜中,而工人在完成上一項任務時繼續將其撿起來。 但是,這種方法確實存在一些明顯的缺點:

  • 仍然只有一名工人必須完成所有工作。 外部系統可能很高興,但是任務將繼續存在,直到工作人員完成所有任務為止
  • 生產者將他們的數據堆積在隊列中,并且需要資源來保存它們。 就像在此示例中,機柜可以裝滿一樣,JVM資源也可能發生同樣的情況。 我們需要注意要在內存中放入多少數據,在某些情況下可能不會太多。
  • 解決方案

    生產者消費者:解決方案

    解決方案是我們每天在很多地方都能看到的,例如電影院大廳排隊,汽油泵等。有很多人來訂票,而根據進來的人數,增加了更多的人來發行票。 本質上,請參考此處的圖像,您會注意到生產者將繼續向內閣添加他們的工作,并且我們有更多的工人來處理工作量。

    Java提供了并發包來解決此問題。 到現在為止,我一直在較低級別上進行線程工作,這是我第一次使用此程序包。 當我開始瀏覽網絡并閱讀其他博客作者的言論時,我遇到了一篇非常好的文章 。 它有助于非常有效地理解BlockingQueue的使用。 但是,Dhruba提供的解決方案并不能幫助我實現所需的高吞吐量。 因此,我開始探索對ArrayBlockingQueue的使用。

    控制器

    這是管理生產者和消費者之間的合同的第一類。 控制器將為生產者設置1個線程,為消費者設置2個線程。 根據需要,我們可以創建所需數量的線程。 甚至甚至可以從屬性中讀取數據或做一些動態魔術。 現在,我們將保持簡單。

    package com.kapil.techieforever.producerconsumer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestProducerConsumer { public static void main(String args[]) { try { Broker broker = new Broker(); ExecutorService threadPool = Executors.newFixedThreadPool(3); threadPool.execute(new Consumer("1", broker)); threadPool.execute(new Consumer("2", broker)); Future producerStatus = threadPool.submit(new Producer(broker)); // this will wait for the producer to finish its execution. producerStatus.get(); threadPool.shutdown(); } catch (Exception e) { e.printStackTrace(); } } }

    我正在使用ExecuteService創建線程池并對其進行管理。 代替使用基本的Thread實現,這是一種更有效的方法,因為它將根據需要處理退出和重新啟動線程。 您還將注意到,我正在使用Future類來獲取生產者線程的狀態。 該類非常有效,它將使我的程序停止進一步執行。 這是在線程上替換“ .join”方法的一種好方法。 注意:在這個例子中,我并不是很有效地使用Future。 因此您可能需要嘗試一些適合自己的事情。
    另外,您還應注意在生產者和消費者之間用作文件柜的Broker類。 我們將在短時間內看到它的實現。

    生產者

    此類負責產生需要處理的數據。

    package com.kapil.techieforever.producerconsumer; public class Producer implements Runnable { private Broker broker; public Producer(Broker broker) { this.broker = broker; } @Override public void run() { try { for (Integer i = 1; i < 5 + 1; ++i) { System.out.println("Producer produced: " + i); Thread.sleep(100); broker.put(i); } this.broker.continueProducing = Boolean.FALSE; System.out.println("Producer finished its job; terminating."); } catch (InterruptedException ex) { ex.printStackTrace(); } } }

    此類正在做它所能做的最簡單的事情-向代理添加一個整數。 需要注意的一些關鍵領域是:
    1. Broker上有一個屬性,生產者在完成生產后最終會對其進行更新。 這也稱為“最終”或“毒藥”條目。 消費者使用它來知道不再有數據 2.我使用Thread.sleep來模擬某些生產者可能需要更多時間來生產數據。 您可以調整此值并查看消費者的行為

    消費者

    此類負責從代理讀取數據并完成其工作

    package com.kapil.techieforever.producerconsumer; public class Consumer implements Runnable { private String name; private Broker broker; public Consumer(String name, Broker broker) { this.name = name; this.broker = broker; } @Override public void run() { try { Integer data = broker.get(); while (broker.continueProducing || data != null) { Thread.sleep(1000); System.out.println("Consumer " + this.name + " processed data from broker: " + data); data = broker.get(); } System.out.println("Comsumer " + this.name + " finished its job; terminating."); } catch (InterruptedException ex) { ex.printStackTrace(); } } }

    這還是一個簡單的類,它讀取Integer并將其打印在控制臺上。 但是,要注意的關鍵點是:
    1.處理數據的循環是一個無限循環,它在兩種情況下運行–直到生產者消費并且經紀人有一些數據為止
    2.同樣,Thread.sleep用于創建有效的不同方案

    經紀人

    package com.kapil.techieforever.producerconsumer; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; public class Broker { public ArrayBlockingQueue queue = new ArrayBlockingQueue(100); public Boolean continueProducing = Boolean.TRUE; public void put(Integer data) throws InterruptedException { this.queue.put(data); } public Integer get() throws InterruptedException { return this.queue.poll(1, TimeUnit.SECONDS); } }

    首先要注意的是,我們使用ArrayBlockingQueue作為數據持有人。 我不會說這是什么,而是要您在此處的JavaDocs上閱讀它。 但是,我將解釋生產者將把數據放入隊列,而使用者將以FIFO格式從隊列中獲取數據。 但是,如果生產者運行緩慢,則消費者將等待數據進入,如果陣列已滿,生產者將等待數據填滿。

    另外,請注意,我使用的是“投票”功能,而不是進入隊列。 這是為了確保消費者不會一直等待,等待會在幾秒鐘后超時。 這有助于我們進行相互交流,并在處理完所有數據后殺死消費者。 (注意:嘗試用get代替poll,您將看到一些有趣的輸出)。

    我的代碼位于Google項目托管上 。 隨意瀏覽并從那里下載。 本質上,這是一個蝕(Spring STS)項目。 根據下載時間,您可能還會在下載時獲得其他軟件包和類。 也可以隨意查看這些內容并分享您的評論
    –您可以在SVN瀏覽器中瀏覽源代碼,或者;
    –您可以從項目本身下載它 。

    側面解決方案

    最初,我在中間發布了此解決方案,但是后來我意識到這不是做事的方法,因此我從主要內容中刪除了此內容,并將其放在最后。 最終解決方案的另一種變體是,工人/消費者一次不處理一項工作,而是一起處理多個工作,并在完成下一個工作之前先完成工作。 這種方法可以產生相似的結果,但是在某些情況下,如果我們有一些工作需要花費不同的時間才能完成,那么從本質上講,這意味著某些工人比其他工人最終會更快地結束工作,從而造成了瓶頸。 并且,如果作業是事先分配的,這意味著所有消費者將在加工之前擁有所有作業(不是生產者-消費者模式),那么這個問題可能加起來甚至更多,并導致處理邏輯的更多延遲。

    相關文章

    • 隊列是Devil自己的數據結構 (petewarden.typepad.com)
    • 我對撒但的小幫手排隊有錯嗎? (petewarden.typepad.com)
    • http://code.google.com/p/disruptor/

    參考: 并發模式: JCG合作伙伴的 生產者和消費者 ? Scratch Pad博客上的Kapil Viren Ahuja。


    翻譯自: https://www.javacodegeeks.com/2012/02/concurrency-pattern-producer-and.html

    總結

    以上是生活随笔為你收集整理的并发模式:生产者和消费者的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。