日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

多线程设计模式(四):生产者-消费模式

發(fā)布時(shí)間:2024/8/26 asp.net 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 多线程设计模式(四):生产者-消费模式 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
生產(chǎn)者-消費(fèi)模式,通常有兩類線程,即若干個(gè)生產(chǎn)者線程和若干個(gè)消費(fèi)者線程。生產(chǎn)者線程負(fù)責(zé)提交用戶請求,消費(fèi)者線程負(fù)責(zé)具體處理生產(chǎn)者提交的任務(wù)。兩者之間通過共享內(nèi)存緩沖去進(jìn)行通信。

一、架構(gòu)模式圖:

類圖:

生產(chǎn)者:提交用戶請求,提取用戶任務(wù),并裝入內(nèi)存緩沖區(qū);

消費(fèi)者:在內(nèi)存緩沖區(qū)中提取并處理任務(wù);

內(nèi)存緩沖區(qū):緩存生產(chǎn)者提交的任務(wù)或數(shù)據(jù),供消費(fèi)者使用;

任務(wù):生產(chǎn)者向內(nèi)存緩沖區(qū)提交的數(shù)據(jù)結(jié)構(gòu);

Main:使用生產(chǎn)者和消費(fèi)者的客戶端。

?

二、代碼實(shí)現(xiàn)一個(gè)基于生產(chǎn)者-消費(fèi)者模式的求整數(shù)平方的并行計(jì)算:

(1)Producer生產(chǎn)者線程:

?

