用Java解决生产者-消费者问题
生活随笔
收集整理的這篇文章主要介紹了
用Java解决生产者-消费者问题
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
當(dāng)我們嘗試多線程編程時(shí),生產(chǎn)者-消費(fèi)者問題是最常見的問題之一。 盡管不像多線程編程中的其他一些問題那樣具有挑戰(zhàn)性,但是錯(cuò)誤地實(shí)現(xiàn)此問題可能會(huì)造成應(yīng)用程序混亂。 生產(chǎn)的物品將不使用,開始的物品將被跳過,消耗量取決于生產(chǎn)是在消耗嘗試之前還是之后開始的,等等。此外,您可能會(huì)在異常發(fā)生后很長(zhǎng)時(shí)間注意到異常,最重要的是,幾乎所有異常線程程序,這一程序也很難調(diào)試和復(fù)制。 因此,在這篇文章中,我認(rèn)為我將嘗試借助Java出色的java.util.concurrent包及其類來解決Java中的此問題。 首先,讓我們看一下生產(chǎn)者-消費(fèi)者問題的特征:
- 生產(chǎn)者生產(chǎn)物品。
- 消費(fèi)者消費(fèi)生產(chǎn)者生產(chǎn)的物品。
- 生產(chǎn)者完成生產(chǎn),并讓消費(fèi)者知道他們已經(jīng)完成了。
- 消耗該項(xiàng)目的步驟獨(dú)立產(chǎn)生,而不依賴于其他項(xiàng)目。
- 處理項(xiàng)目的時(shí)間大于生產(chǎn)項(xiàng)目的時(shí)間。
- 可以有多個(gè)生產(chǎn)者。
- 將有多個(gè)消費(fèi)者。
- 一旦完成新物品的生產(chǎn),生產(chǎn)者將告知消費(fèi)者,以便消費(fèi)者在消費(fèi)并加工完最后一件物品后退出。
在這里,可以由多個(gè)類似商品的生產(chǎn)者共享消費(fèi)者。 類似的項(xiàng)目,我的意思是生產(chǎn)者,其生產(chǎn)“項(xiàng)目”類型的對(duì)象。 Item的定義如下:
package com.maximus.consumer;public interface Item {public void process(); }現(xiàn)在我們來看一下Consumer接口的實(shí)現(xiàn):
package com.maximus.consumer;import java.util.LinkedList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue;public class ConsumerImpl implements Consumer {private BlockingQueue< Item > itemQueue = new LinkedBlockingQueue<Item>();private ExecutorService executorService = Executors.newCachedThreadPool();private List<ItemProcessor> jobList = new LinkedList<ItemProcessor>();private volatile boolean shutdownCalled = false;public ConsumerImpl(int poolSize){for(int i = 0; i < poolSize; i++){ItemProcessor jobThread = new ItemProcessor(itemQueue);jobList.add(jobThread);executorService.submit(jobThread);}}public boolean consume(Item j){if(!shutdownCalled){try{itemQueue.put(j);}catch(InterruptedException ie){Thread.currentThread().interrupt();return false;}return true;}else{return false;}}public void finishConsumption(){for(ItemProcessor j : jobList){j.cancelExecution();}executorService.shutdown();} }現(xiàn)在,唯一感興趣的點(diǎn)是消費(fèi)者內(nèi)部用于處理傳入商品的ItemProcessor。 ItemProcessor的編碼如下:
package com.maximus.consumer;import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit;public class ItemProcessor implements Runnable {private BlockingQueue<Item> jobQueue;private volatile boolean keepProcessing;public ItemProcessor(BlockingQueue<Item> queue){jobQueue = queue;keepProcessing = true;}public void run(){while(keepProcessing || !jobQueue.isEmpty()){try{Item j = jobQueue.poll(10, TimeUnit.SECONDS);if(j != null){j.process();}}catch(InterruptedException ie){Thread.currentThread().interrupt();return;}}}public void cancelExecution(){this.keepProcessing = false;} } 上面唯一的挑戰(zhàn)是while循環(huán)中的條件。 這樣編寫while循環(huán),即使在生產(chǎn)者完成生產(chǎn)并通知消費(fèi)者生產(chǎn)完成之后,也可以支持項(xiàng)目消耗的繼續(xù)。 上面的while循環(huán)可確保在線程退出之前完成所有項(xiàng)目的消耗。 上面的使用者是線程安全的,可以共享多個(gè)生產(chǎn)者,以便每個(gè)生產(chǎn)者可以并發(fā)調(diào)用consumer.consume(),而不必?fù)?dān)心同步和其他多線程警告。 生產(chǎn)者只需要提交Item接口的實(shí)現(xiàn),其process()方法將包含如何完成消耗的邏輯。 作為閱讀本文的獎(jiǎng)勵(lì),我提出了一個(gè)測(cè)試程序,演示了如何使用上述類: package com.maximus.consumer;import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.InputStreamReader;public class Test {public static void main(String[] args) throws Exception{Consumer consumer = new ConsumerImpl(10);BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(new File(args[0]))));String line = "";while((line = br.readLine()) != null){System.out.println("Producer producing: " + line);consumer.consume(new PrintJob(line));}consumer.finishConsumption();} }class PrintJob implements Item {private String line;public PrintJob(String s){line = s;}public void process(){System.out.println(Thread.currentThread().getName() + " consuming :" + line);} } 可以通過多種不同的方式來調(diào)整上述消費(fèi)者,使其更加靈活。 我們可以定義生產(chǎn)完成后消費(fèi)者將做什么。 可能對(duì)其進(jìn)行了調(diào)整,以允許批處理,但我將其留給用戶使用。 隨意使用它,并以任何想要的方式扭曲它。 編碼愉快!參考: The Java HotSpot博客上的JCG合作伙伴 Sarma Swaranga 解決了Java中的生產(chǎn)者-消費(fèi)者問題 。
翻譯自: https://www.javacodegeeks.com/2012/05/solving-producer-consumer-problem-in.html
總結(jié)
以上是生活随笔為你收集整理的用Java解决生产者-消费者问题的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 动态ADF火车:以编程方式添加火车停靠站
- 下一篇: 超出了GC开销限制– Java堆分析