Java并发中常用同步工具类
為什么80%的碼農(nóng)都做不了架構(gòu)師?>>> ??
同步工具類可以是任何一個(gè)對(duì)象,只要它根據(jù)其自身的狀態(tài)來協(xié)調(diào)線程控制流。阻塞隊(duì)列(BlockingQueue)可以作為同步工具類,其他類型的同步工具類還包括信號(hào)量(Semaphore),柵欄(Barrier)以及閉鎖(Latch)。在平臺(tái)類庫(kù)中還包含其他一些同步工具類的類,如果這些類還無法滿足需要,那么可以創(chuàng)建自己的同步工具類。
閉鎖Latch
閉鎖可以延遲線程的進(jìn)度直到其到達(dá)終止?fàn)顟B(tài)。閉鎖的作用相當(dāng)于一扇門:在閉鎖到達(dá)結(jié)束狀態(tài)之前,這扇門一直是關(guān)著的,并且沒有任何線程能通過,當(dāng)?shù)竭_(dá)結(jié)束狀態(tài)時(shí),這扇門會(huì)打開并允許所有的線程通過。當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)后,將不會(huì)再改變狀態(tài),因此這扇門將永遠(yuǎn)保持打開狀態(tài)。閉鎖可以用來確保某些活動(dòng)直到其他活動(dòng)都完成后才繼續(xù)執(zhí)行,例如:
- 確保某個(gè)計(jì)算在其需要的所有資源都被初始化之后才繼續(xù)執(zhí)行。二元閉鎖(包括兩個(gè)狀態(tài))可以用來表示“資源R已經(jīng)被初始化”,而所有需要R的操作都必須在這個(gè)閉鎖上等待。
- 確保某個(gè)服務(wù)在其依賴的所有其他服務(wù)都已經(jīng)啟動(dòng)之后才啟動(dòng)。每個(gè)服務(wù)都有一個(gè)相關(guān)的二元閉鎖。當(dāng)啟動(dòng)服務(wù)S時(shí),將首先在S以來的其他服務(wù)的閉鎖上等待,在所有依賴的服務(wù)都啟動(dòng)后會(huì)釋放閉鎖S,這樣其他依賴S的服務(wù)才能繼續(xù)執(zhí)行。
- 等待直到某個(gè)操作的所有參與者(例如:在多玩家游戲中的所有玩家)都就緒再繼續(xù)執(zhí)行。在這種情況中,當(dāng)所有的玩家都準(zhǔn)備就緒時(shí),閉鎖將到達(dá)結(jié)束狀態(tài)。
CountDownLatch是一種靈活的閉鎖實(shí)現(xiàn),可以在上述各種情況下使用,它可以使一個(gè)或多個(gè)線程等待一組事件發(fā)生。閉鎖狀態(tài)包括一個(gè)計(jì)數(shù)器,該計(jì)數(shù)器被初始化為一個(gè)正數(shù),表示需要等待的事件數(shù)量。countDown方法將遞減計(jì)數(shù)器,表示有一個(gè)事件已經(jīng)發(fā)生了,而await方法等待計(jì)數(shù)器到達(dá)零,這表示所有需要的事件都已經(jīng)發(fā)生。如果計(jì)數(shù)器的值為非零,那么await會(huì)一直阻塞直到計(jì)數(shù)器為零,或者等待中的線程中斷,或者等待超時(shí)。
在下面的TestHarness中給出了閉鎖的兩種常見用法。TestHarness創(chuàng)建一定數(shù)量的線程,利用它們并發(fā)的執(zhí)行指定的任務(wù)。它使用兩個(gè)閉鎖,分別表示“起始門(Start?Gate)”和“結(jié)束門(End Gate)”。起始門計(jì)數(shù)器的初始值為1,而結(jié)束們計(jì)數(shù)器的初始值為工作線程數(shù)量。每個(gè)工作線程首先要做的就是在啟動(dòng)門上等待,從而確保所有線程都就緒后才開始執(zhí)行。而每個(gè)線程要做的最后一件事情是調(diào)用countDown方法使事件數(shù)量減1,這能使主線程高效的等待直到所有工作線程都執(zhí)行完成,因此可以統(tǒng)計(jì)所消耗的時(shí)間:
public class TestHarness {public long timeTasks(int nThread, final Runnable task) throws InterruptedException {final CountDownLatch startGate = new CountDownLatch(1);final CountDownLatch endGate = new CountDownLatch(nThread);for (int i = 0; i < nThread; i++) {Thread t = new Thread() {public void run() {try {startGate.await();try {task.run();} finally {endGate.countDown();}} catch(InterruptedException ignored) {}}};t.start();}long start = System.nanoTime();startGate.countDown();long end = System.nanoTime();return end - start;} }為什么要在TestHarness中使用閉鎖,而不是在線程創(chuàng)建后就立即啟動(dòng)?或許,我們希望測(cè)試n個(gè)線程并發(fā)執(zhí)行某個(gè)任務(wù)時(shí)需要的時(shí)間。如果在創(chuàng)建線程之后立即啟動(dòng)它們,那么先啟動(dòng)的線程將“領(lǐng)先”后啟動(dòng)的線程,并且活躍線程數(shù)量會(huì)隨著時(shí)間的推移而增加或減少,競(jìng)爭(zhēng)程度也在不斷發(fā)生變化。起始門將使得主線程能夠同時(shí)釋放所有的工作線程,而結(jié)束門則使主線程能夠等待最后一個(gè)線程執(zhí)行完成,而不是順序的等待每個(gè)線程執(zhí)行完成。
FutureTask
FutureTask也可以用作閉鎖。(FutureTask實(shí)現(xiàn)了Future語(yǔ)義,表示一種抽象的可生成結(jié)果的計(jì)算)。FutureTask表示的計(jì)算是通過Callable來實(shí)現(xiàn)的,相當(dāng)于一種可生成結(jié)果的Runnable,并且可以處于以下3種狀態(tài):等待運(yùn)行(Waiting to run),正在運(yùn)行(Running)和完成運(yùn)行(Completed)。“執(zhí)行完成”表示計(jì)算的所有可能結(jié)束方式,包括正常結(jié)束、由于取消而結(jié)束和由于異常而結(jié)束等。當(dāng)FutureTask進(jìn)入完成狀態(tài)以后,它會(huì)永遠(yuǎn)停止在這個(gè)狀態(tài)上。
Future.get的行為取決于任務(wù)的狀態(tài)。如果任務(wù)已經(jīng)執(zhí)行完成,那么get會(huì)立即返回結(jié)果,否則get將阻塞直到任務(wù)進(jìn)入完成狀態(tài),然后返回結(jié)果或者拋出異常。FutureTask將計(jì)算結(jié)果從執(zhí)行計(jì)算的線程傳遞到獲取這個(gè)結(jié)果的線程,而FutureTask的規(guī)范確保了這種傳遞過程能實(shí)現(xiàn)結(jié)果的安全發(fā)布。
FutureTask在Executor框架中表示異步任務(wù),此外還可以用來表示一些時(shí)間較長(zhǎng)的計(jì)算,這些計(jì)算可以在使用計(jì)算結(jié)果之前啟動(dòng)。下面的程序就使用了FutureTask來執(zhí)行一個(gè)高開銷的計(jì)算,并且計(jì)算結(jié)果將在稍后使用。通過提前啟動(dòng)計(jì)算,可以減少在等待結(jié)果時(shí)需要的時(shí)間:
public class Preloader{private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(new Callable<ProductInfo>(){public ProductInfo call() throws DataLoadException{return loadProductInfo();}});private final Thread thread = new Thread(future);public void start() {thread.start();}public ProductInfo get() throws DataLoadException, InterruptedException{try {return future.get();} catch(ExecutionException e) {Throwable cause = e.getCause();if (cause instance of DataLoadException) {throw (DataLoadException)cause;} else {throw launderThrowable(cause);}}} }Preloader創(chuàng)建了一個(gè)FutureTask,其中包含從數(shù)據(jù)庫(kù)加載產(chǎn)品信息的任務(wù),以及一個(gè)執(zhí)行運(yùn)算的線程。由于在構(gòu)造函數(shù)或靜態(tài)初始化方法中啟動(dòng)線程并不是一種好方法,因此提供了一個(gè)start方法來啟動(dòng)線程。當(dāng)程序雖有需要ProductInfo時(shí),可以調(diào)用get方法,如果數(shù)據(jù)已經(jīng)加載,那么將返回這些數(shù)據(jù),否則將等待加載完成后再返回。
Callable表示的任務(wù)可以拋出受檢查的或未受檢查的異常,并且任何代碼都可能拋出一個(gè)Error。無論任務(wù)代碼拋出什么異常,都會(huì)被封裝到一個(gè)ExecutionException中,并在Future.get中被重新拋出。這將使調(diào)用get的代碼變得復(fù)雜,因?yàn)樗粌H需要處理可能出現(xiàn)的ExecutionException(以及未檢查的CancellationException),而且還由于ExecutionException是作為一個(gè)Throwable類返回的,因此處理起來并不容易。
信號(hào)量Semaphore
計(jì)數(shù)信號(hào)量(Counting Semaphore)用來控制同時(shí)訪問某個(gè)特定資源的操作數(shù)量,或者同時(shí)執(zhí)行某個(gè)指定操作的數(shù)量。計(jì)數(shù)信號(hào)量還可以用來實(shí)現(xiàn)某種資源池,或者對(duì)容器施加邊界。
Semaphore中管理著一組虛擬的許可(permit),許可的初始數(shù)量可通過構(gòu)造函數(shù)來指定。在執(zhí)行操作時(shí)可以首先獲得許可(只要還有剩余的許可),并在使用以后釋放即可。如果沒有許可,那么aquire將阻塞直到有許可(或者直到被中斷或者操作超時(shí))。release方法將返回一個(gè)許可給信號(hào)量。計(jì)算信號(hào)量的一種簡(jiǎn)化形式是二值信號(hào)量,即初始值為1的Semaphore。二值信號(hào)量可以用作互斥體(mutex),并具備不可重入的加鎖語(yǔ)義:誰擁有這個(gè)唯一的許可,誰就擁有了互斥鎖。
Semaphore可以用于實(shí)現(xiàn)資源池,例如數(shù)據(jù)庫(kù)連接池。我們可以構(gòu)造一個(gè)固定長(zhǎng)度的資源池,當(dāng)池為空時(shí),請(qǐng)求資源將會(huì)失敗,但你真正希望看到的行為是阻塞而不是失敗,并且當(dāng)池非空時(shí)解除阻塞。如果將Semaphore的計(jì)數(shù)值初始化為池的大小,并在從池中獲取一個(gè)資源之前先調(diào)用aquire方法獲得一個(gè)許可,在將資源返回給池之后調(diào)用release釋放許可,這樣就實(shí)現(xiàn)了固定長(zhǎng)度了。
同樣,你可以使用Semaphore將任何一種容器變成有界阻塞容器,如下代碼所示。信號(hào)量的計(jì)數(shù)值會(huì)初始化為容器容量的最大值。add操作在向底層容器中添加一個(gè)元素之前,首先需要獲得一個(gè)許可。如果add操作沒有添加任何元素,那么會(huì)立刻釋放許可。同樣remove操作釋放一個(gè)許可,使更多的元素能夠添加到容器中。底層的Set實(shí)現(xiàn)并不知道關(guān)于邊界的任何信息,這是由BoundedHashSet來處理的。
public class BoundedHashSet<T> {private final Set<T> set;private final Semaphore sem;public BoundedHashSet(int bound) {this.set = Collections.synchronizedSet(new HashSet<T>());sem = new Semaphore(bound);}public boolean add(T o) throws InterruptedException {sem.aquire();boolean wasAdded = false;try {wasAdded = set.add(o);return wasAdded;} finally {if (!wasAdded) {sem.release();}}}public boolean remove(Object o) {boolean wasRemoved = set.remove(o);if (wasRemoved) {sem.release();}return wasRemoved;} }柵欄Barrier
我們已經(jīng)看到通過閉鎖來啟動(dòng)一組相關(guān)的操作,或者等待也組相關(guān)的操作結(jié)束。閉鎖是一次性對(duì)象,一旦進(jìn)入終止?fàn)顟B(tài),就不能被重置。
柵欄(Barrier)類似于閉鎖,它能阻塞線程直到某個(gè)事件發(fā)生。柵欄與閉鎖的關(guān)鍵區(qū)別在于:所有線程必須同時(shí)到達(dá)柵欄位置,才能繼續(xù)執(zhí)行。閉鎖也用于等待事件,而柵欄也用于等待其他線程。柵欄用于實(shí)現(xiàn)一些協(xié)議,例如幾個(gè)家庭決定在某個(gè)地方集合:“所有人6:00在麥當(dāng)勞碰頭,到了以后要等其他人,之后再討論下一步要做的事情。”
CyclicBarrier可以使一定數(shù)量的參與方反復(fù)地在柵欄位置匯集,它在并行迭代算法中非常有用:這種算法通常將也一個(gè)問題拆分成一系列相互獨(dú)立的子問題。當(dāng)線程到達(dá)柵欄位置時(shí),將調(diào)用await方法,這個(gè)方法將阻塞直到所有線程都到達(dá)阻塞位置。如果所有線程都到達(dá)了柵欄的位置,那么柵欄將打開,此時(shí)所有線程都將被釋放,而柵欄將被重置以便下次使用。如果對(duì)await的調(diào)用超時(shí),或者await阻塞的線程被中斷,那么柵欄就被認(rèn)為是打破了,所有阻塞的await調(diào)用都將終止并拋出BrokenBarrierException。如果成功地通過柵欄,那么await將為每個(gè)線程返回一個(gè)唯一的到達(dá)索引號(hào),我們可以利用這些索引來“選舉”產(chǎn)生一個(gè)領(lǐng)導(dǎo)線程,并在下一次迭代中由該領(lǐng)導(dǎo)線程執(zhí)行一些特殊的工作。CyclicBarrier還可以使你將一個(gè)柵欄參數(shù)傳遞給構(gòu)造函數(shù),這是一個(gè)Runnable,當(dāng)成功通過柵欄時(shí)會(huì)(在下一個(gè)子任務(wù)線程中)執(zhí)行它,但在阻塞線程被釋放之前是不能執(zhí)行的。
下面的例子中給出了如何通過柵欄來計(jì)算細(xì)胞的自動(dòng)化模擬。在把模擬過程并行化時(shí),為每個(gè)元素(這里為每個(gè)細(xì)胞)分配一個(gè)獨(dú)立的線程是不現(xiàn)實(shí)的,也因?yàn)檫@將產(chǎn)生過多的線程,而在協(xié)調(diào)這些線程上導(dǎo)致的開銷將降低計(jì)算性能。合理的做法是,將問題分解成一定數(shù)量的子問題,為每個(gè)子問題分配一個(gè)線程來進(jìn)行求解,之后再將所有的結(jié)果合并起來。CellularAutomata將問題分解為Ncpu(可用的CPU數(shù)量)個(gè)子問題,并將每個(gè)子問題分配給一個(gè)線程。在每個(gè)步驟中,工作線程都為各自子問題中的所有細(xì)胞計(jì)算新值。當(dāng)所有工作線程都到達(dá)柵欄時(shí),柵欄會(huì)把這些新值交給數(shù)據(jù)模型。在柵欄的操作執(zhí)行完成以后,工作線程將開始下一步的計(jì)算,包括調(diào)用isDone方法來判斷是否需要進(jìn)行下一次迭代。
public class CellularAutomata {private final Board mainBoard;private final CyclicBarrier barrier;private final Worker[] workers;public CellularAutomata (Board board) {this.mainBoard = board;int count = Runtime.getRuntime().availableProcessors();this.barrier = new CyclicBarrier(count, new Runnable(){public void run() {mainBoard.commitNewValues();}});this.workers = new Worker[count];for (int i = 0; i < count; i++) {workers[i] = new Worker(mainBoard.getSubBoard(count, i));}}private class Worker implements Runnable {private final Board board;public Worker(Board board) {this.board = board;}public void run() {while(!board.hasConverged()) {for (int x = 0; x < board.getMaxX(); x++) {for (int y = 0; y < board.getMaxY(); y++) {board.setNewValue(x, y, computeValue(x, y));}}try {barrier.await();} catch(InterruptedException ex) {return;} catch(BrokenBarrierException ex) {return;}}}}public void start() {for (int i = 0; i < workers.length; i++) {new Thread(workers[i]).start();}mainBoard.waitForConvergence();} }另一種形式的柵欄就是Exchanger,它是一種兩方(Two-Party)柵欄,各方在柵欄位置上交換數(shù)據(jù)。當(dāng)兩方執(zhí)行不對(duì)稱的操作時(shí),Exchanger會(huì)非常有用,例如當(dāng)一個(gè)線程向緩沖區(qū)寫入數(shù)據(jù),而另一個(gè)線程從緩沖區(qū)中讀取數(shù)據(jù)。這些線程可以使用Exchanger來匯合,并將滿的緩沖區(qū)與空的緩沖區(qū)交換。當(dāng)兩個(gè)線程通過Exchanger交換對(duì)象時(shí),這種交換就把這兩個(gè)對(duì)象安全地發(fā)布給另一方。
數(shù)據(jù)交換的時(shí)機(jī)取決于應(yīng)用程序的響應(yīng)需求。最簡(jiǎn)單的方案是,當(dāng)緩沖區(qū)被填滿時(shí),由填充任務(wù)進(jìn)行交換,當(dāng)緩沖區(qū)為空時(shí),由清空任務(wù)進(jìn)行交換。這樣會(huì)把需要交換的次數(shù)降至最低,但如果新數(shù)據(jù)的到達(dá)率不可預(yù)測(cè),那么一些數(shù)據(jù)的處理過程就將也延遲。另一個(gè)方法是,不僅當(dāng)緩沖被填滿時(shí)進(jìn)行交換,并且當(dāng)緩沖被填充到一定程度并保持也一定時(shí)間也以后,也進(jìn)行交換。
轉(zhuǎn)載于:https://my.oschina.net/itblog/blog/775918
總結(jié)
以上是生活随笔為你收集整理的Java并发中常用同步工具类的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 几个颇有创意的网站推广方法(推荐)
- 下一篇: java美元兑换,(Java实现) 美元