日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java master work_并发编程之Master-Worker模式

發(fā)布時(shí)間:2025/4/5 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java master work_并发编程之Master-Worker模式 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

我們知道,單個(gè)線程計(jì)算是串行的,只有等上一個(gè)任務(wù)結(jié)束之后,才能執(zhí)行下一個(gè)任務(wù),所以執(zhí)行效率是比較低的。

那么,如果用多線程執(zhí)行任務(wù),就可以在單位時(shí)間內(nèi)執(zhí)行更多的任務(wù),而Master-Worker就是多線程并行計(jì)算的一種實(shí)現(xiàn)方式。

它的思想是,啟動(dòng)兩個(gè)進(jìn)程協(xié)同工作:Master和Worker進(jìn)程。

Master負(fù)責(zé)任務(wù)的接收和分配,Worker負(fù)責(zé)具體的子任務(wù)執(zhí)行。每個(gè)Worker執(zhí)行完任務(wù)之后把結(jié)果返回給Master,最后由Master匯總結(jié)果。(其實(shí)也是一種分而治之的思想,和forkjoin計(jì)算框架有相似之處,參看:并行任務(wù)計(jì)算框架forkjoin)

Master-Worker工作示意圖如下:

下面用Master-Worker實(shí)現(xiàn)計(jì)算1-100的平方和,思路如下:

定義一個(gè)Task類用于存儲每個(gè)任務(wù)的數(shù)據(jù)。

Master生產(chǎn)固定個(gè)數(shù)的Worker,把所有worker存放在workers變量(map)中,Master需要存儲所有任務(wù)的隊(duì)列workqueue(ConcurrentLinkedQueue)和所有子任務(wù)返回的結(jié)果集resultMap(ConcurrentHashMap)。

每個(gè)Worker執(zhí)行自己的子任務(wù),然后把結(jié)果存放在resultMap中。

Master匯總resultMap中的數(shù)據(jù),然后返回給Client客戶端。

為了擴(kuò)展Worker的功能,用一個(gè)MyWorker繼承Worker重寫任務(wù)處理的具體方法。

Task類:

package com.thread.masterworker;

public class Task {

private int id;

private String name;

private int num;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getNum() {

return num;

}

public void setNum(int num) {

this.num = num;

}

}

Master實(shí)現(xiàn):

package com.thread.masterworker;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

//所有任務(wù)的隊(duì)列

private ConcurrentLinkedQueue workerQueue = new ConcurrentLinkedQueue();

//所有worker

private HashMap workers = new HashMap();

//共享變量,worker返回的結(jié)果

private ConcurrentHashMap resultMap = new ConcurrentHashMap();

//構(gòu)造方法,初始化所有worker

public Master(Worker worker,int workerCount){

worker.setWorkerQueue(this.workerQueue);

worker.setResultMap(this.resultMap);

for (int i = 0; i < workerCount; i++) {

Thread t = new Thread(worker);

this.workers.put("worker-"+i,t);

}

}

//任務(wù)的提交

public void submit(Task task){

this.workerQueue.add(task);

}

//執(zhí)行任務(wù)

public int execute(){

for (Map.Entry entry : workers.entrySet()) {

entry.getValue().start();

}

//一直循環(huán),直到結(jié)果返回

while (true){

if(isComplete()){

return getResult();

}

}

}

//判斷是否所有線程都已經(jīng)執(zhí)行完畢

public boolean isComplete(){

for (Map.Entry entry : workers.entrySet()) {

//只要有任意一個(gè)線程沒有結(jié)束,就返回false

if(entry.getValue().getState() != Thread.State.TERMINATED){

return false;

}

}

return true;

}

//處理結(jié)果集返回最終結(jié)果

public int getResult(){

int res = 0;

for (Map.Entry entry : resultMap.entrySet()) {

res += (Integer) entry.getValue();

}

return res;

}

}

父類Worker:

package com.thread.masterworker;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {

private ConcurrentLinkedQueue workerQueue;

private ConcurrentHashMap resultMap;

public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {

this.workerQueue = workerQueue;

}

public void setResultMap(ConcurrentHashMap resultMap) {

this.resultMap = resultMap;

}

@Override

public void run() {

while(true){

//從任務(wù)隊(duì)列中取出一個(gè)任務(wù)

Task task = workerQueue.poll();

if(task == null) break;

//處理具體的任務(wù)

Object res = doTask(task);

//把每次處理的結(jié)果放到結(jié)果集里面,此處直接把num值作為結(jié)果

resultMap.put(String.valueOf(task.getId()),res);

}

}

public Object doTask(Task task) {

return null;

}

}

子類MyWorker繼承父類Worker,重寫doTask方法實(shí)現(xiàn)具體的邏輯:

package com.thread.masterworker;

public class MyWorker extends Worker {

@Override

public Object doTask(Task task) {

//暫停0.5秒,模擬任務(wù)處理

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

//計(jì)算數(shù)字的平方

int num = task.getNum();

return num * num;

}

}

客戶端Client:

package com.thread.masterworker;

import java.util.Random;

public class Client {

public static void main(String[] args) {

Master master = new Master(new MyWorker(), 10);

//提交n個(gè)任務(wù)到任務(wù)隊(duì)列里

for (int i = 0; i < 100; i++) {

Task task = new Task();

task.setId(i);

task.setName("任務(wù)"+i);

task.setNum(i+1);

master.submit(task);

}

//執(zhí)行任務(wù)

long start = System.currentTimeMillis();

int res = master.execute();

long time = System.currentTimeMillis() - start;

System.out.println("結(jié)果:"+res+",耗時(shí):"+time);

}

}

以上,我們用10個(gè)線程去執(zhí)行子任務(wù),最終由Master做計(jì)算求和(1-100的平方和)。每個(gè)線程暫停500ms,計(jì)算數(shù)字的平方值。

總共100個(gè)任務(wù),分10個(gè)線程并行計(jì)算,相當(dāng)于每個(gè)線程均分10個(gè)任務(wù),一個(gè)任務(wù)的時(shí)間大概為500ms,故10個(gè)任務(wù)為5000ms,再加上計(jì)算平方值的時(shí)間,故稍大于5000ms。結(jié)果如下,

結(jié)果:338350,耗時(shí):5084

總結(jié)

以上是生活随笔為你收集整理的java master work_并发编程之Master-Worker模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。