日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java 并发基础

發(fā)布時間:2025/3/17 java 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java 并发基础 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Java 并發(fā)基礎(chǔ)

標簽 : Java基礎(chǔ)


線程簡述

線程是進程的執(zhí)行部分,用來完成一定的任務(wù); 線程擁有自己的堆棧,程序計數(shù)器和自己的局部變量,但不擁有系統(tǒng)資源, 他與其他線程共享父進程的共享資源及部分運行時環(huán)境,因此編程時需要小心,確保線程不會妨礙同一進程中的其他線程;

  • 多線程優(yōu)勢
    • 進程之間不能共享內(nèi)存,但線程之間共享內(nèi)存/文件描述符/進程狀態(tài)非常容易.
    • 系統(tǒng)創(chuàng)建進程時需要為該其分配很多系統(tǒng)資源(如進程控制塊),但創(chuàng)建線程的開銷要小得多,因此線程實現(xiàn)多任務(wù)并發(fā)比進程效率高.
    • Java語言內(nèi)置多線程支持,而不是單純采用底層操作系統(tǒng)API調(diào)用, 從而可以簡化Java的多線程編程.

線程創(chuàng)建與啟動

Java使用java.lang.Thread代表線程(所有的線程對象必須是Thread類的實例).使用java.lang.Runnable java.util.concurrent.Callable和java.util.concurrent.Future來代表一段線程執(zhí)行體(一段順序執(zhí)行的代碼).一個線程的作用是完成一段程序流的執(zhí)行,同時子線程的執(zhí)行還可以跟父線程并行, 兩段線程的執(zhí)行流程沒有關(guān)系, 父線程還可以繼續(xù)執(zhí)行其他的事情.


繼承Thread

繼承Thread類,并重寫run()方法(代表線程執(zhí)行體),然后調(diào)用start()方法來啟動線程.

/*** @author jifang* @since 16/1/20下午2:32.*/ public class ThreadStart {public static void main(String[] args) {new ConcreteThread().start();new ConcreteThread("second").start();for (int i = 0; i < 10; ++i) {System.out.println(Thread.currentThread().getName() + ": i");}}private static class ConcreteThread extends Thread {public ConcreteThread() {}public ConcreteThread(String name) {super(name);}@Overridepublic void run() {for (int i = 0; i < 10; ++i) {System.out.println(getName() + ": " + i);}}} }

繼承Thread類來創(chuàng)建線程類時,多個線程之間無法共享線程類的實例變量.


實現(xiàn)Runnable

實現(xiàn)Runnable接口,重寫run()方法(同樣代表線程執(zhí)行體),并將該類實例作為Thread的target提交給線程執(zhí)行.

/*** @author jifang* @since 16/1/20下午2:47.*/ public class RunnableStart {public static void main(String[] args) {Runnable runnable = new ConcreteRunnable();new Thread(runnable, "first").start();new Thread(runnable).start();for (int i = 0; i < 10; ++i) {System.out.println(Thread.currentThread().getName() + " " + i);}}private static class ConcreteRunnable implements Runnable {private int i = 0;@Overridepublic void run() {for (; i < 10; ++i) {System.out.println(Thread.currentThread().getName() + " " + i);}}} }

運行上例可以看到i值重復(fù)的現(xiàn)象,這是因為有多個線程都在修改同一個i值, 對于并發(fā)修改共享資源的情況,需要添加同步機制保護,詳見下面.

Runnable對象僅作為Thread對象的target,其包含的run()方法僅作為線程執(zhí)行體.實際的線程對象依然是Thread實例, 只是該Thread線程執(zhí)行的是target的run()方法.


Callable與Future

Callable接口提供一個call()方法作為線程執(zhí)行體,相比于run(),call()可以有返回值,還可以聲明拋出異常.但它并不是Runnable接口的子接口, 所以不能直接作為target執(zhí)行.因此Java又提供了Future接口來代表Callable中call()方法的返回值,并提供java.util.concurrent.FutureTask類實現(xiàn)Callable與Runnable接口(其實現(xiàn)了RunnableFuture接口,該接口同時繼承了Runnable Future),以作為Thread的target.

Future提供如下方法控制與其關(guān)聯(lián)的Callable:

方法釋義
boolean cancel(boolean mayInterruptIfRunning)Attempts to cancel execution of this task.
V get()Waits if necessary for the computation to complete, and then retrieves its result.
V get(long timeout, TimeUnit unit)Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available.
boolean isCancelled()Returns true if this task was cancelled before it completed normally.
boolean isDone()Returns true if this task completed.

Callable創(chuàng)建并啟動線程的步驟如下:

  • 實現(xiàn)Callable接口并重寫call()方法;
  • 使用FutureTask類包裝Callable對象;
  • 將FutureTask實例提交給Thread并啟動新線程;
  • 使用FutureTask的get()獲取子線程執(zhí)行結(jié)束后的返回值.
/*** @author jifang* @since 16/1/20下午3:00.*/ public class CallableStart {public static void main(String[] args) throws ExecutionException, InterruptedException {RunnableFuture<Integer> task = new FutureTask<>(new ConcreteCallable());new Thread(task).start();while (true) {System.out.println("主線程在干其他事情...");if (task.isDone()) {System.out.println("子線程返回值: " + task.get());break;}Thread.sleep(5);}}private static class ConcreteCallable implements Callable<Integer> {@Overridepublic Integer call() throws Exception {int total = 0;for (int i = 0; i < 100; ++i) {Thread.sleep(10);total += i;}return total;}} }

由于實現(xiàn)Runnable和Callable的方式可以讓多個線程共享同一個target,因此適用于多個線程處理同一份資源的情況,從而將CPU/代碼/數(shù)據(jù)分開.


線程生命周期

當線程被new出并start后,他既不是馬上就進入執(zhí)行狀態(tài), 也不會一直處于執(zhí)行狀態(tài), 一個線程會經(jīng)過新建NEW -> 就緒RUNNABLE -> 運行RUNNING -> 阻塞BLOCKED -> 死亡DEAD五種狀態(tài)切換.

1. 新建New

當new出一個Thread后,該線程處于新建狀態(tài),此時他和其他Java對象一樣,僅由JVM為其分配內(nèi)存.并沒有表現(xiàn)出任何線程的動態(tài)特征.

2. 就緒Runnable

當線程對象調(diào)用start()后,該線程處于就緒狀態(tài),JVM會為其創(chuàng)建方法調(diào)用棧(Stack Trace)/線程控制塊/程序計數(shù)器(PC),處于這個狀態(tài)的線程表示是可以運行的.但何時運行,取決于JVM里線程調(diào)度器的調(diào)度.

3. 運行Running

如果處于就緒狀態(tài)的線程一旦獲得了CPU,就開始執(zhí)行run()方法中的線程執(zhí)行體,則線程進入運行狀態(tài).

