java master work_并发编程之Master-Worker模式
我們知道,單個(gè)線程計(jì)算是串行的,只有等上一個(gè)任務(wù)結(jié)束之后,才能執(zhí)行下一個(gè)任務(wù),所以執(zhí)行效率是比較低的。
那么,如果用多線程執(zhí)行任務(wù),就可以在單位時(shí)間內(nèi)執(zhí)行更多的任務(wù),而Master-Worker就是多線程并行計(jì)算的一種實(shí)現(xiàn)方式。
它的思想是,啟動(dòng)兩個(gè)進(jìn)程協(xié)同工作:Master和Worker進(jìn)程。
Master負(fù)責(zé)任務(wù)的接收和分配,Worker負(fù)責(zé)具體的子任務(wù)執(zhí)行。每個(gè)Worker執(zhí)行完任務(wù)之后把結(jié)果返回給Master,最后由Master匯總結(jié)果。(其實(shí)也是一種分而治之的思想,和forkjoin計(jì)算框架有相似之處,參看:并行任務(wù)計(jì)算框架forkjoin)
Master-Worker工作示意圖如下:
下面用Master-Worker實(shí)現(xiàn)計(jì)算1-100的平方和,思路如下:
定義一個(gè)Task類用于存儲每個(gè)任務(wù)的數(shù)據(jù)。
Master生產(chǎn)固定個(gè)數(shù)的Worker,把所有worker存放在workers變量(map)中,Master需要存儲所有任務(wù)的隊(duì)列workqueue(ConcurrentLinkedQueue)和所有子任務(wù)返回的結(jié)果集resultMap(ConcurrentHashMap)。
每個(gè)Worker執(zhí)行自己的子任務(wù),然后把結(jié)果存放在resultMap中。
Master匯總resultMap中的數(shù)據(jù),然后返回給Client客戶端。
為了擴(kuò)展Worker的功能,用一個(gè)MyWorker繼承Worker重寫任務(wù)處理的具體方法。
Task類:
package com.thread.masterworker;
public class Task {
private int id;
private String name;
private int num;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
}
Master實(shí)現(xiàn):
package com.thread.masterworker;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
//所有任務(wù)的隊(duì)列
private ConcurrentLinkedQueue workerQueue = new ConcurrentLinkedQueue();
//所有worker
private HashMap workers = new HashMap();
//共享變量,worker返回的結(jié)果
private ConcurrentHashMap resultMap = new ConcurrentHashMap();
//構(gòu)造方法,初始化所有worker
public Master(Worker worker,int workerCount){
worker.setWorkerQueue(this.workerQueue);
worker.setResultMap(this.resultMap);
for (int i = 0; i < workerCount; i++) {
Thread t = new Thread(worker);
this.workers.put("worker-"+i,t);
}
}
//任務(wù)的提交
public void submit(Task task){
this.workerQueue.add(task);
}
//執(zhí)行任務(wù)
public int execute(){
for (Map.Entry entry : workers.entrySet()) {
entry.getValue().start();
}
//一直循環(huán),直到結(jié)果返回
while (true){
if(isComplete()){
return getResult();
}
}
}
//判斷是否所有線程都已經(jīng)執(zhí)行完畢
public boolean isComplete(){
for (Map.Entry entry : workers.entrySet()) {
//只要有任意一個(gè)線程沒有結(jié)束,就返回false
if(entry.getValue().getState() != Thread.State.TERMINATED){
return false;
}
}
return true;
}
//處理結(jié)果集返回最終結(jié)果
public int getResult(){
int res = 0;
for (Map.Entry entry : resultMap.entrySet()) {
res += (Integer) entry.getValue();
}
return res;
}
}
父類Worker:
package com.thread.masterworker;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Worker implements Runnable {
private ConcurrentLinkedQueue workerQueue;
private ConcurrentHashMap resultMap;
public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {
this.workerQueue = workerQueue;
}
public void setResultMap(ConcurrentHashMap resultMap) {
this.resultMap = resultMap;
}
@Override
public void run() {
while(true){
//從任務(wù)隊(duì)列中取出一個(gè)任務(wù)
Task task = workerQueue.poll();
if(task == null) break;
//處理具體的任務(wù)
Object res = doTask(task);
//把每次處理的結(jié)果放到結(jié)果集里面,此處直接把num值作為結(jié)果
resultMap.put(String.valueOf(task.getId()),res);
}
}
public Object doTask(Task task) {
return null;
}
}
子類MyWorker繼承父類Worker,重寫doTask方法實(shí)現(xiàn)具體的邏輯:
package com.thread.masterworker;
public class MyWorker extends Worker {
@Override
public Object doTask(Task task) {
//暫停0.5秒,模擬任務(wù)處理
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//計(jì)算數(shù)字的平方
int num = task.getNum();
return num * num;
}
}
客戶端Client:
package com.thread.masterworker;
import java.util.Random;
public class Client {
public static void main(String[] args) {
Master master = new Master(new MyWorker(), 10);
//提交n個(gè)任務(wù)到任務(wù)隊(duì)列里
for (int i = 0; i < 100; i++) {
Task task = new Task();
task.setId(i);
task.setName("任務(wù)"+i);
task.setNum(i+1);
master.submit(task);
}
//執(zhí)行任務(wù)
long start = System.currentTimeMillis();
int res = master.execute();
long time = System.currentTimeMillis() - start;
System.out.println("結(jié)果:"+res+",耗時(shí):"+time);
}
}
以上,我們用10個(gè)線程去執(zhí)行子任務(wù),最終由Master做計(jì)算求和(1-100的平方和)。每個(gè)線程暫停500ms,計(jì)算數(shù)字的平方值。
總共100個(gè)任務(wù),分10個(gè)線程并行計(jì)算,相當(dāng)于每個(gè)線程均分10個(gè)任務(wù),一個(gè)任務(wù)的時(shí)間大概為500ms,故10個(gè)任務(wù)為5000ms,再加上計(jì)算平方值的時(shí)間,故稍大于5000ms。結(jié)果如下,
結(jié)果:338350,耗時(shí):5084
總結(jié)
以上是生活随笔為你收集整理的java master work_并发编程之Master-Worker模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java lambda collect_
- 下一篇: 读《java的讲座》后感,老师讲座听后感