phaser java_【Java并发编程实战】-----“J.U.C”:Phaser
Phaser由java7中推出,是Java SE 7中新增的一個(gè)使用同步工具,在功能上面它與CyclicBarrier、CountDownLatch有些重疊,但是它提供了更加靈活、強(qiáng)大的用法。
CyclicBarrier,允許一組線程互相等待,直到到達(dá)某個(gè)公共屏障點(diǎn)。它提供的await()可以實(shí)現(xiàn)讓所有參與者在臨界點(diǎn)到來之前一直處于等待狀態(tài)。
CountDownLatch,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。它提供了await()、countDown()兩個(gè)方法來進(jìn)行操作。
在Phaser中,它把多個(gè)線程協(xié)作執(zhí)行的任務(wù)劃分為多個(gè)階段,編程時(shí)需要明確各個(gè)階段的任務(wù),每個(gè)階段都可以有任意個(gè)參與者,線程都可以隨時(shí)注冊并參與到某個(gè)階段。
構(gòu)造
Phaser創(chuàng)建后,初始階段編號為0,構(gòu)造函數(shù)中指定初始參與個(gè)數(shù)。
注冊:Registration
Phaser支持通過register()和bulkRegister(int parties)方法來動態(tài)調(diào)整注冊任務(wù)的數(shù)量。
Arrival
每個(gè)Phaser實(shí)例都會維護(hù)一個(gè)phase number,初始值為0。每當(dāng)所有注冊的任務(wù)都到達(dá)Phaser時(shí),phase number累加,并在超過Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于記錄到達(dá);其中arrive(),某個(gè)參與者完成任務(wù)后調(diào)用;arriveAndDeregister(),任務(wù)完成,取消自己的注冊。arriveAndAwaitAdvance(),自己完成等待其他參與者完成,進(jìn)入阻塞,直到Phaser成功進(jìn)入下個(gè)階段。
example 1
public classPhaserTest_1 {public static voidmain(String[] args) {
Phaser phaser= new Phaser(5);for(int i = 0 ; i < 5 ; i++){
Task_01 task_01= newTask_01(phaser);
Thread thread= new Thread(task_01, "PhaseTest_" +i);
thread.start();
}
}static class Task_01 implementsRunnable{private finalPhaser phaser;publicTask_01(Phaser phaser){this.phaser =phaser;
}
@Overridepublic voidrun() {
System.out.println(Thread.currentThread().getName()+ "執(zhí)行任務(wù)完成,等待其他任務(wù)執(zhí)行......");//等待其他任務(wù)執(zhí)行完成
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+ "繼續(xù)執(zhí)行任務(wù)...");
}
}
}
運(yùn)行結(jié)果:
PhaseTest_0執(zhí)行任務(wù)完成,等待其他任務(wù)執(zhí)行......
PhaseTest_1執(zhí)行任務(wù)完成,等待其他任務(wù)執(zhí)行......
PhaseTest_3執(zhí)行任務(wù)完成,等待其他任務(wù)執(zhí)行......
PhaseTest_2執(zhí)行任務(wù)完成,等待其他任務(wù)執(zhí)行......
PhaseTest_4執(zhí)行任務(wù)完成,等待其他任務(wù)執(zhí)行......
PhaseTest_4繼續(xù)執(zhí)行任務(wù)...
PhaseTest_1繼續(xù)執(zhí)行任務(wù)...
PhaseTest_0繼續(xù)執(zhí)行任務(wù)...
PhaseTest_2繼續(xù)執(zhí)行任務(wù)...
PhaseTest_3繼續(xù)執(zhí)行任務(wù)...
在該實(shí)例中我們可以確認(rèn),所有子線程的****+”繼續(xù)執(zhí)行任務(wù)…”,都是在線程調(diào)用arriveAndAwaitAdvance()方法之后執(zhí)行的。
example 2
前面提到過,Phaser提供了比CountDownLatch、CyclicBarrier更加強(qiáng)大、靈活的功能,從某種程度上來說,Phaser可以替換他們:
public classPhaserTest_5 {public static voidmain(String[] args) {
Phaser phaser= new Phaser(1); //相當(dāng)于CountDownLatch(1)//五個(gè)子任務(wù)
for(int i = 0 ; i < 3 ; i++){
Task_05 task= newTask_05(phaser);
Thread thread= new Thread(task,"PhaseTest_" +i);
thread.start();
}try{//等待3秒
Thread.sleep(3000);
}catch(InterruptedException e) {
e.printStackTrace();
}
phaser.arrive();//countDownLatch.countDown()
}static class Task_05 implementsRunnable{private finalPhaser phaser;
Task_05(Phaser phaser){this.phaser =phaser;
}
@Overridepublic voidrun() {
phaser.awaitAdvance(phaser.getPhase());//countDownLatch.await()
System.out.println(Thread.currentThread().getName() + "執(zhí)行任務(wù)...");
}
}
}
在這里,任務(wù)一開始并沒有真正執(zhí)行,而是等待三秒后執(zhí)行。
對于CyclicBarrier就更加簡單了,直接arriveAndAwaitAdvance()方法替換,如example 1。
example 3
在CyclicBarrier中當(dāng)任務(wù)執(zhí)行完之后可以執(zhí)行一個(gè)action,在Phaser中同樣有一個(gè)對應(yīng)的action,只不過Phaser需要重寫onAdvance()方法:
public classPhaserTest_3 {public static voidmain(String[] args) {
Phaser phaser= new Phaser(3){/*** registeredParties:線程注冊的數(shù)量
* phase:進(jìn)入該方法的線程數(shù),*/
protected boolean onAdvance(int phase, intregisteredParties) {
System.out.println("執(zhí)行onAdvance方法.....;phase:" + phase + "registeredParties=" +registeredParties);return phase == 3;
}
};for(int i = 0 ; i < 3 ; i++){
Task_03 task= newTask_03(phaser);
Thread thread= new Thread(task,"task_" +i);
thread.start();
}while(!phaser.isTerminated()){
phaser.arriveAndAwaitAdvance();//主線程一直等待
}
System.out.println("主線程任務(wù)已經(jīng)結(jié)束....");
}static class Task_03 implementsRunnable{private finalPhaser phaser;publicTask_03(Phaser phaser){this.phaser =phaser;
}
@Overridepublic voidrun() {do{try{
Thread.sleep(500);
}catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "開始執(zhí)行任務(wù)...");
phaser.arriveAndAwaitAdvance();
}while(!phaser.isTerminated());
}
}
}
運(yùn)行結(jié)果:
task_0開始執(zhí)行任務(wù)...
task_1開始執(zhí)行任務(wù)...
task_1執(zhí)行onAdvance方法.....;phase:0registeredParties=3task_2開始執(zhí)行任務(wù)...
task_0開始執(zhí)行任務(wù)...
task_1開始執(zhí)行任務(wù)...
task_0執(zhí)行onAdvance方法.....;phase:1registeredParties=3task_2開始執(zhí)行任務(wù)...
task_2執(zhí)行onAdvance方法.....;phase:2registeredParties=3主線程任務(wù)已經(jīng)結(jié)束....
task_0開始執(zhí)行任務(wù)...
參考博文:
總結(jié)
以上是生活随笔為你收集整理的phaser java_【Java并发编程实战】-----“J.U.C”:Phaser的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CodeTank iOS App Tec
- 下一篇: Java反射基础(一)--Class对象