[java]?view plaincopy
  • <span?style="font-size:18px;">package?ProducerConsumer;??
  • ??
  • import?java.util.Random;??
  • import?java.util.concurrent.BlockingQueue;??
  • import?java.util.concurrent.TimeUnit;??
  • import?java.util.concurrent.atomic.AtomicInteger;??
  • ??
  • public?class?Producer??implements?Runnable{??
  • ??????
  • ????//Volatile修飾的成員變量在每次被線程訪問時(shí),都強(qiáng)迫從共享內(nèi)存中重讀該成員變量的值。??
  • ????//而且,當(dāng)成員變量發(fā)生變化時(shí),強(qiáng)迫線程將變化值回寫到共享內(nèi)存。??
  • ????//這樣在任何時(shí)刻,兩個(gè)不同的線程總是看到某個(gè)成員變量的同一個(gè)值。??
  • ????private?volatile??boolean?isRunning=?true;??
  • ??????
  • ????//內(nèi)存緩沖區(qū)??
  • ????private?BlockingQueue<PCData>?queue;??
  • ??????
  • ????//總數(shù),原子操作??
  • ????private?static?AtomicInteger?count?=?new?AtomicInteger();??
  • ???????
  • ????private?static?final?int?SLEEPTIME=1000;??
  • ??????
  • ??????
  • ????public?Producer(BlockingQueue<PCData>?queue)?{??
  • ??????????
  • ????????this.queue?=?queue;??
  • ????}??
  • ??
  • ??
  • ??
  • ??
  • ????@Override??
  • ????public?void?run()?{??
  • ????????PCData?data=null;??
  • ????????Random?r??=?new?Random();??
  • ????????System.out.println("start?producer?id?=?"+?Thread?.currentThread().getId());??
  • ????????try{??
  • ????????????while(isRunning){??
  • ????????????????Thread.sleep(r.nextInt(SLEEPTIME));??
  • ????????????????//構(gòu)造任務(wù)數(shù)據(jù)??
  • ????????????????data=?new?PCData(count.incrementAndGet());??
  • ????????????????System.out.println("data?is?put?into?queue?");??
  • ????????????????//提交數(shù)據(jù)到緩沖區(qū)??
  • ????????????????if(!queue.offer(data,2,TimeUnit.SECONDS)){??
  • ????????????????????System.out.println("faile?to??put?data:??"+?data);??
  • ????????????????}??
  • ????????????}??
  • ????????}catch?(InterruptedException?e){??
  • ????????????e.printStackTrace();??
  • ????????????Thread.currentThread().interrupt();??
  • ??????????????
  • ????????}??
  • ??????????
  • ??????????
  • ????}??
  • ??
  • ????public?void?stop(){??
  • ??????????
  • ????????isRunning=false;??
  • ????}??
  • ??
  • ??
  • }??
  • </span>??
  • ?

    ?

    ?

    ?

    (2)Consumer消費(fèi)者線程:

    ?

    [java]?view plaincopy
  • <span?style="font-size:18px;">package?ProducerConsumer;??
  • ??
  • import?java.text.MessageFormat;??
  • import?java.util.Random;??
  • import?java.util.concurrent.BlockingQueue;??
  • ??
  • public?class?Consumer?implements?Runnable?{??
  • ????//緩沖區(qū)?????
  • ????private?BlockingQueue<PCData>?queue;??
  • ????private?static?final?int?SLEEPTIME=1000;??
  • ??????
  • ??????
  • ????public?Consumer(BlockingQueue<PCData>?queue)?{??????????
  • ????????this.queue?=?queue;??
  • ????}??
  • ??
  • ??
  • ????@Override??
  • ????public?void?run()?{??
  • ????????System.out.println("start?Consumer?id=?"+?Thread?.currentThread().getId());??
  • ????????Random?r?=?new?Random();??
  • ??????????
  • ????????????try?{??
  • ????????????????//提取任務(wù)??
  • ????????????????while(true){??
  • ????????????????????PCData?data=?queue.take();??
  • ????????????????????if(null!=?data){??
  • ????????????????????????//計(jì)算平方??
  • ????????????????????????int?re=?data.getData()*data.getData();??
  • ????????????????????????System.out.println(MessageFormat.format("{0}*{1}={2}",??
  • ????????????????????????????????????data.getData(),data.getData(),re??
  • ????????????????????????????????));??
  • ????????????????????????Thread.sleep(r.nextInt(SLEEPTIME));??
  • ??????????????????????????????????????????????????
  • ????????????????????}??
  • ????????????????}??
  • ????????????}?catch?(InterruptedException?e)?{????????????????
  • ????????????????e.printStackTrace();??
  • ????????????????Thread.currentThread().interrupt();??
  • ????????????}??
  • ??????????????
  • ??????????
  • ??????????
  • ????}??
  • ??????
  • ??????
  • ??
  • ??????
  • ??
  • }??
  • </span>??
  • ?

    ?

    ?

    ?

    (3)PCData共享數(shù)據(jù)模型:

    [java]?view plaincopy
  • <span?style="font-size:18px;">package?ProducerConsumer;??
  • ??
  • public??final?class?PCData?{??
  • ??
  • ????private?final?int?intData;??
  • ??
  • ????public?PCData(int?d)?{??
  • ????????intData=d;??
  • ????}??
  • ??????
  • ????public?PCData(String??d)?{??
  • ????????intData=Integer.valueOf(d);??
  • ????}??
  • ??????
  • ????public?int?getData(){??
  • ??????????
  • ????????return?intData;??
  • ??????????
  • ????}??
  • ????@Override??
  • ????public?String?toString(){??
  • ????????return?"data:"+?intData?;??
  • ????}??
  • ??????
  • }??
  • </span>??
  • ?

    (4)Main函數(shù):

    [java]?view plaincopy
  • <span?style="font-size:18px;">package?ProducerConsumer;??
  • ??
  • import?java.util.concurrent.BlockingQueue;??
  • import?java.util.concurrent.Executor;??
  • import?java.util.concurrent.ExecutorService;??
  • import?java.util.concurrent.Executors;??
  • import?java.util.concurrent.LinkedBlockingDeque;??
  • ??
  • public?class?Main?{??
  • ??
  • ????/**?
  • ?????*?@param?args?
  • ?????*/??
  • ????public?static?void?main(String[]?args)??throws?InterruptedException{??
  • ????????//建立緩沖區(qū)??
  • ????????BlockingQueue<PCData>?queue?=?new?LinkedBlockingDeque<PCData>(10);??
  • ????????//建立生產(chǎn)者??
  • ????????Producer?producer1?=?new?Producer(queue);??
  • ????????Producer?producer2?=?new?Producer(queue);??
  • ????????Producer?producer3?=?new?Producer(queue);??
  • ??????????
  • ????????//建立消費(fèi)者??
  • ????????Consumer?consumer1?=?new?Consumer(queue);??
  • ????????Consumer?consumer2?=?new?Consumer(queue);??
  • ????????Consumer?consumer3?=?new?Consumer(queue);?????????
  • ??????????????????
  • ????????//建立線程池??
  • ????????ExecutorService?service?=?Executors.newCachedThreadPool();??
  • ??????????
  • ????????//運(yùn)行生產(chǎn)者??
  • ????????service.execute(producer1);??
  • ????????service.execute(producer2);??
  • ????????service.execute(producer3);??
  • ????????//運(yùn)行消費(fèi)者??
  • ????????service.execute(consumer1);??
  • ????????service.execute(consumer2);??
  • ????????service.execute(consumer3);??
  • ??????
  • ????????Thread.sleep(10*1000);??
  • ??????????
  • ????????//停止生產(chǎn)者??
  • ????????producer1.stop();??
  • ????????producer2.stop();??
  • ????????producer3.stop();??
  • ??????????
  • ????????Thread.sleep(3000);??
  • ????????service.shutdown();??
  • ????}??
  • ??
  • }??
  • </span>??
  • ?

    三、注意:

    ????volatile關(guān)鍵字:Volatile修飾的成員變量在每次被線程訪問時(shí),都強(qiáng)迫從共享內(nèi)存中重讀該成員變量的值。而且,當(dāng)成員變量發(fā)生變化時(shí),強(qiáng)迫線程將變化值回寫到共享內(nèi)存。這樣在任何時(shí)刻,兩個(gè)不同的線程總是看到某個(gè)成員變量的同一個(gè)值。

    ????生產(chǎn)-消費(fèi)模式的核心組件是共享內(nèi)存緩沖區(qū),是兩者的通信橋梁,起到解耦作用,優(yōu)化系統(tǒng)整體結(jié)構(gòu)。

    ????由于緩沖區(qū)的存在,生產(chǎn)者和消費(fèi)者,無論誰在某一局部時(shí)間內(nèi)速度相對較高,都可以使用緩沖區(qū)得到緩解,保證系統(tǒng)正常運(yùn)行,這在一定程度上緩解了性能瓶頸對系統(tǒng)系能的影響。

    轉(zhuǎn):http://blog.csdn.net/lmdcszh/article/details/39699261

    轉(zhuǎn)載于:https://www.cnblogs.com/duanxz/p/5143186.html

    總結(jié)

    以上是生活随笔為你收集整理的多线程设计模式(四):生产者-消费模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。