java多线程阻塞队列_阻塞队列和多线程消费者,如何知道何时停止
我有一個單線程生成器,它創建一些任務對象,然后添加到 ArrayBlockingQueue (具有固定大小) .
我也開始了一個多線程的消費者 . 這是一個固定的線程池( Executors.newFixedThreadPool(threadCount); ) . 然后我向這個threadPool提交了一些ConsumerWorker入口,每個ConsumerWorker都對上面提到的ArrayBlockingQueue實例進行了引用 .
每個這樣的工作人員都會在隊列中執行 take() 并處理該任務 .
我的問題是,當沒有更多的工作要做時,讓 Worker 知道的最佳方法是什么 . 換句話說,如何告訴Workers, 生產環境 者已經完成了對隊列的添加,從這一點開始,每個工作人員在看到Queue為空時應該停止 .
我現在得到的是一個設置,其中我的Producer初始化了一個回調,當他完成它的工作(向隊列中添加東西)時會觸發回調 . 我還保留了我創建并提交給ThreadPool的所有ConsumerWorkers的列表 . 當Producer Callback告訴我 生產環境 者已完成時,我可以告訴每個 Worker . 此時,他們應該繼續檢查隊列是否為空,當它變為空時,它們應該停止,從而允許我優雅地關閉ExecutorService線程池 . 就是這樣的
public class ConsumerWorker implements Runnable{
private BlockingQueue inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
這里的問題是我有一個明顯的競爭條件,有時 生產環境 者將完成,發出信號,消費者工作者將在消耗隊列中的所有內容之前停止 .
我的問題是,同步這個的最佳方法是什么,以便一切正常?我應該同步整個部分來檢查 生產環境 者是否正在運行加上如果隊列是空的加上從隊列中取出一些塊(在隊列對象上)?我應該只在ConsumerWorker實例上同步 isRunning 布爾的更新嗎?還有其他建議嗎?
更新,這里是我最終使用的工作實施:
public class ConsumerWorker implements Runnable{
private BlockingQueue inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
這很大程度上受到JohnVint在下面的回答的啟發,只有一些小修改 .
===由于@ vendhan的評論而更新 .
謝謝你的觀察 . 你是對的,這個問題的第一個代碼片段(在其他問題中)是 while(isRunning || !inputQueue.isEmpty()) 沒有真正意義的那個 .
在我實際的最終實現中,我做了一些更接近你更換“||”的建議 . (或)用“&&”(和),從某種意義上說,每個 Worker (消費者)現在只檢查他從列表中得到的元素是否是毒丸,如果是的話就停止了(理論上我們可以說 Worker 有要運行并且隊列不能為空) .
與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的java多线程阻塞队列_阻塞队列和多线程消费者,如何知道何时停止的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java取消按钮事件_java按钮事件处
- 下一篇: java 钉钉获取用户信息,JAVA m