4. 阻塞Blocked

當發(fā)生如下情況時,線程會進入阻塞狀態(tài)

  • 線程調(diào)用sleep()主動放棄處理器;
  • 線程調(diào)用阻塞IO, 其IO資源未到;
  • 線程試圖獲得同步監(jiān)視器, 但同步監(jiān)視器被其他線程持有;
  • 線程等待某個通知wait();
  • 調(diào)用了線程的suspend()方法(該方法將導(dǎo)致線程掛起,但這樣容易導(dǎo)致死鎖,不建議使用[詳細見線程同步]).

當前線程被阻塞之后, 其他線程就可以獲得執(zhí)行的機會.

當發(fā)生如下情況, 線程可以解除阻塞, 重新進入就緒:

  • 線程sleep()到達指定時間;
  • 阻塞IO返回;
  • 成功獲得同步監(jiān)視器;
  • 線程收到了其他線程發(fā)出的通知notify();
  • 被suspend()的線程被調(diào)用了resume()恢復(fù)方法;

被阻塞的線程會在合適的時候重新進入就緒狀態(tài).

5. 線程死亡

  • run() / call()方法執(zhí)行完成, 線程正常結(jié)束;
  • 線程拋出未捕獲的Exception或Error;
  • 直接調(diào)用線程的stop()方法結(jié)束該線程(該方法容易導(dǎo)致死鎖,不建議使用).

一旦子線程啟動起來后,就擁有和父線程相同的地位,不會受父線程的任何影響(因此當主線程結(jié)束時,其他線程不會同主線程一起結(jié)束).

為了測試某個線程是否生存, 可以調(diào)用Thread實例的isAlive()方法(就緒/運行/阻塞返回true, 新建/死亡返回false).

不要試圖對已經(jīng)死亡的線程調(diào)用start()方法, 死亡線程將不可再次作為線程執(zhí)行.否則會拋出java.lang.IllegalThreadStateException.


線程池

線程池會在系統(tǒng)啟動時即創(chuàng)建大量空閑線程,然后將一個Runnable/Callable對象提交給線程池,池就會分配/創(chuàng)建一個線程來執(zhí)行他們的run()/call(),任務(wù)執(zhí)行結(jié)束,該線程并不會死亡,而是再次返回池中變?yōu)榭臻e狀態(tài),等待執(zhí)行下一個任務(wù);

線程池不僅可以避免每當有新任務(wù)就啟動一個新線程帶來的系統(tǒng)開銷,而且可以有效控制系統(tǒng)中并發(fā)線程的數(shù)量,一旦系統(tǒng)中的線程超過一定數(shù)量,將導(dǎo)致系統(tǒng)性能劇烈下降,甚至JVM崩潰,而線程池可以設(shè)置最大線程數(shù)以防止線程數(shù)超標.

Java提供java.util.concurrent.Executors工廠類來生產(chǎn)線程池, 該工廠類提供如下靜態(tài)方法:

方法釋義
static ExecutorService newCachedThreadPool()Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.
static ExecutorService newFixedThreadPool(int nThreads)Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue.
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
static ExecutorService newSingleThreadExecutor()Creates an Executor that uses a single worker thread operating off an unbounded queue.
static ScheduledExecutorService newSingleThreadScheduledExecutor()Creates a single-threaded executor that can schedule commands to run after a given delay, or to execute periodically.

上面這些方法還有都有一個重載方法,需要使用java.util.concurrent.ThreadFactory參數(shù),ThreadFactory是一個接口,用于自定義線程的創(chuàng)建策略.

1.java.util.concurrent.ExecutorService代表盡快執(zhí)行任務(wù)的線程池,當有任務(wù)執(zhí)行時,只需將RunnableCallable實例submit()給線程池就好(只池中有空閑線程,就立即執(zhí)行任務(wù)),ExecutorService提供如下方法來提交任務(wù):

方法描述
<T> Future<T> submit(Callable<T> task)Submits a value-returning task for execution and returns a Future representing the pending results of the task.
Future<?> submit(Runnable task)Submits a Runnable task for execution and returns a Future representing that task.
<T> Future<T> submit(Runnable task, T result)Submits a Runnable task for execution and returns a Future representing that task.

Java為ExecutorService提供了一個java.util.concurrent.ThreadPoolExecutor實現(xiàn)類,該類有如下構(gòu)造方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// ... }

因此, 如果默認的線程池策略(如最[小/大]線程數(shù)/線程等待時間)不能滿足我們的需求,我們可以自定義線程池策略.

2.ScheduledExecutorService線程池是ExecutorService的子接口,代表可以在指定延遲后或周期性執(zhí)行線程任務(wù).它提供了如下方法來提交任務(wù):

方法
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

其釋義可以參考JDK文檔;

/*** @author jifang* @since 16/1/20下午9:47.*/ public class ThreadPool {public static void main(String[] args) {ExecutorService pool = getThreadPool();pool.submit(new ConcreteRunnable());pool.submit(new ConcreteRunnable());pool.shutdown();}private static ExecutorService getThreadPool() {return Executors.newCachedThreadPool(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r);}});// return Executors.newCachedThreadPool();// return Executors.newFixedThreadPool(2);// return Executors.newSingleThreadExecutor();}private static class ConcreteRunnable implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; ++i) {try {Thread.sleep(10);} catch (InterruptedException e) {throw new RuntimeException(e);}System.out.println(Thread.currentThread().getName() + " " + i);}}}}
  • 使用自定義策略的線程池,提交Callable任務(wù)
public class ThreadPool {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = getThreadPool();Future<Integer> task1 = pool.submit(new ConcreteCallable());Future<Integer> task2 = pool.submit(new ConcreteCallable());System.out.println(task1.isDone());System.out.println(task2.isDone());pool.shutdown();System.out.println("task1 " + task1.get());System.out.println("task2 " + task2.get());}private static ExecutorService getThreadPool() {return new ThreadPoolExecutor(5, 20, 20L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}private static class ConcreteCallable implements Callable<Integer> {@Overridepublic Integer call() throws Exception {int sum = 0;for (int i = 0; i < 100; ++i) {Thread.sleep(10);sum += i;}return sum;}}}

用完一個線程池后, 應(yīng)該調(diào)用該線程池的shutdown()方法,該方法將啟動線程池關(guān)閉序列,不再接收新任務(wù),但會將以前所有已提交的任務(wù)盡快執(zhí)行完成.所有任務(wù)都執(zhí)行完,池中所有線程都會死亡.


線程控制

Java提供了一些工具方法,通過這些方法可以控制線程的執(zhí)行.


Join

join()方法可以一個讓線程等待另一個線程執(zhí)行完成: 調(diào)用線程被阻塞,知道被join()的線程執(zhí)行完成.
該方法通常由主線程調(diào)用,將大問題劃分成小問題,每個小問題分配一個線程執(zhí)行,當所有的小問題處理完成,再由主線程來做最后處理.如多線程排序,將一個大的排序任務(wù)分割為幾個小塊,分配給幾個線程,當所有子線程執(zhí)行完成后,再由主線程進行歸并:

