使用LinkedBlockingQueue来实现生产者消费者的例子
生活随笔
收集整理的這篇文章主要介紹了
使用LinkedBlockingQueue来实现生产者消费者的例子
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
工作中,經(jīng)常有將文件中的數(shù)據(jù)導(dǎo)入數(shù)據(jù)庫的表中,或者將數(shù)據(jù)庫表中的記錄保存到文件中。為了提高程序的處理速度,可以設(shè)置讀線程和寫線程,這些線程通過消息隊(duì)列進(jìn)行數(shù)據(jù)交互。本例就是使用了LinkedBlockingQueue來模仿生產(chǎn)者線程和消費(fèi)者線程進(jìn)行數(shù)據(jù)生產(chǎn)和消費(fèi)。
為了方便,這些不同的類被寫在了一個類中,實(shí)際使用的時候,可以單獨(dú)拆開,舉一反三地使用。
以下是例子:
LinkedBlockingQueueDemo.java
?
import java.util.Date; import java.util.Random; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit;public class LinkedBlockingQueueDemo {// 生產(chǎn)者線程數(shù)量private final static int providerThreadAmount = 5;// 記錄每一個生產(chǎn)者線程是否處理完畢的標(biāo)記private static boolean[] providerDoneFlag = new boolean[providerThreadAmount];// 整個所有的生產(chǎn)者線程全部結(jié)束的標(biāo)記private static boolean done = false;// 一個線程安全的隊(duì)列,用于生產(chǎn)者和消費(fèi)者異步地信息交互private static LinkedBlockingQueue<String> linkedBlockingQeque = new LinkedBlockingQueue<String>();static class ProviderThread extends Thread {private Thread thread;private String threadName;private int threadNo;public ProviderThread(String threadName2, int threadNo) {this.threadName = threadName2;this.threadNo = threadNo;}public void start() {if (thread == null) {thread = new Thread(this, threadName);}thread.start();System.out.println((new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());}@Overridepublic void run() {int rows = 0;for (int i = 0; i < 100; i++) {String string = String.format("%s-%d-%s", threadName, i, Thread.currentThread().getName());// offer不會去阻塞線程,put會//linkedBlockingQeque.offer(string); linkedBlockingQeque.put(string);rows++;/** try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch* (InterruptedException e) { e.printStackTrace(); }*/}// 本線程處理完畢的標(biāo)記LinkedBlockingQueueDemo.providerDoneFlag[threadNo] = true;System.out.println((new Date().getTime()) + " " + threadName + " end. total rows is " + rows + "\t"+ Thread.currentThread().getName());}}static class ConsumerThread implements Runnable {private Thread thread;private String threadName;public ConsumerThread(String threadName2) {this.threadName = threadName2;}public void start() {if (thread == null) {thread = new Thread(this, threadName);}thread.start();System.out.println((new Date().getTime()) + " " + threadName + " starting... " + Thread.currentThread().getName());}@Overridepublic void run() {int rows = 0;// 生產(chǎn)者線程沒有結(jié)束,或者消息隊(duì)列中有元素的時候,去隊(duì)列中取數(shù)據(jù)while (LinkedBlockingQueueDemo.getDone() == false || linkedBlockingQeque.isEmpty() == false) {try {//在甘肅電信的實(shí)際應(yīng)用中發(fā)現(xiàn),當(dāng)數(shù)據(jù)的處理量達(dá)到千萬級的時候,帶參數(shù)的poll會將主機(jī)的幾百個G的內(nèi)存耗盡,jvm會提示申請內(nèi)存失敗,并將進(jìn)程退出。網(wǎng)上說,這是這個方法的一個bug。//String string = linkedBlockingQeque.poll(3, TimeUnit.SECONDS);String string = linkedBlockingQeque.poll();if (string == null) {continue;}rows++;System.out.println((new Date().getTime()) + " " + threadName + " get msg from linkedBlockingQeque is "+ string + "\t" + Thread.currentThread().getName());/** try { Thread.sleep((new Random()).nextInt(5) * 1000); } catch* (InterruptedException e) { e.printStackTrace(); }*/} catch (InterruptedException e) {e.printStackTrace();}}System.out.println((new Date().getTime()) + " " + threadName + " end total rows is " + rows + "\t"+ Thread.currentThread().getName());}}public static synchronized void setDone(boolean flag) {LinkedBlockingQueueDemo.done = flag;}public static synchronized boolean getDone() {return LinkedBlockingQueueDemo.done;}public static void main(String[] args) {System.out.println((new Date().getTime()) + " " + "process begin at " + Thread.currentThread().getName());System.out.println((new Date().getTime()) + " " + "linkedBlockingDeque.hashCode() is " + linkedBlockingQeque.hashCode());// 啟動若干生產(chǎn)者線程for (int i = 0; i < providerThreadAmount; i++) {String threadName = String.format("%s-%d", "ProviderThread", i);ProviderThread providerThread = new ProviderThread(threadName, i);providerThread.start();}// 啟動若干個消費(fèi)者線程for (int i = 0; i < 10; i++) {String threadName = String.format("%s-%d", "ConsumerThread", i);ConsumerThread consumerThread = new ConsumerThread(threadName);consumerThread.start();}// 循環(huán)檢測生產(chǎn)者線程是否處理完畢do {for (boolean b : providerDoneFlag) {if (b == false) {/** try { Thread.sleep(3 * 1000); System.out.println((new Date().getTime()) +* " "+"sleep 3 seconds. linkedBlockingQeque.size() is "+linkedBlockingQeque.* size() + "\t" + Thread.currentThread().getName()); } catch* (InterruptedException e) { e.printStackTrace(); }*/// 只要有一個生產(chǎn)者線程沒有結(jié)束,則整個生產(chǎn)者線程檢測認(rèn)為沒有結(jié)束break;}LinkedBlockingQueueDemo.setDone(true);}// 生產(chǎn)者線程全部結(jié)束的時候,跳出檢測if (LinkedBlockingQueueDemo.getDone() == true) {break;}} while (true);System.out.println((new Date().getTime()) + " process done successfully\t" + Thread.currentThread().getName());} }?
?
結(jié)果略。
轉(zhuǎn)載于:https://www.cnblogs.com/babyha/p/9765846.html
總結(jié)
以上是生活随笔為你收集整理的使用LinkedBlockingQueue来实现生产者消费者的例子的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 随笔001
- 下一篇: protel99se 问题汇总(不定期更