/*** @author jifang* @since 16/1/21上午11:18.*/ public class MultiThreadSort {private static final int THREAD_COUNT = 12; /*12個線程分段排序*/private static final int NUMBER_COUNT = 201600;private static final int PER_COUNT = NUMBER_COUNT / THREAD_COUNT;private static final int RANDOM_LIMIT = 10000000;public static void main(String[] args) throws InterruptedException {// 為數(shù)組分配隨機值, 為了方便查看, 為其分配10000000以內(nèi)的值Random random = new Random();int[] array = new int[NUMBER_COUNT];for (int i = 0; i < NUMBER_COUNT; ++i) {array[i] = random.nextInt(RANDOM_LIMIT);}List<Thread> threadList = new LinkedList<>();for (int index = 0; index < THREAD_COUNT; ++index) {Thread t = new Thread(new SortRunnable(array, index * PER_COUNT, (index + 1) * PER_COUNT));t.start();threadList.add(t);}// 等待線程排序完成join(threadList);// 分段合并int[] result = merge(array, PER_COUNT, THREAD_COUNT);if (check(result)) {System.out.println("correct");}}private static boolean check(int[] array) {for (int i = 0; i < array.length - 1; ++i) {if (array[i] > array[i + 1]) {System.out.println("error");return false;}}return true;}private static void join(List<Thread> threads) throws InterruptedException {for (Thread thread : threads) {thread.join();}}/*** 分段合并** @param array 已經(jīng)分段排好序的數(shù)組* @param size 每段的長度* @param count 一共的段數(shù)* @return*/private static int[] merge(int[] array, int size, int count) {// indexes保存array每段的起始位置int[] indexes = new int[count];for (int i = 0; i < count; ++i) {indexes[i] = i * size;}int[] result = new int[array.length];// i保存result下標for (int i = 0; i < result.length; ++i) {int minNumber = Integer.MAX_VALUE;int minIndex = 0;// 內(nèi)層for循環(huán)的作用: 找出這count段中最小的那個值for (int index = 0; index < indexes.length; ++index) {// indexes[index]: 當前段的起始位置if ((indexes[index] < (index + 1) * size) && (array[indexes[index]] < minNumber)) {minNumber = array[indexes[index]];minIndex = index;}}result[i] = minNumber;indexes[minIndex]++;}return result;}private static class SortRunnable implements Runnable {private int[] array;private int start;private int end;public SortRunnable(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}@Overridepublic void run() {// 分段排序Arrays.sort(array, start, end);}} }

join()還其他重載形式,可以設(shè)定主調(diào)線程的最長等待時間.


后臺線程

后臺線程的任務(wù)是為其他線程提供服務(wù),又被成為”守護線程”, JVM的垃圾回收線程就是典型的后臺守護線程.
調(diào)用Thread對象的setDaemon(true)方法可以將指定線程設(shè)置成后臺線程(在start()之前),isDaemon()可以判斷是否為后臺線程(主線程默認是非后臺線程, 非后臺線程創(chuàng)建的默認是非后臺線程, 反之亦然).

后臺線程的特征: 所有前臺線程死亡, 后臺線程會自動死亡.


Sleep

前面多次看到在線程的執(zhí)行過程中調(diào)用sleep()讓線程睡眠(進入阻塞狀態(tài)),以模擬耗時的操作. 其方法簽名如下:

static void sleep(long millis) throws InterruptedException;

由于sleep()會拋出CheckedException,因此可以將其包裝一下:

/*** @author jifang* @since 16/1/23 下午9:17.*/ public class SleepUtil {public static void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {throw new RuntimeException(e);}} }

sleep()還有重載形式, 但不常用.


Yield

yield()方法讓當前正在執(zhí)行的線程暫停,但不是阻塞線程,而是讓該線程轉(zhuǎn)入就緒狀態(tài),重新等待調(diào)度.
實際上,當某個線程調(diào)用yield()方法讓出處理器資源后,只有優(yōu)先級與當前線程相同,或優(yōu)先級比當前線程更高的處于就緒的線程才會獲得執(zhí)行機會, 因此完全有可能線程轉(zhuǎn)入就緒后,調(diào)度器又將其調(diào)度出來重新執(zhí)行.

注意: yield()方法可移植性并不是很好, 而且很有可能導(dǎo)致死鎖.所以并不推薦使用(詳細見線程同步).


線程優(yōu)先級

每個線程都具有一定的優(yōu)先級,優(yōu)先級高的線程獲得更多的執(zhí)行機會;默認情況下,每個子線程的優(yōu)先級與子父線程相同(默認main線程具有普通優(yōu)先級).
Thread類提供了setPriority(int newPriority)/getPriority()方法來設(shè)置/獲取線程優(yōu)先級.newPriority的范圍為1~10,但由于這些級別需要操作系統(tǒng)的支持,但不同操作系統(tǒng)的優(yōu)先級策略并不相同,因此推薦使用Thread類提供了三個靜態(tài)常量進行設(shè)置:

/*** The minimum priority that a thread can have.*/ public final static int MIN_PRIORITY = 1;/*** The default priority that is assigned to a thread.*/ public final static int NORM_PRIORITY = 5;/*** The maximum priority that a thread can have.*/ public final static int MAX_PRIORITY = 10; /*** @author jifang* @since 16/1/21上午11:12.*/ public class ThreadPriority {public static void main(String[] args) {Thread low = new Thread(new PriorityRunnable(), "low");low.setPriority(Thread.MIN_PRIORITY);Thread mid = new Thread(new PriorityRunnable(), "mid");mid.setPriority(Thread.NORM_PRIORITY);Thread high = new Thread(new PriorityRunnable(), "high");high.setPriority(Thread.MAX_PRIORITY);start(low, mid, high);}private static void start(Thread... threads) {for (Thread thread : threads) {thread.start();}}private static class PriorityRunnable implements Runnable {@Overridepublic void run() {for (int i = 0; i < 10; ++i) {System.out.println(Thread.currentThread().getName() + " " + i);}}} }

線程同步

模擬銀行取錢的場景,無線程同步:

  • 賬戶
/*** 銀行賬戶** @author jifang* @since 16/1/21下午2:05.*/ public class Account {private double balance;public Account() {}public Account(double balance) {this.balance = balance;}public double getBalance() {return balance;}public void reduceBalance(double count) {this.balance -= count;} }
  • 甲/乙線程取錢
/*** @author jifang* @since 16/1/21下午2:09.*/ public class DrawMoney {public static void main(String[] args) {Runnable r = new DrawRunnable(new Account(800), 300);new Thread(r, "甲").start();new Thread(r, "乙").start();}private static class DrawRunnable implements Runnable {private final Account account;private double money;public DrawRunnable(Account account, double money) {this.account = account;this.money = money;}@Overridepublic void run() {while (true) {if (account.getBalance() > money) {System.out.println(Thread.currentThread().getName() + "取錢" + money + "成功");try {Thread.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}account.reduceBalance(money);System.out.println("\t" + Thread.currentThread().getName() + "成功后的余額: " + account.getBalance());} else {System.out.println(Thread.currentThread().getName() + "取錢失敗");System.out.println("\t" + Thread.currentThread().getName() + "失敗后的余額: " + account.getBalance());break;}}}} }

運行如上程序, 很有可能會產(chǎn)生余額為負的情況(但現(xiàn)實中這是不可能存在的,出現(xiàn)這樣的結(jié)果就說明是我們的程序出錯了).


synchronized

之所以會出現(xiàn)上面的情況, 是因為run()方法不具有線程安全性(當賬戶余額為500時, 甲乙兩個線程的account.getBalance() > money都返回true(為了增大這類事件產(chǎn)生的概率,線程會在判斷完之后會sleep1毫秒以等待另一個線程),這樣兩個線程都會去取款300,因此會導(dǎo)致余額出現(xiàn)-100的情況).
為了解決該問題, Java多線程引入了同步監(jiān)視器synchronized關(guān)鍵字;被synchronized保護的代碼稱之為同步代碼塊,線程開始執(zhí)行同步代碼塊之前, 必須先獲得對同步監(jiān)視器的鎖定.

任何時刻只能有一個線程獲得對同步監(jiān)視器的鎖定,當同步代碼執(zhí)行完后,該線程會自動釋放對同步監(jiān)視器的鎖定.

@Override public void run() {while (true) {synchronized (account) {if (account.getBalance() > money) {System.out.println(Thread.currentThread().getName() + "取錢" + money + "成功");try {Thread.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}account.reduceBalance(money);System.out.println("\t" + Thread.currentThread().getName() + "成功后的余額: " + account.getBalance());} else {System.out.println(Thread.currentThread().getName() + "取錢失敗");System.out.println("\t" + Thread.currentThread().getName() + "失敗后的余額: " + account.getBalance());break;}}} }

推薦使用可能被并發(fā)訪問的共享資源作為同步監(jiān)視器.

synchronized關(guān)鍵詞還可以用于修飾方法,該方法稱之為同步方法,同步方法鎖定的同步監(jiān)視器是this,也就是調(diào)用方法的對象.我們可將取錢的操作移動到Account中,并將該類改造成線程安全類:

/*** @author jifang* @since 16/1/21下午2:05.*/ public class Account {// ...public synchronized boolean draw(double money) {if (getBalance() > money) {System.out.println(Thread.currentThread().getName() + "取錢" + money + "成功");try {Thread.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}balance -= money;System.out.println("\t" + Thread.currentThread().getName() + "成功后的余額: " + getBalance());return true;} else {System.out.println(Thread.currentThread().getName() + "取錢失敗");System.out.println("\t" + Thread.currentThread().getName() + "失敗后的余額: " + getBalance());return false;}} }

這樣Thread的run()方法則變得非常簡單:

@Overridepublic void run() {while (account.draw(money)) {}}

synchronized可以修飾方法,也可以修改代碼塊,但不能修飾構(gòu)造器,成員變量等.


同步監(jiān)視器釋放

釋放同步監(jiān)視器鎖定:

  • 當前線程的同步方法/同步代碼塊執(zhí)行結(jié)束, 釋放同步監(jiān)視器;
  • 當前線程在同步代碼塊/同步方法中遇到break/return終止該代碼塊/方法的執(zhí)行, 釋放同步監(jiān)視器.
  • 當前線程在同步代碼塊/同步方法中出現(xiàn)了未處理的Error/Exception, 導(dǎo)致異常結(jié)束, 釋放同步監(jiān)視器.
  • 當前線程調(diào)用了同步對象的wait()方法,當前線程暫停,并釋放同步監(jiān)視器.

不釋放同步監(jiān)視器:

  • 程序調(diào)用Thread.sleep()/Thread.yield()方法暫停當前線程執(zhí)行.
  • 其他線程調(diào)用當前線程的suspend()方法將線程掛起.

wait/notify

現(xiàn)在系統(tǒng)中有兩個線程,分別執(zhí)行存錢/取錢,考慮這樣一種特殊的需求:”要求存錢者和取錢著不斷地重復(fù)存錢/取錢動作,同時規(guī)定不允許連續(xù)兩次存錢, 也不允許兩次連續(xù)取錢”.
可以借助Object類提供的wait()/notify()/notifyAll()三個方法來完成這一需求,但這三個方法必須由同步監(jiān)視器對象來調(diào)用,因此可以分為以下兩種情況:

  • 使用synchronized修飾的同步方法, 由于this就是同步監(jiān)視器,所以可以在同步方法里面直接調(diào)用這三個方法.
  • 使用synchronized修飾的同步代碼塊, 由于同步監(jiān)視器是synchronized括號中的對象, 所以必須使用該對象調(diào)用這三個方法.
方法釋義
void wait()Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. (注意: 調(diào)用wait()方法的當前線程會釋放對同步監(jiān)視器的鎖定)
void notify()Wakes up a single thread that is waiting on this object’s monitor.
void notifyAll()Wakes up all threads that are waiting on this object’s monitor.
/*** @author jifang* @since 16/1/24 下午4:35.*/ public class Account {private double balance = 0.0;/*haveBalance標識當前賬戶是否還有余額*/private boolean haveBalance = false;public double getBalance() {return balance;}/*** 取錢** @param amount*/public synchronized void draw(double amount) throws InterruptedException {// 如果沒有存款, 則釋放鎖定, 持續(xù)等待while (!haveBalance) {wait();}System.out.printf("%s執(zhí)行取錢操作", Thread.currentThread().getName());balance -= amount;System.out.printf(", 當前余額%f%n", balance);haveBalance = false;// 喚醒其他線程notifyAll();}/*** 存錢** @param amount*/public synchronized void deposit(double amount) throws InterruptedException {// 如果有存款, 則釋放鎖定, 持續(xù)等待while (haveBalance) {wait();}System.out.printf("%s執(zhí)行存錢操作", Thread.currentThread().getName());balance += amount;System.out.printf(", 當前余額%f%n", balance);haveBalance = true;// 喚醒其他線程notifyAll();} } public class Depositor {public static void main(String[] args) {Account account = new Account();new Thread(new DrawMethod(account, 100), "- 取錢者").start();new Thread(new DepositMethod(account, 100), "+ 存錢者").start();}private static class DrawMethod implements Runnable {private Account account;private double amount;public DrawMethod(Account account, double amount) {this.account = account;this.amount = amount;}@Overridepublic void run() {while (true) {try {account.draw(amount);SleepUtil.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}}}}private static class DepositMethod implements Runnable {private Account account;private double amount;public DepositMethod(Account account, double amount) {this.account = account;this.amount = amount;}@Overridepublic void run() {while (true) {try {SleepUtil.sleep(500);account.deposit(amount);} catch (InterruptedException e) {throw new RuntimeException(e);}}}} }

Lock

從1.5開始,Java提供了另外一種線程同步機制Lock,Lock提供比synchronized更廣泛的鎖定操作,并且支持多個相關(guān)的Condition.
java.util.concurrent.locks.Lock和java.util.concurrent.locks.ReadWriteLock是Java提供的兩類鎖的根接口,并且為Lock提供了ReentrantLock/ReentrantReadWriteLock.ReadLock/ReentrantReadWriteLock.WriteLock實現(xiàn), 為ReadWriteLock提供ReentrantReadWriteLock實現(xiàn).

Lock很容易實現(xiàn)對共享資源的互斥訪問:每次只能有一個線程對Lock加鎖,線程在訪問共享資源之前應(yīng)先獲得Lock對象并lock(), 在訪問結(jié)束之后要unlock().

ReentrantLock表示可重入鎖,也就是說一個線程可以對已被加鎖的ReentrantLock鎖再次加鎖,ReentrantLock對象會維持一個計數(shù)器來追蹤lock()方法的嵌套調(diào)用,所以一段被鎖保護的代碼可以調(diào)用另一個被相同鎖保護的方法.

  • 使用Lock實現(xiàn)生產(chǎn)者/消費者模式
/*** 注意: add/reduce一定要確保使用的是同一把鎖** @author jifang* @since 16/1/21下午4:52.*/ public class Repository {/*使用mutex保護count*/private final Lock mutex = new ReentrantLock();private int count;private int limit;public Repository(int count, int limit) {this.count = count;this.limit = limit;}private boolean canAdd(int count) {return this.count + count <= limit;}private boolean canReduce(int count) {return this.count - count >= 0;}public boolean add(int count) {try {// + 加鎖mutex.lock();if (canAdd(count)) {SleepUtil.sleep(80);this.count += count;return true;}} finally {// + 解鎖mutex.unlock();}return false;}public boolean reduce(int count) {try {// - 加鎖mutex.lock();if (canReduce(count)) {SleepUtil.sleep(80);this.count -= count;return true;}} finally {// - 解鎖mutex.unlock();}return false;}public int getCount() {return count;} } /*** @author jifang* @since 16/1/21下午5:02.*/ public class ProducerConsumer {public static void main(String[] args) {ExecutorService pool = getExecutor();Repository repository = new Repository(0, 100);pool.submit(new Producer(repository, 30));pool.submit(new Producer(repository, 30));pool.submit(new Consumer(repository, 1));pool.submit(new Consumer(repository, 2));}private static ExecutorService getExecutor() {return new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());}private static class Producer implements Runnable {private Repository repository;private int produceCount;public Producer(Repository repository, int produceCount) {this.repository = repository;this.produceCount = produceCount;}@Overridepublic void run() {while (true) {try {Thread.sleep(700);} catch (InterruptedException e) {throw new RuntimeException(e);}if (repository.add(produceCount)) {System.out.println("+ 生產(chǎn)者" + Thread.currentThread().getName() + " 生產(chǎn)<" + produceCount + ">個產(chǎn)品 -- 成功");} else {System.out.println("+ 生產(chǎn)者" + Thread.currentThread().getName() + " 生產(chǎn)<" + produceCount + ">個產(chǎn)品 -- 失敗");}System.out.println("當前倉庫共有" + repository.getCount() + "個產(chǎn)品");}}}private static class Consumer implements Runnable {private Repository repository;private int consumeCount;public Consumer(Repository repository, int consumeCount) {this.repository = repository;this.consumeCount = consumeCount;}@Overridepublic void run() {while (true) {try {Thread.sleep(300);} catch (InterruptedException e) {throw new RuntimeException(e);}if (repository.reduce(consumeCount)) {System.out.println("- 消費者" + Thread.currentThread().getName() + " 消費<" + consumeCount + ">個產(chǎn)品 -- 成功");} else {System.out.println("- 消費者" + Thread.currentThread().getName() + " 消費<" + consumeCount + ">個產(chǎn)品 -- 失敗");}System.out.println("當前倉庫共有" + repository.getCount() + "個產(chǎn)品");}}} }

使用Lock對象進行線程同步, 當加鎖/釋放鎖出現(xiàn)在不同的作用范圍時, 通常建議使用finally塊來確保一定會釋放鎖.

相對于Lock, synchronized則更加方便,可以避免很多鎖的常見編程錯誤,但Lock卻提供了synchronized所沒有功能,比如用于tryLock()嘗試加鎖, 獲取可中斷鎖的lockInterruptibly(), 還有在等待時間內(nèi)獲取鎖的tryLock(long time, TimeUnit unit)方法.


Condition

使用Lock來保證互斥時,Java提供了一個java.util.concurrent.locks.Condition來保持協(xié)調(diào),使用Condition可以讓那些已經(jīng)得到Lock的線程無法繼續(xù)執(zhí)行的而釋放Lock,也可以喚醒其他等待在該Condition上的線程.
Condition實例需要綁定在一個Lock對象上,使用Lock來保護Condition,因此調(diào)用Lock的newCondition()方法才能獲得Condition實例. 他提供了如下方法:

方法釋義
void await()Causes the current thread to wait until it is signalled or interrupted(當前線程會釋放對Lock的鎖定).
void signal()Wakes up one waiting thread.
void signalAll()Wakes up all waiting threads.

下面使用Condition來模擬生產(chǎn)者/消費者模型(當倉庫中有產(chǎn)品時才能消費, 當倉庫未滿時才能生產(chǎn)):

public class Repository {/*使用mutex保護condition*/private final Lock mutex = new ReentrantLock();private final Condition addCondition = mutex.newCondition();private final Condition reduceCondition = mutex.newCondition();private int count;private int limit;public Repository(int count, int limit) {this.count = count;this.limit = limit;}private boolean canAdd(int count) {return this.count + count <= limit;}private boolean canReduce(int count) {return this.count - count >= 0;}public void add(int count) {try {// + 加鎖mutex.lock();while (!canAdd(count)) {System.out.printf("+... 生產(chǎn)者 %s is waiting...%n", Thread.currentThread().getName());addCondition.await();}this.count += count;System.out.printf("+ %s生產(chǎn)成功, 當前產(chǎn)品數(shù)%d%n", Thread.currentThread().getName(), getCount());// 喚醒消費線程reduceCondition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// + 解鎖mutex.unlock();}SleepUtil.sleep(80);}public void reduce(int count) {try {// - 加鎖mutex.lock();while (!canReduce(count)) {System.out.printf("-... 消費者 %s is waiting...%n", Thread.currentThread().getName());reduceCondition.await();}this.count -= count;System.out.printf("- %s消費成功, 當前產(chǎn)品數(shù)%d%n", Thread.currentThread().getName(), getCount());// 喚醒生產(chǎn)線程addCondition.signalAll();} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// - 解鎖mutex.unlock();}SleepUtil.sleep(80);}private int getCount() {return count;} } public class ProducerConsumer {public static void main(String[] args) {ExecutorService pool = getExecutor();Repository repository = new Repository(0, 100);pool.submit(new Producer(repository, 30));pool.submit(new Producer(repository, 30));pool.submit(new Consumer(repository, 1));pool.submit(new Consumer(repository, 2));}private static ExecutorService getExecutor() {return new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());}private static class Producer implements Runnable {private Repository repository;private int produceCount;public Producer(Repository repository, int produceCount) {this.repository = repository;this.produceCount = produceCount;}@Overridepublic void run() {while (true) {repository.add(produceCount);}}}private static class Consumer implements Runnable {private Repository repository;private int consumeCount;public Consumer(Repository repository, int consumeCount) {this.repository = repository;this.consumeCount = consumeCount;}@Overridepublic void run() {while (true) {repository.reduce(consumeCount);}}} }

ThreadLocal

java.lang.ThreadLocal和其他所有的同步機制一樣,都是為了解決多個線程中對同一共享變量(臨界資源)的訪問沖突問題,不過其他同步機制(synchronized/Lock)都是通過加鎖來實現(xiàn)多個線程對同一變量的安全訪問. 而ThreadLocal則從另外一個角度來解決這個問題– 將臨界資源進行復(fù)制(使其不再是臨界資源),每個線程都擁有一份,從而也就沒有必要再對其同步.
ThreadLocal代表一個線程局部變量,他為每一個使用該變量的線程都提供一個副本,使得每一個線程都可以獨立的改變自己的副本,而不會和其他的線程沖突(好像每個線程都完全擁有該變量).

ThreadLocal提供如下方法:

方法釋義
T get()Returns the value in the current thread’s copy of this thread-local variable.
void remove()Removes the current thread’s value for this thread-local variable.
void set(T value)Sets the current thread’s copy of this thread-local variable to the specified value.
/*** @author jifang* @since 16/1/23 下午6:14.*/ public class Account {private ThreadLocal<Long> balance = new ThreadLocal<>();public Account(Long balance) {this.balance.set(balance);}public Long getBalance() {return balance.get();}public void setBalance(Long balance) {this.balance.set(balance);} } /*** @author jifang* @since 16/1/23 下午6:10.*/ public class ThreadLocalClient {public static void main(String[] args) {// 同一個AccountAccount account = new Account(888L);new Thread(new Writer(account)).start();new Thread(new Reader(account)).start();// 主線程for (int i = 0; i < 10; ++i) {sleep(10);System.out.println(Thread.currentThread().getName() + "-balance: " + account.getBalance());}}private static class Writer implements Runnable {private Account account;public Writer(Account account) {this.account = account;}@Overridepublic void run() {for (int i = 0; i < 10; ++i) {account.setBalance((long) i);sleep(10);System.out.println(Thread.currentThread().getName() + "-balance: " + account.getBalance());}}}private static class Reader implements Runnable {private Account account;public Reader(Account account) {this.account = account;}@Overridepublic void run() {for (int i = 0; i < 10; ++i) {sleep(10);System.out.println(Thread.currentThread().getName() + "-balance: " + account.getBalance());}}}private static void sleep(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {throw new RuntimeException(e);}} }

可以看到, 雖然在主線程里面對Account設(shè)置的初始值, 但是到子線程里面還是默認值.

建議: 如果多個線程之間需要共享資源, 以達到線程之間通信的目的, 就使用同步機制;如果僅僅需要隔離多個線程之間的共享沖突, 則可以選擇使用ThreadLocal.


線程通信

BlockingQueue

Java 1.5 提供了java.util.concurrent.BlockingQueue接口, 雖然他也是Queue的子接口, 但他的主要用途不是作為容器, 而是用作線程同步的工具 –多個線程通過交替向BlockingQueue放入/取出數(shù)據(jù),從而完成線程間通信的目的.

BlockingQueue提供如下方法用于線程間通信:

Java為BlockingQueue提供了如下實現(xiàn):

  • ArrayBlockingQueue: 基于數(shù)組實現(xiàn)的BlockingQueue.
  • LinkedBlockingQueue: 基于鏈表實現(xiàn)的BlockingQueue.
  • SynchronousQueue: 同步隊列, 對隊列的存/取操作必須交替進行.
  • PriorityBlockingQueue: 并不是標準的隊列,其take/poll/remove方法返回的是隊列中最小元素(其大小關(guān)系可以通過實現(xiàn)Comparator接口進行定制).
  • DelayQueue: 一個特殊的BlockingQueue,底層基于PriorityBlockingQueue實現(xiàn).DelayQueue要求集合元素必須實現(xiàn)Delayed接口(該接口里面只有一個long getDelay(TimeUnit unit)方法),DelayQueue根據(jù)集合元素的getDelay()方法的返回值進行排序.

使用BlockingQueue實現(xiàn)生產(chǎn)者/消費者模型

/*** @author jifang* @since 16/1/21下午9:11.*/ public class ConsumerProducer {private static final int CORE_POOL_SIZE = 5;private static final int MAX_POOL_SIZE = 20;private static final int KEEP_ALIVE_TIME = 20;public static void main(String[] args) {BlockingQueue<String> queue = getQueue();Producer producer = new Producer(queue, 1000);Consumer consumer = new Consumer(queue);ExecutorService pool = getThreadPool();pool.submit(producer);pool.submit(consumer);pool.submit(consumer);}private static BlockingQueue<String> getQueue() {return new ArrayBlockingQueue<>(10);}private static ExecutorService getThreadPool() {return new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());}private static class Producer implements Runnable {private BlockingQueue<String> queue;private int count;public Producer(BlockingQueue<String> queue, int count) {this.queue = queue;this.count = count;}@Overridepublic void run() {Random random = new Random();while (count-- != 0) {try {Thread.sleep(100);queue.put("product " + random.nextInt(100));} catch (InterruptedException e) {throw new RuntimeException(e);}}}}private static class Consumer implements Runnable {private BlockingQueue<String> queue;public Consumer(BlockingQueue<String> queue) {this.queue = queue;}@Overridepublic void run() {while (true) {try {String product = queue.take();System.out.println(Thread.currentThread().getName() + " " + product);Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}} }

Disruptor


最近師兄技術(shù)分享提到了并發(fā)框架Disruptor,原因是最近在做數(shù)據(jù)遷移,需要多個線程之間進行數(shù)據(jù)通信,將原來老庫中的數(shù)據(jù)讀出,中間處理一道之后寫入新庫.第一次聽到它不免驚訝一番(竟然還有這么牛逼的東西(⊙﹏⊙)),于是就在網(wǎng)上找了幾篇文章,了解了一下他的應(yīng)用場景/使用方法和基本原理,現(xiàn)炒現(xiàn)賣整理如下,詳細可參考我列在后面的參考文檔.


Disruptor入門示例

  • 事件
    事件(Event)就是通過 Disruptor 進行交換的數(shù)據(jù)類型
/*** @author jifang* @since 16/1/22 下午8:49.*/ public class IntegerEvent {private int value;public int getValue() {return value;}public void setValue(int value) {this.value = value;} }
  • 事件工廠
    事件工廠(EventFactory)定義了如何初始化Disruptor的環(huán)形緩沖區(qū)RingBuffer
/*** 用來產(chǎn)生Event事件(填充默認的buffer)** @author jifang* @since 16/1/22 下午8:50.*/ public class IntegerEventFactory implements EventFactory<IntegerEvent> {@Overridepublic IntegerEvent newInstance() {return new IntegerEvent();} }
  • 事件消費者
    定義了消費者線程該如何處理事件Event
public class IntegerConsumer implements EventHandler<IntegerEvent> {@Overridepublic void onEvent(IntegerEvent event, long sequence, boolean endOfBatch) throws Exception {System.out.println("sequence: " + sequence);System.out.println("endOfBatch: " + endOfBatch);System.out.println("event-value: " + event.getValue());System.out.println("\t" + Thread.currentThread().getName());} }
  • 事件生產(chǎn)者
    定義了生產(chǎn)者向RingBufferpush哪些數(shù)據(jù),Disruptor的事件發(fā)布過程分為三個階段:
    • 從RingBuffer中獲取下一個可以寫入的序列號sequence;
    • 獲取sequence對應(yīng)的事件對象Event,將數(shù)據(jù)寫入Event;
    • 將填充好的Event提交到RingBuffer.
public class IntegerProducer {private RingBuffer<IntegerEvent> buffer;public IntegerProducer(RingBuffer<IntegerEvent> buffer) {this.buffer = buffer;}public void pushData(int value) {long sequence = buffer.next();try {//該event從buffer中拿出(當環(huán)形buffer填滿時, 你能看到event里面的值)IntegerEvent event = buffer.get(sequence);event.setValue(value);} finally {buffer.publish(sequence);}} }

注意: buffer.publish(sequence)需放在finally中以確保一定會得到調(diào)用.如果某個請求的sequence未被提交,將會阻塞當前/其他Producer發(fā)布事件.

附: Disruptor還提供另一種發(fā)布事件的形式,簡化以上操作并確保publish()總是得到調(diào)用:

public class IntegerProducer {private RingBuffer<IntegerEvent> buffer;public IntegerProducer(RingBuffer<IntegerEvent> buffer) {this.buffer = buffer;}private EventTranslatorOneArg<IntegerEvent, Integer> translator = new EventTranslatorOneArg<IntegerEvent, Integer>() {@Overridepublic void translateTo(IntegerEvent event, long sequence, Integer value) {event.setValue(value);}};public void pushData(int value) {buffer.publishEvent(translator, value);} }

Disruptor要求publish()必須得到調(diào)用(即使發(fā)生異常也要調(diào)用),那么就需要調(diào)用者在事件處理時判斷事件攜帶的數(shù)據(jù)是否是正確/完整的,而不是由Disruptor來保證.

  • 啟動Disruptor
public class Client {private IntegerEventFactory factory;private int ringBufferSize;private ExecutorService executorService;@Beforepublic void setUp() throws Exception {factory = new IntegerEventFactory();ringBufferSize = 8;executorService = new ThreadPoolExecutor(5, 20, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20));}@Testpublic void client() {Disruptor<IntegerEvent> disruptor = new Disruptor<>(factory, ringBufferSize, executorService);// 關(guān)聯(lián)消費者disruptor.handleEventsWith(new IntegerConsumer());// 子線程從buffer消費數(shù)據(jù)disruptor.start();// 主線程向buffer生產(chǎn)數(shù)據(jù)IntegerProducer producer = new IntegerProducer(disruptor.getRingBuffer());for (int i = 0; i < 100; ++i) {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}producer.pushData(i);}disruptor.shutdown();executorService.shutdown();} }

Disruptor選項

Disruptor可以根據(jù)不同的軟件/硬件來調(diào)整選項以獲得更高的性能. 其基本的選項有兩個:單生產(chǎn)者模式設(shè)定和可選的等待策略.

  • 單生產(chǎn)者模式

    在并發(fā)系統(tǒng)中提高性能的方式之一就是單一寫者原則:如果你的代碼中僅有一個事件生產(chǎn)者,那么可以在生成Disruptor時將其設(shè)置為單一生產(chǎn)者模式來提高系統(tǒng)的性能.

  • 可選的等待策略
    Disruptor定義了com.lmax.disruptor.WaitStrategy接口用于抽象Consumer如何等待新事件,其默認策略是BlockingWaitStrategy

public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize) {return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy()); }

這個策略的內(nèi)部使用一個鎖Lock和條件變量Condition來控制線程的執(zhí)行和等待.因此是最慢的等待策略(但也是CPU使用率最低和最穩(wěn)定的策略).但Disruptor提供了根據(jù)不同的部署環(huán)境調(diào)整等待的方法(與單生產(chǎn)者模式一樣, 也是需要在構(gòu)建Disruptor實例時指定), Disruptor提供了如下幾種常用策略.

策略描述
SleepingWaitStrategySleepingWaitStrategy方式是循環(huán)等待并且在循環(huán)中間調(diào)用LockSupport.parkNanos()來睡眠.它的優(yōu)點在于生產(chǎn)線程只需要計數(shù),而不執(zhí)行任何指令;并且沒有條件變量的消耗,因此和BlockingWaitStrategy一樣,SleepingWaitStrategy的CPU使用率也比較低.但是,事件對象從生產(chǎn)者到消費者傳遞的延遲變大了.因此最好用在不需要低延遲,而事件發(fā)布對于生產(chǎn)者的影響比較小的情況下,如異步日志.
YieldingWaitStrategyYieldingWaitStrategy是可以被用在低延遲系統(tǒng)中的兩個策略之一,這種策略在減低系統(tǒng)延遲的同時也會增加CPU運算量.該策略會循環(huán)等待sequence增加到合適值(循環(huán)中調(diào)用Thread.yield()以允許其他準備好的線程執(zhí)行);如果需要高性能而且事件消費者線程比邏輯內(nèi)核少,推薦使用YieldingWaitStrategy策略,如:在開啟超線程時.
BusySpinWaitStrategyBusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環(huán)境要求最高的策略(CPU空轉(zhuǎn)等待).這個策略最好用在要求極高性能且事件處理線數(shù)小于CPU邏輯核心數(shù)的場景中,如在禁用超線程技術(shù)時.

我們指定單生產(chǎn)者模式和BusySpinWaitStrategy來啟動上例:

public class Client {// ...@Testpublic void client() {// 指定單生產(chǎn)者模式,與BusySpinWaitStrategy策略Disruptor<IntegerEvent> disruptor = new Disruptor<>(factory, ringBufferSize, executorService, ProducerType.SINGLE, new BusySpinWaitStrategy());//...} }

當程序跑起來之后,可以查看機器的top輸出,直觀的感受一下Disruptor是多么消耗CPU…


線程安全集合

包裝不安全集合

平常使用的ArrayList LikedList HashMap HashSet LinkedHashMap LinkedHashSet TreeMap TreeSet等java.util包下的集合(除HashTable Vactor除外)都是線程不安全的, 當多個線程并發(fā)向這些集合中存/取數(shù)據(jù)時, 就可能會破獲這些集合的數(shù)據(jù)完整性.
因此java.util.Collections類提供了一些工具方法來將這些線程不安全的集合包裝成線程安全的集合.

方法釋義
static <T> Collection<T> synchronizedCollection(Collection<T> c)Returns a synchronized (thread-safe) collection backed by the specified collection.
static <T> List<T> synchronizedList(List<T> list)Returns a synchronized (thread-safe) list backed by the specified list.
static <K,V> Map<K,V> synchronizedMap(Map<K,V> m)Returns a synchronized (thread-safe) map backed by the specified map.
static <T> Set<T> synchronizedSet(Set<T> s)Returns a synchronized (thread-safe) set backed by the specified set.
static <K,V> SortedMap<K,V> synchronizedSortedMap(SortedMap<K,V> m)Returns a synchronized (thread-safe) sorted map backed by the specified sorted map.
static <T> SortedSet<T> synchronizedSortedSet(SortedSet<T> s)Returns a synchronized (thread-safe) sorted set backed by the specified sorted set.
/*** @author jifang* @since 16/1/23 下午9:07.*/ public class ListClient {private static List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());private static Runnable runnable = new Runnable() {@Overridepublic void run() {for (int i = 0; i < 100; ++i) {SleepUtil.sleep(10);list.add(i);}}};public static void main(String[] args) {new Thread(runnable).start();new Thread(runnable).start();} }

注意: 如果需要將某個集合包裝成線程安全集合, 需要在創(chuàng)建之后立即包裝.


線程安全集合

從1.5開始, Java在java.util.concurrent包下提供了大量支持高效并發(fā)訪問的接口和實現(xiàn).

從UML類圖可以看出,線程安全的集合類可分為以Concurrent開頭和以CopyOnWrite開頭的兩類.

以Concurrent開頭的集合代表了支持并發(fā)訪問的集合, 他們可以支持多個線程并發(fā)讀/寫, 這些寫入操作保證是線程安全的,而讀取操作不必鎖定, 因此在并發(fā)寫入時有較好的性能.

  • ConcurrentLinkedQueue實現(xiàn)了多線程的高效訪問, 訪問時無需等待, 因此當多個線程共享訪問一個公共集合時, ConcurrentLinkedQueue是一個較好的選擇(不允許使用null元素).
  • ConcurrentHashMap默認支持16個線程并發(fā)寫入(可以通過concurrencyLevel構(gòu)造參數(shù)修改), 當線程數(shù)超過concurrencyLevel時, 可能有一些線程需要等待.

由于ConcurrentLinkedQueue ConcurrentHashMap支持并發(fā)訪問, 所以當使用迭代器來遍歷集合時, 該迭代器可能不能反映出創(chuàng)建迭代器之后所做的修改, 但程序不會拋出異常.

以CopyOnWrite開頭的集合采用了寫時復(fù)制技術(shù).

  • CopyOnWriteArrayList采用底層復(fù)制數(shù)組的方式來實現(xiàn)寫入操作: 當線程執(zhí)行讀操作時, 線程會直接讀取集合本身, 無須加鎖和阻塞;但當線程對CopyOnWriteArrayList集合執(zhí)行寫入操作(add/remove/set)時, 該集合會在底層復(fù)制一份新數(shù)組, 然后對新數(shù)組執(zhí)行寫入操作.由于對CopyOnWriteArrayList的寫入是針對副本執(zhí)行, 因此它是線程安全的.

注意: 由于CopyOnWriteArrayList的寫入操作需要頻繁的復(fù)制數(shù)組,因此寫入性能較差;但由于讀操作不用加鎖(不是同一個數(shù)組),因此讀操作非常快. 因此, CopyOnWriteArrayList適合在讀操作遠遠大于寫操作的場景中, 如緩存.

  • 由于CopyOnWriteArraySet底層封裝的是CopyOnWriteArrayList, 因此他的實現(xiàn)機制完全類似于CopyOnWriteArrayList.

未處理異常

從1.5開始, Java增強了線程的異常處理功能,如果線程執(zhí)行過程中拋出了一個未處理異常,JVM會在結(jié)束該線程之前查找是否有對應(yīng)的java.lang.Thread.UncaughtExceptionHandler對象,如果找到了Handler對象, 則會調(diào)用該對象的void uncaughtException(Thread thread, Throwable exception)方法來處理該異常.
Thread提供了如下兩個方法來設(shè)置異常處理器:

方法釋義
static void setDefaultUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)Set the default handler invoked when a thread abruptly terminates due to an uncaught exception, and no other handler has been defined for that thread.
void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler eh)Set the handler invoked when this thread abruptly terminates due to an uncaught exception.
/*** @author jifang* @since 16/1/23 下午5:50.*/ public class Main {private static Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {@Overridepublic void uncaughtException(Thread t, Throwable e) {System.out.println(t.getName());e.printStackTrace();}};public static void main(String[] args) {Thread.setDefaultUncaughtExceptionHandler(handler);int a = 5 / 0;System.out.println("線程正常結(jié)束, a=" + a);} }
參考:
Disruptor 極速體驗
Disruptor入門
并發(fā)框架Disruptor譯文
Linux多線程實踐(9) –簡單線程池的設(shè)計與實現(xiàn)
Linux多線程實踐(8) –Posix條件變量解決生產(chǎn)者消費者問題
Linux多線程實踐(7) –多線程排序?qū)Ρ?/dd>
Linux多線程實踐(4) –線程特定數(shù)據(jù)
Linux多線程實踐(1) –線程理論
Linux IPC實踐(7) –Posix消息隊列
進程同步的基本概念:臨界資源、同步和互斥
瘋狂Java講義
Effective java(第2版)

總結(jié)

以上是生活随笔為你收集整理的Java 并发基础的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。