think-in-java(21)并发
【README】
并發后半部分(并發2,從21.4.3中斷開始)參見:?https://blog.csdn.net/PacosonSWJTU/article/details/106878087?;?
本文章包括了并發1和并發2 ,期初新建并發2是為了方便編寫文檔,因為并發內容實屬太多,所以分了2次post;
【21.2】基本線程機制
并發編碼使我們可以將程序劃分為多個分離的,獨立運行的任務;?
cpu將輪流給每個任務分配其占用時間。
?
【21.2.1】定義任務
線程可以驅動任務,一種描述任務的方式是使用 Runnable 接口;
方式1:直接調用 Runnalbe接口的run 方法創建線程驅動任務;
/*** 用 Runnable 接口定義任務*/ public class LiftOff implements Runnable {protected int countDown = 10;private static int taskCount = 0;private final int id = taskCount++;public LiftOff(){}public LiftOff(int countDown) {this.countDown = countDown;}public String status() {return "#" + id + "(" + (countDown > 0? countDown: "liftoff") + "), "; }@Override public void run() {while(countDown-- > 0 ) {System.out.println(status());Thread.yield(); // 當前線程轉入可運行狀態,把cpu時間片讓步給其他線程 }}public static void main(String[] args) {LiftOff obj = new LiftOff();obj.run(); // 這里直接調用 Runnalbe接口的run 方法創建線程驅動任務} }運行結果:
#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(liftoff),?
【21.2.2】Thread類
開啟線程的第2種方式,使用Thread來驅動任務;調用thread 的start方法,start方法會調用 runnable 接口實現類的run 方法;
/*** 開啟線程的第2種方式,使用Thread來驅動任務 */ public class BasicThreads {public static void main(String[] args) {Thread t = new Thread(new LiftOff());t.start();System.out.println("waiting for liftOff"); } }main 方法通過主線程來驅動,而 LiftOff中run方法的邏輯通過main方法分發的子線程來驅動;
運行結果:
waiting for liftOff #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(liftoff),啟動多個線程運行多個任務,可以看到線程切換的動作
/*** 啟動多個線程運行多個任務,可以看到線程切換的動作 */ public class MoreBasicThreads {public static void main(String[] args) {for (int i=0; i< 5; i++) {new Thread(new LiftOff()).start();}System.out.println("waiting for lift off");} } /*waiting for lift off #3(9), #3(8), #4(9), #4(8), #4(7), #0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #1(9), #1(8), #1(7), #2(9), #2(8), #1(6), #1(5), #1(4), #1(3), #1(2), #0(2), #0(1), #0(liftoff), #4(6), #3(7), #4(5), #4(4), #4(3), #1(1), #1(liftoff), #2(7), #4(2), #4(1), #3(6), #4(liftoff), #2(6), #3(5), #2(5), #2(4), #2(3), #3(4), #3(3), #3(2), #3(1), #3(liftoff), #2(2), #2(1), #2(liftoff), */通過主線程顯式創建多個子線程的問題:
主線程創建多個子線程,每個子線程Thread 都注冊了他自己,內存存在對他的引用,所以在子線程退出其 run 方法之前,垃圾回收器無法清除它, 這不便于內存回收與分配;
?
【21.2.3】使用 Executor 執行器(CachedThreadPool、FixedThreadPool、SingleThreadExecutor )
1、使用 Executor 執行器管理Thread線程對象, 可以簡化并發編程,且處理線程占用的內存回收事宜;
Executor允許你管理異步任務的執行, 而無需顯式地管理線程的生命周期 。
Executor執行器在 java5或6中是啟動任務的優選方法;
荔枝1、基于 newCachedThreadPool實現線程池
/*** page657/線程池* shutdown 方法調用可以防止新任務被提交給 Executor, 當前線程* 將繼續運行在 shutdown被調用之前提交的所有任務 * @author */ public class CachedThreadPool {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i<5; i++) {exec.execute(new LiftOff());}/*shutdown 方法調用可以防止新任務被提交給 Executor, 當前線程* 將繼續運行在 shutdown被調用之前提交的所有任務 */exec.shutdown();} } /*#1(9), #2(9), #2(8), #2(7), #3(9), #4(9), #4(8), #4(7), #4(6), #0(9), #4(5), #4(4), #3(8), #2(6), #1(8), #1(7), #1(6), #1(5), #2(5), #3(7), #3(6), #4(3), #0(8), #0(7), #4(2), #4(1), #3(5), #3(4), #3(3), #3(2), #3(1), #3(liftoff), #2(4), #1(4), #1(3), #1(2), #2(3), #4(liftoff), #0(6), #2(2), #1(1), #1(liftoff), #2(1), #0(5), #2(liftoff), #0(4), #0(3), #0(2), #0(1), #0(liftoff), */常見的是: 單個Executor 執行器被用來創建和管理系統中的所有任務;
荔枝2、基于 newFixedThreadPool實現線程池
newFixedThreadPool 可以一次性預先執行代價高昂的線程分配,因此可以限制線程數量
/*** page 657 * newFixedThreadPool線程池 */ public class FixedThreadPool {public static void main(String[] args) {// newFixedThreadPool 可以一次性預先執行代價高昂的線程分配,因此可以限制線程數量 ExecutorService exec = Executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {exec.execute(new LiftOff());}/*shutdown 方法調用可以防止新任務被提交給 Executor, 當前線程* 將繼續運行在 shutdown被調用之前提交的所有任務 */exec.shutdown();} } /*#0(9), #2(9), #2(8), #3(9), #3(8), #4(9), #1(9), #1(8), #4(8), #4(7), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(liftoff), #2(7), #0(8), #2(6), #4(6), #1(7), #4(5), #4(4), #4(3), #2(5), #0(7), #0(6), #0(5), #0(4), #0(3), #2(4), #4(2), #4(1), #4(liftoff), #1(6), #2(3), #0(2), #0(1), #0(liftoff), #2(2), #1(5), #1(4), #2(1), #1(3), #2(liftoff), #1(2), #1(1), #1(liftoff), */注意:CachedThreadPool 與 FixedThreadPool 線程池的區別: CacheThreadPool 在程序執行過程中通常會創建于所需數量相同的線程,然后在他回收舊線程時停止創建新線程, CachedThreadPool 是合理的的 Executor執行器的首選。?
而FixedThreadPool 是可以限制線程數量的線程池,只有當 CachedThreadPool 出現問題時,才需要切換到 FixedThreadPool;?
?
荔枝3: SingleThreadPool 是 線程數量為1的 FixedThreadPool。?
如果向 SingleThreadPool 提交多個任務, 這些任務將排隊,每個任務都會在下一個任務開始之前結束,所有任務使用相同的線程。因為 SingleThreadExecutor 會序列化所有提交給他的任務,并會維護他自己隱藏的懸掛任務隊列。
/*** page 657 /* newSingleThreadExecutor 類似于線程數量為1的 FixedThreadPool */ public class SingleThreadExecutor {public static void main(String[] args) {ExecutorService exec = Executors.newSingleThreadExecutor();for (int i=0; i<5; i++) {exec.execute(new LiftOff());}exec.shutdown(); } } /*#0(9), #0(8), #0(7), #0(6), #0(5), #0(4), #0(3), #0(2), #0(1), #0(liftoff), #1(9), #1(8), #1(7), #1(6), #1(5), #1(4), #1(3), #1(2), #1(1), #1(liftoff), #2(9), #2(8), #2(7), #2(6), #2(5), #2(4), #2(3), #2(2), #2(1), #2(liftoff), #3(9), #3(8), #3(7), #3(6), #3(5), #3(4), #3(3), #3(2), #3(1), #3(liftoff), #4(9), #4(8), #4(7), #4(6), #4(5), #4(4), #4(3), #4(2), #4(1), #4(liftoff), */【21.2.4】 從任務中產生返回值
若任務在執行完成時需要返回值,則使用 Callable 而不是 Runnable 來描述任務;
/*** page 658 * newCachedThreadPool:任務執行完成后可以返回執行結果 */ public class CallableDemo {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();ArrayList<Future<String>> results = new ArrayList<>();for (int i=0; i<10; i++) {// submit 方法會產生 Future 對象, 他用 Callable返回結果的特定類型進行了參數化 results.add(exec.submit(new TaskWithResult(i))); // 驅動或運行任務使用 ExecutorService.submit() 方法 }for (Future<String> fu : results) {try {System.out.println(fu.get()); // 結果值 } catch (Exception e) {e.printStackTrace();} finally {exec.shutdown(); }}System.out.println("我是main線程");} } /*result of task with result 0 result of task with result 1 result of task with result 2 result of task with result 3 result of task with result 4 result of task with result 5 result of task with result 6 result of task with result 7 result of task with result 8 result of task with result 9 我是main線程 */ class TaskWithResult implements Callable<String> {private int id; public TaskWithResult(int id) {this.id = id ;}@Overridepublic String call() throws Exception {return "result of task with result " + id;} }顯然, 在子線程全部返回前,主線程是阻塞的,因為 主線程打印的消息在所有子線程返回結果之后;
【21.2.5】休眠
方法1: sleep方法 讓線程休眠給定時間,然后又重新回到可運行狀態;
方法2:yield方法表示:當前線程的重要任務已經運行完畢了, 讓出占用的cpu時間片給其他線程;
/*** page659 * Thread.sleep 線程休眠 */ public class SleepingTask extends LiftOff {public void run() {try {while (countDown-- > 0) {System.out.println(status());Thread.sleep(1000); // 當前線程休眠1秒鐘 (老式方法)TimeUnit.SECONDS.sleep(1);// 休眠1秒鐘 (推薦方法)}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i<3; i++) {exec.execute(new SleepingTask());}exec.shutdown();} } /*#0(9), #2(9), #1(9), #2(8), #0(8), #1(8), #1(7), #2(7), #0(7), #2(6), #0(6), #1(6), #2(5), #1(5), #0(5), #0(4), #1(4), #2(4), #2(3), #0(3), #1(3), #1(2), #2(2), #0(2), #2(1), #0(1), #1(1), #1(liftoff), #2(liftoff), #0(liftoff), */Thread.sleep(1000); // 當前線程休眠1秒鐘 (老方法)
TimeUnit.SECONDS.sleep(1);// 休眠1秒鐘 (新方法,java5或6推薦)
【21.2.6】優先級
線程優先級將線程重要性傳遞給了調度器,優先級低的線程僅僅是執行頻率較低。
試圖通過控制線程優先級是一種錯誤。因為cpu的時間片劃分是未知的,可能碰到中斷,如io,所以不建議使用優先級,這里僅僅看下代碼;
/*** page660 * 線程優先級*/ public class SimplePriority implements Runnable {private int countDown = 5; private volatile double d; // volative 確保變量不被任何編譯器優化(指令優化) private int priority;public SimplePriority(int priority) {this.priority = priority;}public String toString() {return Thread.currentThread() + ":" + countDown;}@Override public void run() {Thread.currentThread().setPriority(priority); // 設置線程優先級while(true) {for (int i=1; i<100000; i++) {d += (Math.PI + Math.E) / (double) i;if (i % 1000 == 0) {Thread.yield(); // 當前線程釋放cpu時間片給其他線程}System.out.println(this);if (--countDown ==0) return ; }}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i< 5; i++) {exec.execute(new SimplePriority(Thread.MIN_PRIORITY)); // 最小優先級}exec.execute(new SimplePriority(Thread.MAX_PRIORITY)); // 最高優先級 exec.shutdown(); } } /*Thread[pool-1-thread-3,1,main]:5 Thread[pool-1-thread-3,1,main]:4 Thread[pool-1-thread-6,10,main]:5 Thread[pool-1-thread-6,10,main]:4 Thread[pool-1-thread-5,1,main]:5 Thread[pool-1-thread-1,1,main]:5 Thread[pool-1-thread-4,1,main]:5 Thread[pool-1-thread-2,1,main]:5 Thread[pool-1-thread-2,1,main]:4 Thread[pool-1-thread-2,1,main]:3 Thread[pool-1-thread-2,1,main]:2 Thread[pool-1-thread-2,1,main]:1 Thread[pool-1-thread-4,1,main]:4 Thread[pool-1-thread-1,1,main]:4 Thread[pool-1-thread-1,1,main]:3 Thread[pool-1-thread-1,1,main]:2 Thread[pool-1-thread-1,1,main]:1 Thread[pool-1-thread-5,1,main]:4 Thread[pool-1-thread-6,10,main]:3 Thread[pool-1-thread-3,1,main]:3 Thread[pool-1-thread-3,1,main]:2 Thread[pool-1-thread-6,10,main]:2 Thread[pool-1-thread-6,10,main]:1 Thread[pool-1-thread-5,1,main]:3 Thread[pool-1-thread-5,1,main]:2 Thread[pool-1-thread-4,1,main]:3 Thread[pool-1-thread-5,1,main]:1 Thread[pool-1-thread-3,1,main]:1 Thread[pool-1-thread-4,1,main]:2 Thread[pool-1-thread-4,1,main]:1 */【21.2.7】讓步
1、當前線程調用 Thread.yield方法將給線程調度器一個暗示:
我的工作已經完成了, 可以讓別的線程使用cpu時間片了,但這里僅僅是一個暗示,沒有任何機制保證它將被采納;
注意: yield方法經常被誤用。
【21.2.8】 后臺線程
1、后臺線程:指在程序運行的時候在后臺提供一種通用服務的線程,并且這種線程并不屬于程序中不可或缺的部分。
當所有非后臺線程結束時, 程序也就終止了,同時會殺死進程中的所有后臺線程。
2、后天線程 daemon 荔枝:??
/*** page662 * 后臺線程 daemon.setDaemon(true); 后臺線程不影響非后臺線程的結束,如main主線程就是非后臺線程 * 當所有非后臺線程結束時,程序終止了,同事會殺死進程中的所有后臺線程 */ public class SimpleDaemons implements Runnable {@Override public void run() {try {while(true) {Thread.sleep(1000);System.out.println(Thread.currentThread() + " " + this);}} catch (Exception e){e.printStackTrace();}}public static void main(String[] args) {for (int i=0; i<10; i++) {Thread daemon = new Thread(new SimpleDaemons());daemon.setDaemon(true); // 在 start 方法前調用,設置為后臺線程 daemon.start(); }System.out.println("all daemons started");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}} }?
3、通過 ThreadFactory 線程工廠創建后臺線程?
/*** page662 * 編寫定制的 ThreadFactory 可以定制由 Executor 創建的線程屬性*/ public class DaemonThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);// 設置為后臺線程return t; } } /*** page 663 * 基于 ThreadFactory 創建后臺線程 */ public class DaemonFromFactory implements Runnable {@Overridepublic void run() {try {while(true) { // Thread.sleep(1000);TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread() + " " + this);}} catch (Exception e) {e.printStackTrace();} }public static void main(String[] args) { /*DaemonThreadFactory創建的全是后臺線程*/ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory()); for (int i=0; i<10; i++) {exec.execute(new DaemonFromFactory());}System.out.println("all daemons started");try { // Thread.sleep(3000);TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace();} } } /*all daemons started Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory@12b68fd4 Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory@1ce6edff Thread[Thread-9,5,main] diy.chapter21.DaemonFromFactory@50f7270b Thread[Thread-0,5,main] diy.chapter21.DaemonFromFactory@4c19ebce Thread[Thread-5,5,main] diy.chapter21.DaemonFromFactory@70c680f2 Thread[Thread-2,5,main] diy.chapter21.DaemonFromFactory@3812167c Thread[Thread-3,5,main] diy.chapter21.DaemonFromFactory@2b0a5613 Thread[Thread-6,5,main] diy.chapter21.DaemonFromFactory@15547dba Thread[Thread-1,5,main] diy.chapter21.DaemonFromFactory@16c1c280 Thread[Thread-4,5,main] diy.chapter21.DaemonFromFactory@58bbe46a Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory@12b68fd4 Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory@1ce6edff Thread[Thread-2,5,main] diy.chapter21.DaemonFromFactory@3812167c Thread[Thread-1,5,main] diy.chapter21.DaemonFromFactory@16c1c280 Thread[Thread-0,5,main] diy.chapter21.DaemonFromFactory@4c19ebce Thread[Thread-6,5,main] diy.chapter21.DaemonFromFactory@15547dba Thread[Thread-9,5,main] diy.chapter21.DaemonFromFactory@50f7270b Thread[Thread-3,5,main] diy.chapter21.DaemonFromFactory@2b0a5613 Thread[Thread-5,5,main] diy.chapter21.DaemonFromFactory@70c680f2 Thread[Thread-4,5,main] diy.chapter21.DaemonFromFactory@58bbe46a Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory@12b68fd4 Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory@1ce6edff */4、?自定義線程執行器
/*** 自定義線程執行器-DaemonThreadPoolExecutor * ThreadPoolExecutor extends AbstractExecutorService* AbstractExecutorService implements ExecutorService* interface ExecutorService extends Executor*/ public class DaemonThreadPoolExecutor extends ThreadPoolExecutor {public DaemonThreadPoolExecutor() {super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new DaemonThreadFactory());} } /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters and default rejected execution handler.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}5、后臺線程創建的任何線程都將被自動設置為后臺線程
/*** page663* 后臺線程創建的任何線程都將被自動設置為后臺線程 */ public class Daemons {public static void main(String[] args) {Thread d = new Thread(new Daemon());d.setDaemon(true); // 主線程是后臺線程 ,其創建的10個線程也是后臺線程 d.start();System.out.println("d.isDaemon() = " + d.isDaemon() + ", "); try {TimeUnit.SECONDS.sleep(1); // 睡眠1秒 } catch (InterruptedException e) {e.printStackTrace(); } } } // 后臺線程 class Daemon implements Runnable {private Thread[] t = new Thread[10];@Override public void run() { for (int i=0; i<t.length; i++) {t[i] = new Thread(new DaemonSpawn());t[i].start();System.out.println("daemon spawn" + i + "started");}while(true) {Thread.yield(); // 一直處于可運行狀態,但無法獲取cpu時間片運行 }} } // 任務 class DaemonSpawn implements Runnable {public void run() {while(true) {Thread.yield(); // 把cpu時間片讓給其他線程 }} }6、后臺線程在不執行 finally 子句的情況下就會終止其 run 方法
/*** page664 * 后臺線程在不執行 finally 子句的情況下就會終止其 run 方法* 即終止run方法時,不會執行finally子句中的代碼;* 但把 t.setDaemon(true) 給刪除掉,則會執行 finally 子句中的代碼 */ public class ADaemon implements Runnable {@Overridepublic void run() {try {System.out.println("starting daemon");TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();} finally {System.out.println("this should always run? "); // 沒有執行 }}/*** 一旦main 方法退出, jvm就會立即關閉所有的后臺進程;* 因為不能以優雅的方式來關閉后臺線程,故非后臺線程的 Executors 是一種更好的方式, * 因為 Executors 控制的所有任務可以同時被關閉;* @param args*/public static void main(String[] args) {Thread t = new Thread(new ADaemon());t.setDaemon(true);t.start(); } } /*starting daemon */非后臺的Executor 通常是一種更好的方式,因為 Executor控制的所有任務可以同時被關閉;
?
【21.2.9】編碼的變體
1、上面描述任務的方式都是通過實現 Runnable接口, 還有一種方式是繼承 Thread 類;
/*** page665 * 創建線程的第2種方式, 繼承了Thread,就不能繼承其他類了 */ public class SimpleThread extends Thread {private int countDown = 5; private static int threadCount = 0;public SimpleThread() {super(Integer.toString(++threadCount));start();}public String toString() {return "#" + getName() + "(" + countDown + "), ";}@Override public void run() {while(true) {System.out.println(this);if (--countDown == 0) {return ;}}}public static void main(String[] args) {for (int i=0; i< 5; i++) {new SimpleThread(); }} } /*#2(5), #5(5), #5(4), #5(3), #5(2), #5(1), #4(5), #1(5), #3(5), #1(4), #1(3), #1(2), #4(4), #2(4), #2(3), #2(2), #4(3), #1(1), #3(4), #4(2), #2(1), #4(1), #3(3), #3(2), #3(1), */2、第2種方式,比較常見是 實現 Runnable接口;
/*** page666 * 與繼承Thread不同的是,這里是實現 Runnable接口 , * 在構造器中啟動線程可能有問題, 因為另一個任務可能會在構造器結束之前開始執行, * 這意味著該任務能夠訪問處于不穩定狀態的對象, * 這是優選Executor 而不是顯式創建Thread對象的另一個原因*/ public class SelfManaged implements Runnable {private int countDown = 5; private Thread t = new Thread(this);public SelfManaged() {t.start();}public String toString() {return Thread.currentThread().getName() + "(" + countDown +")";}@Overridepublic void run() {while(true) {System.out.println(this);if (--countDown ==0) {return ;}}} public static void main(String[] args) {for(int i=0; i<5; i++) {new SelfManaged();}} }小結:?在構造器中啟動線程可能有問題, 因為另一個任務可能會在構造器結束之前開始執行,??這意味著該任務能夠訪問處于不穩定狀態的對象。?這是優選Executor 而不是顯式創建Thread對象的另一個原因。
3、通過內部類隱藏線程代碼
/*** 使用內部類隱藏線程代碼 */ public class ThreadVariations {public static void main(String[] args) {new InnerThread1("InnerThread1");new InnerThread2("InnerThread2");new InnerRunnable1("InnerRunnable1");new InnerRunnable2("InnerRunnable2");new ThreadMethod("ThreadMethod").runTaks();} } // 一個單獨的方法開啟線程運行任務 class ThreadMethod {private int countDown = 5;private Thread t;private String name;public ThreadMethod(String name) {this.name = name;} public void runTaks() {if (t == null) {t = new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() + ": " + countDown;}};t.start();}} }// 匿名內部類繼承 Thread class InnerThread2 {private int countDown = 5;private Thread t;public InnerThread2(String name) {t = new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() + ":" + countDown;}};t.start();} }/*** 匿名內部類實現 Runnable接口*/ class InnerRunnable2 {private int countDown = 5;private Thread t;public InnerRunnable2(String name) {t = new Thread(new Runnable() { // 匿名內部類實現 Runnable接口@Overridepublic void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;TimeUnit.MILLISECONDS.sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return Thread.currentThread().getName() + ":"+ countDown;}}, name);t.start();}}/*** 內部類繼承線程Thread*/ class InnerThread1 {private int countDown = 5;private Inner inner;public InnerThread1(String name) {inner = new Inner(name);}// 內部類private class Inner extends Thread {Inner(String name) {super(name);start();}@Overridepublic void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() + ":" + countDown;}} }/*** 內部類實現 Runnable接口*/ class InnerRunnable1 {private int countDown = 5;private Inner inner;public InnerRunnable1(String name) {inner = new Inner(name);}private class Inner implements Runnable {Thread t;Inner(String name) {t = new Thread(this, name);t.start();}public void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;TimeUnit.MILLISECONDS.sleep(10);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return t.getName() + ":" + countDown;}} } /*InnerThread1:5 InnerThread2:5 InnerRunnable1:5 InnerRunnable2:5 ThreadMethod: 5 InnerRunnable1:4 InnerRunnable1:3 InnerRunnable1:2 InnerRunnable1:1 InnerThread2:4 InnerThread1:4 InnerRunnable2:4 ThreadMethod: 4 InnerThread2:3 InnerThread1:3 InnerRunnable2:3 ThreadMethod: 3 InnerThread2:2 InnerThread1:2 InnerRunnable2:2 ThreadMethod: 2 InnerThread2:1 InnerThread1:1 InnerRunnable2:1 ThreadMethod: 1 */【21.2.11】加入一個線程
Thread.join() 方法: 若線程A 在另一個線程B上調用 B.join(),則線程A將被掛起,直到目標線程B結束才恢復;?
也可以在join()方法上加個超時參數,如果目標線程在超時時間內沒有結束的話,join還是返回;?
/*** page670 * join() 方法: 在 線程a上調用 線程b的join方法,則線程a阻塞* ,直到線程b運行結束,線程a才繼續運行 * 注意:java.util.concurrent類庫包含 CyclicBarrier工具類,比join更加適合使線程讓步*/ public class Joining {public static void main(String[] args) {Sleeper s1 = new Sleeper("s1", 1500);Sleeper s2 = new Sleeper("s2", 1500);Joiner j1 = new Joiner("j1", s1);Joiner j2 = new Joiner("j2", s2);// s2 被強制中斷s2.interrupt(); } } /*** 睡眠線程*/ class Sleeper extends Thread {private int duration ;public Sleeper(String name, int sleepTime) {super(name);this.duration = sleepTime;start();}@Overridepublic void run() {try {sleep(duration);} catch (Exception e) {System.out.println(getName() + " was interrupted, isInterrupted() = " + isInterrupted());return ;}System.out.println("線程 " + getName() + " 已經被喚醒");} }class Joiner extends Thread {private Sleeper sleeper;public Joiner(String name, Sleeper sleeper) {super(name); this.sleeper = sleeper;start();}public void run() {try {sleeper.join(); // 主線程 Joiner 調用 其他線程 sleeper的join 方法, sleeper沒有執行完, 主線程一直阻塞 } catch (Exception e) {System.out.println("interrupted");}System.out.println("線程 " + getName() + " join 完成"); } } /*s2 was interrupted, isInterrupted() = false 線程 j2 join 完成 線程 s1 已經被喚醒 線程 j1 join 完成 */【21.2.12】創建有響應的用戶界面
/*** page 671 * 有響應的用戶界面* (要想程序有響應,就需要把計算程序放在 run方法里,這樣他才能讓出cpu時間片給其他線程)*/ public class ResponsiveUI extends Thread {private static volatile double d = 1; public ResponsiveUI() {setDaemon(true); // 把當前線程設置為后臺線程 start();}@Overridepublic void run() {while(true) {d = d+ (Math.PI + Math.E) / d;}}public static void main(String[] args) throws Exception {// 創建無響應的ui // new UnresponsiveUI();// 創建響應式 ui new ResponsiveUI();System.in.read(); System.out.println(d); } } /*** 無響應的ui */ class UnresponsiveUI {private volatile double d = 1; public UnresponsiveUI() throws Exception { while(d>0) {d = d+ (Math.PI + Math.E) / d;}System.in.read(); // 永遠不會執行到這里 (此謂無響應)} }【21.2.13】線程組
【21.2.14】捕獲子線程(Thread, Runnable子類或線程池子線程)異常
看個荔枝:下面的程序總會拋出異常:
/*** page 672 * 線程異常(捕獲線程異常)*/ public class ExceptionThread implements Runnable {@Overridepublic void run() {throw new RuntimeException();}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();try {// 把子線程拋出異常的代碼放到 主線程的try-catch 塊里,主線程的try-catch塊無法捕獲的 exec.execute(new ExceptionThread()); } catch (Exception e) {System.out.println("拋出了異常"); }} } /* Exception in thread "pool-1-thread-1" java.lang.RuntimeExceptionat diy.chapter21.ExceptionThread.run(ExceptionThread.java:13)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)*/可見,把main方法放在 try-catch 中沒有作用;?
荔枝2:
/*** page 672 * 捕獲異常 - 還是沒有捕獲異常 */ public class NavieExceptionHandler {public static void main(String[] args) {try {ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new ExceptionThread());} catch(RuntimeException e) {System.out.println("Exception has been handled");}} } /* Exception in thread "pool-1-thread-1" java.lang.RuntimeExceptionat diy.chapter21.ExceptionThread.run(ExceptionThread.java:13)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)*/為了解決無法捕獲線程異常的問題,需要修改Executor產生線程的方式。設置未捕獲異常處理器去捕獲子線程拋出的異常。(干貨——非常重要,如何捕獲子線程異常)
/*** 設置未捕獲異常處理器去捕獲子線程拋出的異常 */ public class CaptureUncaughtException {public static void main(String[] args) {// 傳入帶有未捕獲異常處理器的線程工廠到 線程池以改變線程池創建線程的方式 ExecutorService executorService = Executors.newCachedThreadPool(new MyHandlerThreadFactory());// 運行任務 executorService.execute(new MyExceptionThread()); } } // 線程類 class MyExceptionThread implements Runnable {@Override public void run() {Thread t = Thread.currentThread(); System.out.println("run() by " + t);// getUncaughtExceptionHandler 表示獲取未捕獲異常處理器 System.out.println("異常處理器 = " + t.getUncaughtExceptionHandler());// 拋出運行時異常 throw new RuntimeException(); } } // 類-未捕獲異常處理器 class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {// uncaughtException方法會在線程因未捕獲的異常而臨近死亡時被調用 @Override public void uncaughtException(Thread t, Throwable e) {System.out.println("我是未捕獲異常處理器-MyUncaughtExceptionHandler,我捕獲到的異常信息為" + e); } } // 線程處理器工廠 class MyHandlerThreadFactory implements ThreadFactory {// 定義創建線程的方式 @Overridepublic Thread newThread(Runnable r) {System.out.println(this + " 創建新線程");Thread t = new Thread(r, ""+System.currentTimeMillis());System.out.println("新線程信息==" + t);// 為線程設置未捕獲異常處理器 t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());System.out.println("異常處理器= " + t.getUncaughtExceptionHandler());return t; } } /*diy.chapter21.MyHandlerThreadFactory@4e25154f 創建新線程 新線程信息==Thread[Thread-0,5,main] 異常處理器= diy.chapter21.MyUncaughtExceptionHandler@70dea4e run() by Thread[Thread-0,5,main] 異常處理器 = diy.chapter21.MyUncaughtExceptionHandler@70dea4e diy.chapter21.MyHandlerThreadFactory@4e25154f 創建新線程 新線程信息==Thread[Thread-1,5,main] 異常處理器= diy.chapter21.MyUncaughtExceptionHandler@15547dba caught java.lang.RuntimeException */設置默認的未捕獲異常處理器;
/*** page 674 * 設置默認的未捕獲異常處理器* 這個處理器-MyUncaughtExceptionHandler* 只有在不存在線程專有的未捕獲異常處理器的情況下才會被調用 */ public class SettingDefaultHandler {public static void main(String[] args) {Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(new ExceptionThread()); } } /* * 我是未捕獲異常處理器-MyUncaughtExceptionHandler,我捕獲到的異常信息為java.lang.RuntimeException */【21.3】共享受限資源(臨界資源)
【21.3.1】不正確地訪問資源
多個線程在沒有并發控制前提下同時訪問和修改共享資源,導致資源狀態異常, 不滿足業務場景;
【21.3.2】解決共享資源競爭
1、java提供了關鍵字 synchronized 關鍵字,為防止資源沖突提供了內置支持。當任務要執行被 synchronized 關鍵字保護起來的代碼片段的時候,他將檢查鎖是否可用,然后獲取鎖,執行代碼,釋放鎖;?
共享資源一般指的是對象,但也可以是 文件,輸入輸出端口,或者打印機;
對于同一個對象來說,其所擁有的的 synchronized 方法共享同一個鎖,這可以被用來防止多個任務同時訪問同一個對象內存;?
注意: 在使用并發時,將域設置為? private 非常重要, 否則 synchronized 關鍵字就無法防止其他任務直接訪問域了;
2、對于靜態 static數據,synchronized static 方法可以在類的方位內防止對 static數據的并發訪問;?
/** 使用Synchronized 同步 * page 678* */ public class SynchronizedEvenGenerator extends IntGenerator {private int currentEventValue = 0;// synchronized 關鍵字修飾方法以防止多個線程同時訪問該方法@Overridepublic synchronized int next() { ++currentEventValue;Thread.yield();++currentEventValue;return currentEventValue;}public static void main(String[] args) {EvenChecker.test(new MutextEvenGenerator());} }3、什么時候需要同步呢?
如果你正在寫一個變量, 他可能接下來將被另一個線程讀取,或者正在讀取一個上一次已經被另一個線程寫過的變量,那么你必須使用同步, 并且,讀寫線程都必須用相同的監視器鎖同步;?
注意:每個訪問臨界資源的方法都必須被同步,否則他們就不會正確的工作;?
?
4、使用顯式的Lock 對象?
lock() 與 unlock() 方法之間的代碼就是臨界資源;?
/** 使用顯式的 Lock對象 * page 678 * */ public class MutextEvenGenerator extends IntGenerator{private int currentEventValue = 0;private Lock lock = new ReentrantLock();@Overridepublic int next() {lock.lock(); // 加鎖 try {++currentEventValue;Thread.yield();++currentEventValue;return currentEventValue; } finally {lock.unlock(); // 解鎖 }}public static void main(String[] args) {EvenChecker.test(new MutextEvenGenerator());} }干貨(如何編寫基于Lock的并發控制代碼):當你在使用 Lock對象時,將這里所示的慣用法內部化是很重要的。緊接著的對 lock() 方法的調用,你必須吧臨界資源代碼放置在 finally 子句中帶有 unlock() 的 try-finally 語句中。 很在意, return 語句必須在 try 子句中出現,以確保 unlock()方法不會過早發生,從而將數據暴露給第2個任務;
5、synchronzied 與 Lock 對別:
如果使用 synchronized 關鍵字時,某些事物失敗了,那么就會拋出一個異常; 而你沒有機會去做任何清理工作,以維護系統使其處于良好狀態;
但顯式的Lock對象, 你就可以使用 finally 子句將系統維護在正確的狀態了 ;
小結: 通常情況下,推薦使用 synchronized 關鍵字,因為寫的代碼量更少,并且用戶出現錯誤的可能性也會降低;
只有在特殊情況下,才會使用 顯式的 Lock對象 ;?
/*** page 679* 嘗試獲取鎖 */ public class AttemptLocking {/* 可重入鎖 */private ReentrantLock lock = new ReentrantLock();// 獲得鎖,未設置超時時間 public void untimed() {boolean captured = lock.tryLock(); // 獲得鎖 try {System.out.println("trylock() : " + captured);} finally {if (captured) {lock.unlock(); // 在 finally 子句中解鎖 }}}// 獲得鎖,設置了超時時間 public void timed() {boolean captured = false; try {captured = lock.tryLock(2, TimeUnit.SECONDS); // 基于超時控制嘗試獲取鎖,超時時間為3秒 } catch (Exception e) {throw new RuntimeException(); }try {System.out.println(" tryLock(2, TimeUnit.SECONDS): " + captured);} finally {if (captured) { // 是否獲得鎖,若獲得鎖,則解鎖lock.unlock(); // 解鎖 }}}public static void main(String[] args) {final AttemptLocking al = new AttemptLocking();al.untimed();al.timed();new Thread() { // 子線程 {setDaemon(true);} // 設置其為后臺線程 public void run() {al.lock.lock();System.out.println("acquired:");}}.start();Thread.yield(); // 當前線程讓出 cpu時間片 try { TimeUnit.SECONDS.sleep(1); // 主線程睡眠1秒,讓 子線程先獲得鎖 } catch (InterruptedException e) {e.printStackTrace();}al.untimed(); // 沒有設置超時時間的獲得鎖al.timed(); // 設置了超時時間獲得鎖 } } /* trylock() : truetryLock(2, TimeUnit.SECONDS): true acquired: trylock() : falsetryLock(2, TimeUnit.SECONDS): false */Lock小結: 顯式的Lock 對象在加鎖和釋放鎖方面,相對于內建的 synchronized 鎖來說,還賦予了你更細粒度的控制力;?
?
【21.3.3】原子性與易變性
1、原子操作不需要進行同步控制, 原子操作是不能被線程調度機制中斷的 操作;?
2、但是有個問題:
原子性可以應用于 除了 long 和 double 之外的所有基本類型的操作。 因為jvm 可以將64位(long和double遍歷)的讀取與寫入當做兩個分離的32位操作來執行,這就產生了在一個讀取和寫入操作中間發送上下文切換,從而導致不同 的任務可以看到不正確結果的可能。(干貨——這也叫做字撕裂)
但是, 當你定義long 或double變量時,如果使用 volatile關鍵字,就會獲得原子性(干貨——原子性+volatile 可以獲得原子性);
3、可見性或可視性問題: 在多處理器系統上, 相對單處理器系統而言,可視性問題遠比原子性問題多得多。即一個任務作出的修改,即使在不中斷的意義上講是原子性的,但對其他任務也可能是不可見的?;因此不同的任務對應用的狀態是不同的視圖;
4、volatile關鍵字: 確保了應用中的可視性或可見性;只要對 volatile域產生了寫操作,那么所有的讀操作都可以看這個這個修改。即便使用了本地緩存, 情況也是如此。因為volatile域會立即被寫入主存中,而讀取操作就發生在主存中(干貨——因為volatile域會立即被寫入主存中,而讀取操作就發生在主存中,故 volatile可以保證可見性);?
補充1: 理解原子性與易變性是兩個不同的概念很重要。 在非?volatile域上的原子操作不必刷新到主存去,因此其他讀取該域的任務也沒有必要(當然也不會)看到這個新值;如果多個任務同時訪問某個域,那么這個域就應該是 volatile的,否則,這個域就應該同步訪問;?
補充2:當一個域的值依賴于他之前的值時,如遞增一個計數器,則 volatile無法工作;如果某個域的值收到其他域的值的限制,那么volatile也無法工作,如 Range類的 lower 和 upper 邊界就必須遵循lower <= upper的限制;?
小結:使用volatile而不是 synchronized的唯一安全的情況是類中只有一個可變的域。再次提醒,你的第一選擇應該是? synchronized , 這是最安全的方式,而其他方式都是有風險的(干貨);
/*** page 682 * 原子性測試 */ public class AtomicityTest implements Runnable {private int i = 0; public int getValue() { // 這里的 getValue沒有 synchronized 來修飾,有并發問題 return i ;}// 偶數增量 private synchronized void evenIncrement() {i++; i++; // 當執行第1次自加時, 線程切換 導致 i為奇數 }@Overridepublic void run() {while(true) {evenIncrement();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();AtomicityTest at = new AtomicityTest();exec.execute(at);while(true) {int val = at.getValue();if (val % 2 != 0) {System.out.println(val); // 1 或 3 或 7 System.exit(0); }}} } //1 或 3 或 7上面的程序,?evenIncrement 和?getValue方法都應該是? synchronized 關鍵字來修飾,否則會有并發問題;?
/*** page 683* 序列號生成器 */ public class SerialNumberGenerator {private static volatile int serialNumber = 0;public static int nextSerialNumber() { // 非線程安全,因為方法沒有被設置為同步 // 先返回,然后再自增 return serialNumber++; } } *** page 683 * 數字檢測器 */ public class SerialNumberChecker {private static final int SIZE = 10;private static CircularSet serials = new CircularSet(1000);private static ExecutorService exec = Executors.newCachedThreadPool();// 序列數檢測器,保證序列數是唯一的 static class SerialChecker implements Runnable {@Overridepublic void run() {while(true) {int serial = SerialNumberGenerator.nextSerialNumber();if (serials.contains(serial)) { // 序列數字集合是否存在該數字,存在,結束System.out.println("duplicate:" + serial);System.exit(0); // 終止當前虛擬機 }serials.add(serial); // 否則, 把該數字添加到集合 }}}public static void main(String[] args) {for (int i=0; i<SIZE; i++) {exec.execute(new SerialChecker());}if (args.length > 0) {try {TimeUnit.SECONDS.sleep(new Integer(args[0])); // 睡眠 System.out.println("no duplicates detected.");System.exit(0); } catch (Exception e) {e.printStackTrace();} }}/*duplicate:39705duplicate:39704duplicate:38934duplicate:39706duplicate:39707 */ } /*** 循環集合 */ class CircularSet {private int[] array ;private int len;private int index = 0;// 構造方法 public CircularSet(int size) {array = new int[size];len = size; for (int i=0; i<size; i++) {array[i] = -1;}}// 同步加法 public synchronized void add(int i) {array[index] = i ;index = ++index % len; }// 同步是否包含 public synchronized boolean contains(int val) {for (int i=0; i<len ; i++) {if (array[i] == val) return true; }return false; } } //duplicate:1680 //duplicate:1681 //duplicate:1619 //duplicate:1682這里存在線程安全問題, 因為 SerialNumberGenerator.nextSerialNumber 方法 不是synchronized 修飾, 所以才會有重復的數字,所以才會終止,如果把該方法修改為 synchronized, 則程序不會終止;
【21.3.4】原子類: AtomicInteger, AutomicLong, AtomicReference?
提供了原子性條件更新操作: compareAndSet(expectValue, updateValue);?
荔枝: AtomicInteger?
/*** page 684 * 原子類-AtomicInteger-提供原子性條件更新操作: compareAndSet 方法 */ public class AtomicIntegerTest implements Runnable {private AtomicInteger i = new AtomicInteger(0);public int getValue() {return i.get(); }// 原子性加法,用 AtomicInteger 替換了 synchronized 方法 private void evenIncrement() { i.addAndGet(2); // 原子操作, 線程安全 }@Overridepublic void run() {while(true) {evenIncrement(); }}public static void main(String[] args) {Timer timer = new Timer(); // 定時器 timer.schedule(new TimerTask() {@Overridepublic void run() {System.err.println("aborting");System.exit(0); // 終止當前虛擬機 }}, 5000); // 5秒后運行該子線程 ExecutorService exec = Executors.newCachedThreadPool(); // 線程池 AtomicIntegerTest ait = new AtomicIntegerTest();exec.execute(ait);while(true) {int val = ait.getValue();if (val % 2 != 0) {System.out.println(val); // 永遠不會執行到這里,因為 val 一定為偶數System.exit(0); // 終止當前虛擬機 }}} } /*aborting*/強調:Atomic類被設計用來構建 java.util.concurrent中的類,因此只有在特殊情況下才在自己的代碼中使用他們, 即便使用了也需要確保不存在其他可能出現單的問題。通常依賴于鎖更安全一些(要么是 synchronized關鍵字, 要么是顯式的Lock對象)
【21.3.5】臨界區
1、有時臨界區是方法內部的部分代碼而不是整個方法;synchronized 被用來指定某個對象,此對象的鎖被用來對花括號內部代碼進行同步控制; 同步控制塊如下:??
synchronized(synchObject) {
? ? 臨界區代碼?
}
2、使用 synchronized來創建臨界區
public class CriticalSection {static void testApproaches(PairManager manager1, PairManager manager2) {ExecutorService exec = Executors.newCachedThreadPool();PairManipulator pm1 = new PairManipulator(manager1);PairManipulator pm2 = new PairManipulator(manager2);PairChecker checker1 = new PairChecker(manager1);PairChecker checker2 = new PairChecker(manager2);exec.execute(pm1);exec.execute(pm2);exec.execute(checker1);exec.execute(checker2);try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e ) {System.out.println("sleep interrupted");}System.out.println("pm1:" + pm1 + ", pm2 = " + pm2);System.exit(0);}public static void main(String[] args) {PairManager manager1 = new PairManager1();PairManager manager2 = new PairManager2();testApproaches(manager1, manager2); }/*pm1:pair:x = 3, y = 3, checkCounter = 1, pm2 = pair:x = 7, y = 7, checkCounter = 3*/ } // 命名內部類 class Pair {private int x, y ;public Pair(int x, int y) {this.x = x; this.y = y; }public Pair () {this(0, 0);}public int getX() {return x; }public int getY() {return y;}// x 自增1 public void incrementX() {x++;}// y 自增1 public void incrementY() {y++; }public String toString() {return "x = " + x + ", y = " + y; }// 自定義異常類 public class PairValuesNotEqualException extends RuntimeException {public PairValuesNotEqualException() {super("pair values not equal: " + Pair.this);}}// 檢測x與y是否相等 public void checkState() {if (x != y) {throw new PairValuesNotEqualException(); }} } // 對子管理器 abstract class PairManager {AtomicInteger checkCounter = new AtomicInteger(0);protected Pair p = new Pair();private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); // 加鎖的list// 同步獲取 pair 對象 public synchronized Pair getPair() {return new Pair(p.getX(), p.getY()); }protected void store(Pair p) {storage.add(p);try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {}}public abstract void increment(); } class PairManager1 extends PairManager {public synchronized void increment() {p.incrementX(); // 讓 x 自增 p.incrementY(); // y 自增 this.store(p);} } class PairManager2 extends PairManager {public synchronized void increment() {Pair temp; synchronized(this) { // 同步控制塊 p.incrementX(); // 讓 x 自增 p.incrementY(); // y 自增 temp = this.getPair(); }this.store(temp); } } /*** 對子操縱器 */ class PairManipulator implements Runnable {private PairManager pm ;public PairManipulator(PairManager pm) {this.pm = pm;}@Overridepublic void run() {while(true) {pm.increment();}}public String toString() {return "pair:" + pm.getPair() + ", checkCounter = " + pm.checkCounter.get(); } } // 對子檢測器 class PairChecker implements Runnable {private PairManager pm ; public PairChecker(PairManager pm) {this.pm = pm; }@Overridepublic void run() {while(true) {pm.checkCounter.incrementAndGet();pm.getPair().checkState();}} }3、使用 Lock來創建臨界區
// 使用Lock對象創建臨界區 class ExplicitPairManager1 extends PairManager {private Lock lock = new ReentrantLock();public synchronized void increment() {lock.lock(); // 加鎖 try {p.incrementX();p.incrementY();store(getPair());} finally {lock.unlock(); // 解鎖 }} }【21.3.6】在其他對象上同步
1、synchronized 塊必須給定一個在其上進行同步的對象,并且最合理的方式?是 ,使用其方法正在被調用的當前對象,如 synchronize(this);
class DualSynch {private Object syncObject = new Object(); // public synchronized void f() {public synchronized void f() {synchronized(syncObject) { // 對 syncObject 進行同步控制,獲取的是 syncObject 對象的鎖 for (int i = 0; i < 5; i++) {print("f()");Thread.yield();}}}public void g() {synchronized (syncObject) { // 對 syncObject 進行同步控制,獲取的是 syncObject 對象的鎖 for (int i = 0; i < 5; i++) {print("g()");Thread.yield();}}} } public class SyncObject {public static void main(String[] args) {final DualSynch ds = new DualSynch();new Thread() {public void run() {ds.f();}}.start();ds.g();} } /* g() g() g() g() g() f() f() f() f() f() */【21.3.7】線程本地存儲 ThreadLocal類實現??
1、可以為使用相同變量的每個不同線程都創建不同的存儲;有5個線程都是用變量x多表示的對象,name線程本地存儲就會生成5塊內存;
class Accessor implements Runnable {private final int id;public Accessor(int idn) {id = idn;}public void run() {while (!Thread.currentThread().isInterrupted()) {ThreadLocalVariableHolder.increment();System.out.println(this);Thread.yield();}}public String toString() {return Thread.currentThread().getName() + ", #" + id + ": " + ThreadLocalVariableHolder.get();} }public class ThreadLocalVariableHolder {/** 使用 ThreadLocal 根除不同線程對變量的共享 */private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {private Random rand = new Random(47);protected synchronized Integer initialValue() {return rand.nextInt(10000);}};public static void increment() {value.set(value.get() + 1);}public static int get() {return value.get();}public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();for (int i = 0; i < 5; i++)exec.execute(new Accessor(i));TimeUnit.SECONDS.sleep(3); // Run for a whileexec.shutdownNow(); // All Accessors will quit} } /*pool-1-thread-3, #2: 25714 pool-1-thread-3, #2: 25715 pool-1-thread-3, #2: 25716 pool-1-thread-3, #2: 25717*/【21.4】終結任務
/*** 終結任務-裝飾性花園 =獲取多個大門進入公園的總人數,每個大門有計數器 * 本測試案例的作用在于: 并發控制使得,每個線程的計數總和等于 主線程的總計數值*/ public class OrnamentalGarden {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 線程池 for(int i=0; i<5; i++) {exec.execute(new Entrance(i)); // 運行任務 }TimeUnit.SECONDS.sleep(3); // 睡眠3秒 Entrance.cancel(); // 所有線程停止運行,即所有大門關閉 exec.shutdown(); // 關閉線程池 // ExecutorService.awaitTermination 方法等待每個任務結束,// 如果所有任務在超時時間達到之前全部結束,則返回true,否則返回false if (!exec.awaitTermination(1000, TimeUnit.MICROSECONDS)) {System.out.println("some tasks were not terminated");}System.out.println("total :" + Entrance.getTotalCount()); // 獲取count,總計數值 System.out.println("sum of entrances:" + Entrance.sumEntrances()); // 所有大門人數相加} }class Count {private int count = 0;private Random random = new Random(47);public synchronized int increment() {// 自增同步方法 int temp = count;if (random.nextBoolean()) {// 獲取隨機布爾值,50%機會為true Thread.yield(); // 當前線程讓出cpu時間片 }return (count = ++temp); // count 自增}public synchronized int value() { // 同步方法-獲取count return count; } } /*** 入口=大門 */ class Entrance implements Runnable {private static Count count = new Count();// 總計數器,所有大門人數計數器 private static List<Entrance> entrances = new ArrayList<>(); // 入口列表 private int number = 0; // 每個大門的人數 private int id = 0;public static volatile boolean canceled = false;// 停止線程運行,默認為false public static void cancel() {canceled = true; }public Entrance(int id) {this.id = id;entrances.add(this);}@Overridepublic void run() {while(!canceled) {synchronized(this) {// 同步塊-獲取當前對象的鎖 ++number; // number 自增 , 每個大門人數自增 }System.out.println(this + ", total:" + count.increment()); // 總計數器自增,即所有大門人數計數器自增 try {TimeUnit.MICROSECONDS.sleep(1000);// 睡眠1秒 } catch (InterruptedException e) {System.out.println("sleep interrupted");}}System.out.println("stopping " + this); }public synchronized int getValue() { // 同步方法-獲取本大門的人數 return number; }public String toString() {return "entrance " + id + ": " + getValue();}public static int getTotalCount() { // 獲取總計數值 return count.value();}public static int sumEntrances() { // 獲取總和int sum = 0;for (Entrance e : entrances) {sum += e.getValue(); // 所有大門人數求和 }return sum; } } /*stopping entrance 1: 1693 stopping entrance 2: 1695 stopping entrance 4: 1695 total :8473 sum of entrances:8473 */【21.4.2】在阻塞時終結
1、線程狀態:
? ? 新建,當線程被創建時,他會短暫處于這個狀態, 此時線程已經分配了資源,并完成了初始化;?
? ? 就緒, 只要調度器把cpu時間片分配該這個線程,他就可以運行;?
? ? 阻塞:線程能夠運行, 但有個條件阻止他運行。線程可以由阻塞狀態重新進入就緒狀態;?
? ? 死亡:處于死亡或終止狀態的線程不在是可調度的,并且也不會得到 cpu時間片,他的任務已經結束;
2、進入阻塞狀態:進入阻塞狀態的原因:
? ? 2.1、調用 sleep 方法使線程進入休眠狀態;?
? ? 2.2、調用wait()方法使線程掛起, 直到線程得到了 notify或notifyAll方法調用(或 SE5中的java.util.concurrent類庫中的 signal或signalAll() 方法調用), 線程才會進入就緒狀態;?
? ? 2.3、任務在等待某個輸入輸出完成;?
? ? 2.4、任務試圖在某個對象上調用其同步控制方法, 但是對象鎖不可用,因為另一個任務已經獲取了這個鎖;?
【21.4.3】中斷
1、Thread類包含 interrupt方法,可以終止被阻塞的任務。這個方法將設置線程的中斷狀態。 如果一個線程被阻塞,或者視圖執行一個阻塞操作,那么設置這個線程的中斷狀態將拋出 InterruptedException異常。當拋出該異常或該任務調用了 Thread.interrupted() 方法時, 中斷狀態將被復位,設置為true;?
2、如果調用 Executor上調用 shutdownNow方法,那么它將發送一個 interrupte方法調用給他啟動的所有線程。
通過調用submit()而不是executor() 來啟動任務,就可以持有該任務的上下文。
【21.4.3】中斷
1、Thread類包含 interrupt方法,可以終止被阻塞的任務。這個方法將設置線程的中斷狀態。 如果一個線程被阻塞,或者視圖執行一個阻塞操作,那么設置這個線程的中斷狀態將拋出 InterruptedException異常。當拋出該異常或該任務調用了 Thread.interrupted() 方法時, 中斷狀態將被復位,設置為true;?
2、如果調用 Executor上調用 shutdownNow方法,那么它將發送一個 interrupte方法調用給他啟動的所有線程。
通過調用submit()而不是executor() 來啟動任務,就可以持有該任務的上下文。submit()方法將返回一個泛型Future<?>, 持有這個Future的關鍵在于,你可以調用該對象的cancle() 方法, 并因此可以使用他來中斷某個特定任務。如果把true傳給 cancel方法,他就會擁有在該線程上調用 interrupt方法以停止這個線程的權限。
/*** 中斷由線程池管理的某個線程 */ public class Interrupting {private static ExecutorService exec = Executors.newCachedThreadPool();// 線程池 static void test(Runnable r) throws InterruptedException {// 使用 ExecutorService.submit() 而不是 ExecutorService.execute()方法來啟動任務,就可以持有該任務的上下文 ,submit() 方法返回 Future 對象 // exec.execute(r); 不用 execute() 方法 Future<?> f = exec.submit(r);TimeUnit.MILLISECONDS.sleep(1000); // 睡眠1秒 System.out.println("interrupting " + r.getClass().getName()); // 正在中斷某個線程 // 調用Future.cancel() 方法來中斷某個特定任務 // 把true傳給cancel() 方法,該方法就擁有在該線程上調用interrupt() 方法以停止這個線程的權限 // cancel 是一種中斷由 Executor啟動的單個線程的方式 f.cancel(true);System.out.println("interrupt sent to " + r.getClass().getName()); // 中斷信號發送給線程 System.out.println("====================================== seperate line ==================================== ");}public static void main(String[] args) throws Exception {test(new SleepBlocked());test(new IOBlocked(System.in));test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3);System.out.println("aborting with System.exit(0)");System.exit(0);// 終止當前虛擬機進程,所以有部分打印信息無法沒有正常輸出} } // 睡眠式阻塞線程, 可中斷的阻塞 class SleepBlocked implements Runnable {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);// 睡眠3秒} catch (InterruptedException e ) { // 捕獲中斷異常System.out.println("interrupted exception in SleepBlocked ");}System.out.println("exiting SleepBlocked.run()");} } // IO式阻塞線程 , 不可中斷的阻塞 class IOBlocked implements Runnable {private InputStream in;public IOBlocked(InputStream is) {in = is; }@Overridepublic void run() {try {System.out.println("waiting for read();");in.read(); // 等待輸入流輸入數據 } catch (IOException e) { // IO 異常 , 但執行結果沒有報 IO 異常 if (Thread.currentThread().isInterrupted()) {System.out.println("interrupted from blocked IO");} else {throw new RuntimeException();}}System.out.println("exiting IOBlocked.run()");} } // 線程同步式阻塞,不可中斷的阻塞 class SynchronizedBlocked implements Runnable {public synchronized void f() {while(true) {Thread.yield(); // 讓出cpu時間片 }}public SynchronizedBlocked() { // 構造器開啟一個線程 new Thread() { // 匿名線程調用f() 方法,獲取 SynchronizedBlocked 對象鎖,且不釋放;其他線程只能阻塞 public void run() {f();// f() 為同步方法 }}.start(); }@Override public void run() {System.out.println("trying to call 同步f()");f(); // 調用f() 同步方法 , 讓出cpu時間片 System.out.println("exiting SynchronizedBlocked.run()"); // 這里永遠不會執行 } } /*interrupting diy.chapter21.SleepBlocked interrupt sent to diy.chapter21.SleepBlocked ====================================== seperate line ==================================== interrupted exception in SleepBlocked exiting SleepBlocked.run() waiting for read(); interrupting diy.chapter21.IOBlocked interrupt sent to diy.chapter21.IOBlocked ====================================== seperate line ==================================== trying to call 同步f() interrupting diy.chapter21.SynchronizedBlocked interrupt sent to diy.chapter21.SynchronizedBlocked ====================================== seperate line ==================================== aborting with System.exit(0) */小結:
| 序號 | 阻塞方式 | 是否可以中斷 |
| 1 | sleep | 是 |
| 2 | IO | 否 |
| 3 | synchronized獲取鎖 | 否 |
所以,對于IO操作線程或synchronized操作的線程,其具有鎖住多線程程序的潛在危險。
如何解決呢? 關閉任務在其上發生阻塞的底層資源;
/*** 無法中斷線程,但可以關閉任務阻塞所依賴的資源。* 這里只能夠中斷 基于socket輸入流的io線程,因為socket輸入流可以關閉;* 但無法中斷基于系統輸入流的io線程,因為系統輸入流無法關閉;*/ public class CloseResource {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 線程池 ServerSocket server = new ServerSocket(8080); // 服務端套接字 InputStream socketInput = new Socket("localhost", 8080).getInputStream();/* 啟動線程 */exec.execute(new IOBlocked(socketInput));exec.execute(new IOBlocked(System.in));TimeUnit.MILLISECONDS.sleep(1000); // 睡眠1秒 System.out.println("shutting down all threads");exec.shutdownNow(); // 發送一個interrupte() 信號給exec啟動的所有線程 TimeUnit.SECONDS.sleep(1); // 睡眠1秒 System.out.println("closing " + socketInput.getClass().getName());socketInput.close(); // 關閉io線程依賴的資源 TimeUnit.SECONDS.sleep(1);System.out.println("closing " + System.in.getClass().getName());System.in.close(); // 關閉io線程依賴的資源 } } /** waiting for read(); waiting for read(); shutting down all threads closing java.net.SocketInputStream interrupted from blocked IO exiting IOBlocked.run() closing java.io.BufferedInputStream */3、nio類提供了更人性化的IO中斷,被阻塞的nio通道會自動響應中斷;?
/*** page 698* nio中斷 */ public class NIOInterruption {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 線程池 ServerSocket ss = new ServerSocket(8080); // 服務器套接字 // InetAddress:類的主要作用是封裝IP及DNS, // InetSocketAddress類主要作用是封裝端口 他是在在InetAddress基礎上加端口,但它是有構造器的。InetSocketAddress isa = new InetSocketAddress("localhost", 8080);SocketChannel sc1 = SocketChannel.open(isa); // 套接字通道 SocketChannel sc2 = SocketChannel.open(isa); // 套接字通道 // 使用 ExecutorService.submit() 而不是 ExecutorService.execute()方法來啟動任務,就可以持有該任務的上下文 ,submit() 方法返回 Future 對象 Future<?> f = exec.submit(new NIOBlocked(sc1));// 以submit方式啟動線程 exec.execute(new NIOBlocked(sc2)); // 以 execute方式啟動線程 exec.shutdown(); // 關閉所有線程 TimeUnit.SECONDS.sleep(1); // 睡眠1秒// 調用Future.cancel() 方法來中斷某個特定任務 // 把true傳給cancel() 方法,該方法就擁有在該線程上調用interrupt() 方法以停止這個線程的權限 // cancel 是一種中斷由 Executor啟動的單個線程的方式f.cancel(true); // sc2.close(); // } } // NIO 新io式阻塞 class NIOBlocked implements Runnable {private final SocketChannel sc;public NIOBlocked(SocketChannel sc) {this.sc = sc; }@Overridepublic void run() {try {System.out.println("waiting for read() in " + this);sc.read(ByteBuffer.allocate(1));} catch (ClosedByInterruptException e1) {System.out.println("ClosedByInterruptException, this = " + this);} catch (AsynchronousCloseException e2) {System.out.println("AsynchronousCloseException, this = " + this);} catch (IOException e3) {throw new RuntimeException(e3); }System.out.println("exiting NIOBlocked.run() " + this);} } /** waiting for read() in diy.chapter21.NIOBlocked@3856c761 waiting for read() in diy.chapter21.NIOBlocked@55de2e48 ClosedByInterruptException, this = diy.chapter21.NIOBlocked@55de2e48 exiting NIOBlocked.run() diy.chapter21.NIOBlocked@55de2e48 AsynchronousCloseException, this = diy.chapter21.NIOBlocked@3856c761 exiting NIOBlocked.run() diy.chapter21.NIOBlocked@3856c761 */4、被互斥所阻塞: 一個任務能夠調用在同一個對象中的其他的 synchronized 方法,而這個任務已經持有鎖了 ;?
/*** 被互斥所阻塞* 同步方法f1 和 f2 相互調用直到 count為0 * 一個任務應該能夠調用在同一個對象中的其他 synchronized 方法,因為這個任務已經獲取這個對象的鎖* 2020/04/16 */ public class MultiLock {public synchronized void f1(int count) { // 同步方法 f1 if(count-- > 0) {System.out.println("f1() calling f2() with count = " + count);f2(count); // 調用 f2 }}public synchronized void f2(int count) { // 同步方法f2 if(count-- > 0) {System.out.println("f2() calling f1() with count = " + count);f1(count); // 調用f1 }}public static void main(String[] args) {final MultiLock multiLock = new MultiLock();new Thread() {public void run() {multiLock.f1(5);}}.start(); } } /** f1() calling f2() with count = 4 f2() calling f1() with count = 3 f1() calling f2() with count = 2 f2() calling f1() with count = 1 f1() calling f2() with count = 0 */5、java se5 并發類庫中添加了一個特性,在 ReentrantLock? 可重入鎖上阻塞的任務具備可以被中斷的能力;?
/*** 可重入鎖的可中斷式加鎖 * page 700 */ public class Interrupting2 {public static void main(String[] args) throws Exception {Thread t = new Thread(new Blocked2());t.start();TimeUnit.SECONDS.sleep(1);System.out.println("issuing t.interrupt()"); // 2 t.interrupt(); // 中斷線程 } } /*** 阻塞互斥量 */ class BlockedMutex {private Lock lock = new ReentrantLock(); // 可重入鎖 public BlockedMutex() {lock.lock(); // 構造器即加鎖,且從不會釋放鎖 }public void f() {try {lock.lockInterruptibly(); // 可中斷式加鎖 System.out.println("lock acquired in f()");} catch(InterruptedException e) {System.out.println("interrupted from lock acquisition in f()"); // 3 可中斷阻塞,捕獲中斷異常 }} } class Blocked2 implements Runnable {BlockedMutex blocked = new BlockedMutex(); @Overridepublic void run() {System.out.println("waiting for f() in Blocked Mutex"); // 1 blocked.f();System.out.println("broken out of blocked call"); // 4 } } /*** waiting for f() in Blocked Mutex issuing t.interrupt() interrupted from lock acquisition in f() broken out of blocked call*/【21.4.4】檢查中斷
1、在線程上調用 interrupt方法去中斷線程執行時,能夠中斷線程的前提是: 任務要進入到阻塞操作中,已經在阻塞操作內部;否則,調用 interrupt方法是無法中斷線程的;需要通過其他方式;
其他方式是: 由中斷狀態來表示, 其狀態可以通過調用 interrupt 來設置。通過 Thread.interrupted() 來檢查中斷? 。
/*** 通過 Thread.interrupted() 來檢查中斷 * page 701*/ public class InterruptingIdiom {public static void main(String[] args) throws Exception {if(args.length != 1) {System.out.println("InterruptingIdiom-傻瓜式中斷");}Thread t = new Thread(new Blocked3()); t.start();TimeUnit.SECONDS.sleep(3); // 睡眠 t.interrupt(); // 中斷 } }class NeedsCleanup {private final int id;public NeedsCleanup(int id) {this.id = id;System.out.println("NeedsCleanup " + id);}public void cleanup() {System.out.println("clean up " +id);} } /*** 在run()方法中創建的 NeedsCleanup 資源都必須在其后面緊跟 try-finally 子句, * 以確保 清理資源方法被調用 */ class Blocked3 implements Runnable {private volatile double d = 0.0; @Overridepublic void run() {try {int index = 1;// interrupted方法來檢查中斷狀態 while(!Thread.interrupted()) { // 只要當前線程沒有中斷 System.out.println("========== 第 " + index++ + " 次循環 =========="); NeedsCleanup n1 = new NeedsCleanup(1);try {System.out.println("sleeping-睡眠一秒");TimeUnit.SECONDS.sleep(1);NeedsCleanup n2 = new NeedsCleanup(2);try {System.out.println("calculating-高強度計算");for (int i=1; i<250000; i++) {d = d + (Math.PI + Math.E) / d;}System.out.println("finished time-consuming operation 完成耗時操作."); } finally {n2.cleanup(); // 清理 }} finally{n1.cleanup(); // 清理 }}System.out.println("exiting via while() test-從while循環退出 "); // 從while循環退出 } catch (InterruptedException e) {System.out.println("exiting via InterruptedException-從中斷InterruptedException退出 "); // 從中斷退出 }}} /*** InterruptingIdiom-傻瓜式中斷 ========== 第 1 次循環 ========== NeedsCleanup 1 sleeping-睡眠一秒 NeedsCleanup 2 calculating-高強度計算 finished time-consuming operation 完成耗時操作. clean up 2 clean up 1 ========== 第 2 次循環 ========== NeedsCleanup 1 sleeping-睡眠一秒 NeedsCleanup 2 calculating-高強度計算 finished time-consuming operation 完成耗時操作. clean up 2 clean up 1 ========== 第 3 次循環 ========== NeedsCleanup 1 sleeping-睡眠一秒 clean up 1 exiting via InterruptedException-從中斷InterruptedException退出 */?
【21.5】線程間的協作
1、當任務協作時,關鍵問題是任務間的握手。握手可以通過 Object.wait() Object.notify() 方法來安全實現。當然了 java se5 的并發類庫還提供了具有 await() 和 signal() 方法的Condition對象;
【21.5.1】wait()方法與notifyAll() 方法?
1、wait() 方法會在等待外部世界產生變化的時候將任務掛起,并且只有在 nofity() 或notifyall() 發生時,即表示發生了某些感興趣的事務,這個任務才會被喚醒去檢查鎖產生的變化。wait()方法提供了一種在任務之間對活動同步的方式。
還有,調用wait() 方法將釋放鎖,意味著另一個任務可以獲得鎖,所以該對象上的其他synchronized方法可以在線程A wait期間,被其他線程調用;?
2、有兩種形式的 wait() 調用
形式1: wait方法接收毫秒數作為參數,在wait()期間對象鎖是釋放的;通過 notify() notifyAll() 方法,或者時間到期后,從 wait() 恢復執行;?
形式2:wait方法不接受任何參數,這種wait將無線等待下去,直到線程接收到 notify或 notifyAll方法;?
補充1:wait方法,notify方法, notifyAll方法,都是基類Object的一部分,因為這些方法操作的鎖也是對象的一部分,而所有對象都是OBject的子類;?
補充2:實際上,只能在同步控制方法或同步控制塊里調用 wait, notify, notifyAll方法(因為不操作鎖,所有sleep方法可以在非同步控制方法里調用)。如果在非同步方法中調用 wait, notify, notifyAll方法, 編譯可以通過,但運行就報 IllegalMonitorStateException 異常,異常意思是: 在調用wait, notify, notifyAll方法前,必須獲取對象的鎖;?
(干貨——只能在同步控制方法或同步控制塊里調用 wait, notify, notifyAll方法)?
?
【荔枝】涂蠟與拋光: 拋光任務在涂蠟完成之前,是不能執行其工作的;而涂蠟任務在涂另一層蠟之前,必須等待拋光任務完成;
拋光 WaxOn, WaxOff, 使用了wait和notifyAll方法來掛起和重啟這些任務;
補充:前面的實例強調必須用一個檢查感興趣的條件的while循環包圍wait方法。這很重要,因為:(為啥要用while包裹wait呢)
前面的示例強調必須用一個檢查感興趣的條件的while循環包圍wait()。這很重要,原因如下:
原因1:可能有多個任務出于相同的原因在等待同一個鎖,而第一個喚醒任務可能會改變這種狀況;如果屬于這種情況,那么任務應該被再次掛起,直到其感興趣的條件發生變化;
原因2:在本任務從其 wait()中被喚醒的時刻,有可能會有某個其他任務已經做出了改變,從而使得本任務在此時不能執行,或者執行其操作已顯得無關緊要;此時,應該再次執行wait()將其重新掛起;
(個人理解——比如有2個任務A,B都在等待資源R可用而阻塞,當R可用時,任務A和B均被喚醒,但任務A被喚醒后立即拿到了臨界資源或獲取了鎖,則任務B仍然需要再次阻塞,這就是while的作用)
原因3:有可能某些任務出于不同的原因在等待你的對象上的鎖(必須使用notifyAll喚醒);在這種情況下,需要檢查是否已經由正確的原因喚醒,如果不是,則再次調用wait方法;
用while 包圍wait方法的本質:檢查所有感興趣的條件,并在條件不滿足的情況下再次調用wait方法,讓任務再次阻塞;
?
3、錯失的信號:當兩個線程使用 notify/wait() 或 notifyAll()/ wait() 方法進行協作時,有可能會錯過某個信號;即 notify或 notifyAll發出的信號,帶有wait的線程無法感知到。
荔枝:
// T1: synchronized(sharedMonitor) {<setup condition for T2>sharedMonitor.notify() // 喚醒所有等待線程 } // T2: while(someCondition) {// point 1 synchronized(sharedMonitor) {sharedMonitor.wait(); // 當前線程阻塞 } }當T2 還沒有調用 wait方法時,T1就發送了notify信號; 這個時候T2線程肯定接收不到這個信號;T1發送信號notify后,T2才調用wait方法,這時,T2將永久阻塞下去;因為他錯過了T1的notify信號;
T2正確的寫法如下:
// T2正確的寫法如下: synchronized(sharedMonitor) {while(someCondition) {sharedMonitor.wait(); // 當前線程阻塞 } }如果T1先執行后釋放鎖;此時T2獲取鎖且檢測到 someCondition已經發生了變化,T2不會調用wait() 方法;?
如果T2先執行且調用了wait()方法, 釋放了鎖; 這時T1后執行,然后調用notify()喚醒阻塞線程, 這時T2可以收到T1的 notify信號,從而被喚醒, 由T1修改了 someCondition的條件, 所以T2 不會進入while循環;?
?
【21.5.2】notify與notifyAll方法
1、notify()方法:在使用 notify方法時,在眾多等待同一個鎖的任務中只有一個會被喚醒,如果你希望使用notify,就必須保證被喚醒的是恰當的任務。
2、notifyAll將喚醒所有正在等待的任務。這是否意味著在任何地方,任何處于wait狀態中的任務都將被任何對notifyAll的調用喚醒呢。事實上,當notifyAll因某個特定鎖而被調用時,只有等待這個鎖的任務才會被喚醒;?
補充:
// 阻塞器? class Blocker {synchronized void waitingCall() {try {while(!Thread.interrupted()) {wait(); // 期初所有線程均阻塞,等待 notify 或 notifyAll 來喚醒?System.out.println(Thread.currentThread() + " ");}} catch (InterruptedException e ) {}}synchronized void prod() {notify();// 喚醒單個阻塞線程?}synchronized void prodAll() {notifyAll(); // 喚醒所有阻塞線程?} }Blocker.waitingCall 方法中的while循環, 有兩種方式可以離開這個循環:
方式1:發生異常而離開;
方式2:通過檢查 interrupted標志離開;
【21.5.3】生產者與消費者
1、對于一個飯店,有一個廚師和服務員。服務員必須等待廚師準備好膳食。當廚師準備好時,他會通知服務員,之后服務員上菜,然后返回繼續等待下一次上菜。
這是一個任務協作的荔枝,廚師代表生產者,服務員代表消費者。兩個任務必須在膳食被生產和消費時進行握手,而系統必須以有序的方式關閉。
2、代碼解說, Restraurant是 WaitPerson和Chef的焦點,作為連接兩者的橋梁。他們知道在為哪個Restraurant工作,因為他們必須和這家飯店打交道,以便放置或拿取膳食。
2.1、(干貨)再次提問:如果在等待一個訂單,一旦你被喚醒,這個訂單就必定是可以獲得的嗎?
答案不是的。因為在并發應用中,某個其他的任務可能會在WaitPerson被喚醒時,會突然插足并拿走訂單,唯一安全的方式是使用下面這種慣用的wait() 方法,來保證在退出等待循環前,條件將得到滿足。如果條件不滿足,還可以確保你可以重返等待狀態。
while(conditionIsNotMet) {
?? ?wait();
}
2.2、shutdownNow()將向所有由 ?ExecutorService啟動的任務發送 interrupt信號。但是在Chef中,任務并沒有在獲得該interrupt信號后立即關閉,因為當任務試圖進入一個可中斷阻塞操作時, 這個中斷只能拋出 InterruptException。然后當 Chef 試圖調用sleep()時,拋出了 InterruptedException。如果移除對sleep()的調用,那么這個任務將回到run()循環的頂部,并由于Thread.interrupted()?測試而退出,同時并不拋出異常。
3、使用顯式的Lock和 Condition 對象 ?
使用互斥并允許任務掛起的基本類是 Condition,調用Condition的await() 可以掛起一個任務;調用signal() 可以喚醒一個任務;調用signalAll() 可以喚醒所有在這個Condition上被其自身掛起的任務。
(干貨——與notifyAll()相比,signalAll()方法是更安全的方式)
代碼解說:每個對lock()的調用都必須緊跟一個try-finally子句,用來保證在所有情況下都可以釋放鎖。在使用內建版本時,任務在可以調用 await(), signal(), signalAll() 方法前,必須擁有這個鎖。
(干貨——不推薦使用Lock和Condition對象來控制并發)使用Lock和Condition對象來控制并發比較復雜,只有在更加困難的多線程問題中才使用他們;
【21.5.4】生產者與消費者隊列
1、wait()和notifyAll() 是一種低級的方式來解決任務協作問題;也可以使用同步隊列這種高級方式來解決,同步隊列在任何時刻都只允許一個任務插入或移除元素。
2、同步隊列 BlockingQueue,兩個實現,LinkedBlockingQueue,無界隊列, ArrayBlockingQueue-固定尺寸,放置有限數量的元素;
3、若消費者任務試圖從隊列中獲取元素,而該隊列為空時,隊列可以掛起消費者任務讓其阻塞;并且當有更多元素可用時,隊列可以喚醒消費者任務。
阻塞隊列可以解決非常多的問題,且比 wait()與notifyAll()簡單得多。
【看個荔枝】
【吐司BlockingQueue】
1、一臺機器有3個任務: 一個制作吐司,一個給吐司抹黃油,另一個在抹過黃油的吐司上涂果醬;
【21.5.5】任務間使用管道進行輸入輸出
1、通過輸入輸出在線程間通信很常用。提供線程功能的類庫以管道的形式對線程間的輸入輸出提供了支持,分別是PipedWriter和PipedReader類,分別允許任務向管道寫和允許不同任務從同一個管道讀取。
這種模式可以看做是 生產者-消費者問題的變體,管道就是一個封裝好了的解決方案。管道可以看做是一個阻塞隊列,存在于多個引入 BlockingQueue之間的java版本中。?
代碼解說1: 當Receiver調用read() 方法時,如果沒有更多的數據,管道將自動阻塞;
補充1:注意sender和receiver是在main()中啟動的,即對象構造徹底完成以后。如果你啟動一個沒有構造完成的對象,在不同的平臺上管道可能會產生不一致的行為。(BlockingQueue使用起來更加健壯且容易)(干貨)
補充2:在shudownNow() 被調用時,PipedReader與普通IO之間的區別是:PipiedReader是可以中斷的。 如果將 in.read() 修改為System.in.read(), 那么interrupt調用將不能打斷read()調用。?(干貨)
【21.6】死鎖
1、作為哲學家,他們很窮,只能買5根筷子(通俗講,筷子數量與哲學家數量相同);他們坐在桌子周圍,每人之間放一根筷子,當一個哲學家要就餐的時候,這個哲學家必須同時擁有左邊和右邊的筷子。
如果一個哲學家左邊或右表已經有人在使用筷子了,那么這個哲學家就必須等待,直至可以得到必須的筷子。
【代碼-Chopstick】
【代碼-Philosopher】
/*** 哲學家* page 718 */ public class Philosopher implements Runnable {private Chopstick left; // 左筷private Chopstick right; // 右筷 private int id = 1; // 編號 private int ponderFactor = 0; // 思考因素 private Random rand = new Random(47); // 隨機數發生器 private void pause() throws InterruptedException { // 暫停 if (ponderFactor == 0) return ; TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250)); // 睡眠 }public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {this.left = left; this.right = right; this.id = ident; this.ponderFactor = ponder; } @Overridepublic void run() {try {while (!Thread.interrupted()) {System.out.println(this + " " + "thinking"); // 思考 pause(); // 暫停 System.out.println(this + " " + "grabbing right"); // 拿走右邊筷子 right.take(); System.out.println(this + " " + "grabbing left"); // 拿走左邊筷子 left.take();System.out.println(this + " eating"); // 吃飯 pause(); // 暫停 right.drop(); //放下右邊筷子left.drop(); //放下左邊筷子 }} catch (InterruptedException e) {System.out.println(this + " exiting via interrupt. " ); }}@Overridepublic String toString() {return "Philosopher-哲學家" + id; } }【代碼-DeadLockDiningPhilosophers】
/*** 發生死鎖的哲學家晚餐* page720 */ public class DeadLockDiningPhilosophers {public static void main(String[] args) throws Exception {int ponder = 0; // 把 ponder-思考時間 調整為0,發生死鎖 int size = 5; ExecutorService exec = Executors.newCachedThreadPool(); // 線程池 Chopstick[] sticks = new Chopstick[size]; // 筷子數組 for (int i = 0; i < sticks.length; i++) {sticks[i] = new Chopstick(); // 初始化數組 }for (int i = 0; i < sticks.length; i++) {exec.execute(new Philosopher(sticks[i], sticks[(i+1)%size], i+1, ponder)); // 執行哲學家任務 }System.out.println("press enter to quit");System.in.read(); exec.shutdownNow(); } } /** 死鎖發生了 Philosopher-哲學家2 thinking Philosopher-哲學家4 thinking press enter to quit Philosopher-哲學家1 thinking Philosopher-哲學家3 thinking Philosopher-哲學家3 grabbing right Philosopher-哲學家1 grabbing right Philosopher-哲學家5 thinking Philosopher-哲學家4 grabbing right Philosopher-哲學家4 grabbing left Philosopher-哲學家2 grabbing right Philosopher-哲學家5 grabbing right Philosopher-哲學家1 grabbing left Philosopher-哲學家3 grabbing left Philosopher-哲學家5 grabbing left Philosopher-哲學家2 grabbing left */代碼解說:如果philosopher花費更多的時間思考而不是進餐(ponder值越大,思考時間越長),那么他們請求共享資源的可能性就會小許多,這樣你就會確信該程序不會死鎖,盡管他們并非如此。
2、死鎖發生條件: 當以下4個條件同事滿足時,死鎖發生;(干貨——死鎖發生的4個條件,同時滿足)
條件1:互斥條件。 任務使用的資源至少有一個是不能共享的;這里,一根筷子一次就只能被一個哲學家使用;
條件2:有任務請求被其他任務占用的共享資源。至少有一個任務,它必須持有一個資源且正在等待獲取一個當前被別的任務持有的資源;即,要發生死鎖,哲學家必須拿著一根筷子,且等待另一根;
條件3:資源不能被任務搶占。任務必須把資源釋放當做普通事件;即哲學家不會從其他哲學家那里搶筷子;
條件4:必須有循環等待。一個任務等待其他任務所持有的資源,后者又在等待另一個任務所持有的資源,這樣一直下去,直到有一個任務在等待第一個任務所持有的資源,使得大家都被鎖住。;
在 DeadLockDiningPhilosophers 程序中,每個哲學家都試圖先得到右邊的筷子,然后得到左邊的筷子,所以發生了循環等待;
3、要防止死鎖,只需要破壞其中一個條件即可。最容易的方法是破壞第4個條件。
因為每個哲學家都先拿右邊筷子,后拿左邊筷子。 如果最后一個哲學家先拿左邊筷子,后拿右邊筷子,那么這個哲學家將永遠不會阻止其右邊的哲學家拿起筷子。即破壞了第4個條件。
【21.7】新類庫中的構件
【21.7.1】 CountDownLatch
1、作用:被用來同步一個或多個任務,強制他們等待由其他任務執行的一組操作完成;
2、向CountDownLatch 對象設置一個初始值,任何在這個對象上調用wait() 方法都將阻塞,直到這個計數值到達0;調用 countDown()來減小這個計數值;
3、CountDownLatch 只能被初始化一次或觸發一次,計數值不能被重置。如果需要重置,使用 CyclicBarrier;
4、典型用法:將一個程序分為n個相互獨立的可解決任務,并創建值為0的 CountDownLatch;當每個任務完成時,都會在這個鎖上調用 await方法,將自己攔住,直到鎖存器計數為0結束;
【代碼-CountDownLatchDemo】
【21.7.2】CyclicBarrier
1、定義:可以將鎖存器計數重置;
2、應用場景:創建一組任務,并行地執行工作,然后再進行下一個步驟之前等待,直到所有任務都完成。它使得所有的并行任務都講在柵欄處列隊等待,可以一致向前移動;
3、區別: CountDownLatch 只能使用一次,而 CyclicBarrier 可以循環重復使用;
【21.7.3】 DelayQueue
1、定義:無界的阻塞隊列 BlockingQueue,用于放置實現了 Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走;
2、這種隊列是有序的,即隊頭對象的延遲到期的時間最長。如果沒有任何延遲到期,那么就不會有任何頭元素,并且poll() 方法將返回null;所以,不能將null放置到該隊列中;
【代碼-DelayQueueDemo】
代碼解說: 上述控制臺輸出信息為:
[555 ] task 1
[961 ] task 4
[1693] task 2
[1861] task 3
[4258] task 0
task.summary() (0: 4258)
task.summary() (1: 555)
task.summary() (2: 1693)
task.summary() (3: 1861)
task.summary() (4: 961)
task.summary() (5: 5000)
小結1:其中 555 是最小的延遲時間,即 DelayedTaskConsumer 將最緊急的任務從隊列中取出,然后運行它;
小結2:在 DelayedQueue中, 任務創建順序與執行沒有關系,任務是按照所期望的延遲順序來執行的;
如上, task1 最先執行,但其是第2個創建的任務task.summary 是從 sequence取值的,sequence記錄了創建順序;
【21.7.4】 PriorityBlockingQueue 優先級阻塞隊列
1、定義:其執行順序是按照優先級順序來執行的;
【代碼-PriorityBlockQueueDemo】
代碼解說:
toString() = 線程優先級[9? ] task-線程編號- 5
toString() = 線程優先級[8? ] task-線程編號- 0
toString() = 線程優先級[8? ] task-線程編號- 6
toString() = 線程優先級[7? ] task-線程編號- 9
toString() = 線程優先級[5? ] task-線程編號- 1
toString() = 線程優先級[3? ] task-線程編號- 2
toString() = 線程優先級[2? ] task-線程編號- 8
toString() = 線程優先級[1? ] task-線程編號- 4
toString() = 線程優先級[1? ] task-線程編號- 3
toString() = 線程優先級[0? ] task-線程編號- 7
根據輸出信息,可以看出,優先級高的線程先執行,其執行順序與線程創建順序無關;
【21.7.5】使用 ScheduledExecutor的溫室控制器 (干貨——ScheduledExecutor計劃調度器可用于系統后臺的周期性或定時跑批,如每日凌晨跑批,采用cron 表達式)
1、背景:每個期望的溫室事件都是一個在預定時間運行的任務。ScheduledThreadPoolExecutor 提供了解決該問題的服務。
2、如何解決:通過使用schedule() 方法運行一次任務或者 scheduleAtFixedRate()(每隔規則的時間重復執行任務),你可以將Runnable對象設置為在將來的某個時刻執行。
【代碼-GreenHouseScheduler】
代碼解說:volatile和 synchronized 都得到了應用,以防止任務之間的相互干涉。在持有 DataPoint的List中的所有方法都是 synchronized,這是因為在List被創建時,使用了 Collections工具 synchronizedList();
【21.7.6】Semaphore
1、正常的鎖(current.locks或 synchronized鎖)在任何時刻都只允許一個任務訪問一項資源,而計數信號量允許n個任務同時訪問這個資源;你還可以將信號量看做是在向外分發使用資源的許可證,盡管實際上沒有使用任何許可證對象。
2、看個荔枝:線程池。 管理著數量有限的對象,當要使用對象時可以簽出他們,而在用戶使用完成時,可以將它們簽回;
【代碼-Pool】
?
/*** 信號量演示(非常重要的荔枝) * page 734 * (* 一旦池中的所有對象被簽出,semaphore 將不允許執行任何簽出操作; * blocked的run()方法因此會被阻塞, 2秒鐘后,cancel()方法會被調用, 以此來掙脫Future的束縛* )*/ public class SemaphoreDemo {final static int size = 5; public static void main(String[] args) throws InterruptedException {final Pool<Fat> pool = new Pool<>(Fat.class, size); // 對象池,通過信號量來管理 ExecutorService exec = Executors.newCachedThreadPool(); // 線程池 for (int i = 0; i < size; i++) {exec.execute(new CheckOutTask<Fat>(pool)); // 運行簽出任務 }System.out.println("all checkout tasks created");List<Fat> list = new ArrayList<>();for (int i = 0; i < size; i++) {Fat f = pool.checkOut(); // 簽出對象 System.out.println(i + " : main() thread check out");f.operation(); // "fat id: " + id list.add(f);}Future<?> blocked = exec.submit(new Runnable() {@Overridepublic void run() {try {pool.checkOut();// 開啟單個線程,簽出對象 } catch (InterruptedException e) {System.out.println("checkout() interrupted. "); }}});TimeUnit.SECONDS.sleep(2); //睡眠2秒 blocked.cancel(true); // 嘗試取消執行此任務 for (Fat f : list) {pool.checkIn(f);}for (Fat f : list) { // 冗余的簽入將被pool 忽略 pool.checkIn(f);} exec.shutdown(); // 關閉線程池 } } // 創建一個任務,先簽出Fat對象,持有一段時間后,再簽入,以此來測試Pool這個類 class CheckOutTask<T> implements Runnable { // 簽出任務 private static int counter = 0;private final int id = counter++;private Pool<T> pool;public CheckOutTask(Pool<T> pool) {this.pool = pool; }@Overridepublic void run() {try {T item = pool.checkOut(); // 簽出對象,獲取信號量許可證System.out.println(this + " checked out " + item);System.out.println(this + " checking in " + item);pool.checkIn(item); // 簽入對象,釋放許可證歸還給信號量 } catch (InterruptedException e) {System.out.println("CheckOutTask interrupted");}}@Overridepublic String toString() {return "checkout task " + id + " "; } } /*checkout task 1 checked out fat id: 1 checkout task 4 checked out fat id: 4 checkout task 4 checking in fat id: 4 all checkout tasks created checkout task 3 checked out fat id: 3 checkout task 3 checking in fat id: 3 checkout task 0 checked out fat id: 0 checkout task 0 checking in fat id: 0 checkout task 2 checked out fat id: 2 checkout task 2 checking in fat id: 2 0 : main() thread check out fat id: 4 1 : main() thread check out checkout task 1 checking in fat id: 1 fat id: 0 2 : main() thread check out fat id: 1 3 : main() thread check out fat id: 2 4 : main() thread check out fat id: 3 checkout() interrupted. */【21.7.7】 Exchanger 交換器
1、定義:Exchanger 是在兩個任務之間交換對象的柵欄。當這些任務進入柵欄時,各自擁有一個對象,當它們離開時,它們都擁有之前由對象持有的對象;
2、應用場景:一個任務在創建對象,這些對象的生產代價很高昂;而另一個任務在消費這些對象。通過這種方式,可以有更多的對象在被創建的同時被消費;
代碼解說:
在main方法中,創建了用于兩個任務的單一的Exchanger 交換器,以及兩個用于互換的 CopyOnWriteArrayList。這個特定的list變體允許在列表被遍歷時調用remove()方法,而不拋出異常 ModificationExcetpion。ExchangeProduer 填充這個list, ExchangerConsumer消費這個list,然后將這個滿列表交換為 ExchangerConsumer傳遞給它的空列表。因為有了 Exchanger,填充一個列表和消費另一個列表可以同時發生了。
(干貨——引入了CopyOnWriteArrayList,允許在列表被遍歷時調用remove()方法,而不拋出異常 ModificationExcetpion)
【21.8】仿真
【21.9】性能調優
1、比較 synchronize 與 Lock的性能
/*** page 748* 啟動單個任務比較 synchronized關鍵字和 Lock和Atomic類的區別 */ public class SimpleMicroBenchmark {static long test(Incrementable incr) {long start = System.nanoTime();for (long i=0; i < 100000000L; i++) {incr.increment();}return System.nanoTime() - start; }public static void main(String[] args) {long synchTime = test(new SynchronizingTest());long lockTime = test(new LockingTest());System.out.printf("synchronized 花費多少納秒: %1$10d \n", synchTime); // 203974196 System.out.printf("lock 花費多少納秒: %1$10d \n", lockTime); // 164559713 System.out.printf("lock/synchronized: %1$.3f\n", lockTime/(double)synchTime); // 0.807 } } abstract class Incrementable {protected long counter = 0;public abstract void increment(); } // synchronized 關鍵字性能測試 class SynchronizingTest extends Incrementable {public synchronized void increment() {++counter;} } // lock鎖性能測試 class LockingTest extends Incrementable {private Lock lock = new ReentrantLock();public void increment() {lock.lock();try {++counter;} finally {lock.unlock();}} } /* synchronized 花費多少納秒: 2166063742 lock 花費多少納秒: 1725775548 lock/synchronized: 0.797 */以上代碼是單線程,不具體代表性;我們需要構建復雜程序,或多個任務來測試;
【代碼-SynchronizationComparison】
/*** page 749 * 啟動多個任務測試 synchronize, lock , Atomic類性能 (經過驗證,Atomic原子類同步性能最佳 )*/ public class SynchronizationComparison{static BaseLine baseLine = new BaseLine(); // 基線 static SynchronizedTest synch = new SynchronizedTest(); // synchronize測試 static LockTest lock = new LockTest(); // ReentrantLock 可重入鎖測試 static AtomicTest atomic = new AtomicTest(); // 原子類測試 static void test() { System.out.println("=============================="); System.out.printf("%-12s : %13d\n", "Cycles", Accumulator.cycles);// 運行任務測試總時長baseLine.timedTest(); // 基本測試,無任何同步方法 synch.timedTest(); // synchronize同步 lock.timedTest(); // lock 同步 atomic.timedTest(); // Atomic 類同步 // 比較兩種模擬器性能 Accumulator.report(synch, baseLine); // 1.65 Accumulator.report(lock, baseLine); // 2.31 Accumulator.report(atomic, baseLine); // 0.91 Accumulator.report(synch, lock); // 0.71 Accumulator.report(synch, atomic); // 1.82 Accumulator.report(lock, atomic); // 2.54 }public static void main(String[] args) {int iteration = 5; System.out.println("warm up");baseLine.timedTest();for (int i = 0; i < iteration; i++) {test();Accumulator.cycles *= 2; }Accumulator.exec.shutdown(); } } abstract class Accumulator { // 模擬器 public static long cycles = 50000L; // 循環次數 private static final int N = 4; public static ExecutorService exec = Executors.newFixedThreadPool(N*2); // 線程池 // CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以讓一組線程達到一個屏障時被阻塞 // ,直到最后一個線程達到屏障時,所以被阻塞的線程才能繼續執行。 private static CyclicBarrier barrier = new CyclicBarrier(N*2+1); // 同步屏障 protected volatile int index = 0;protected volatile long value = 0;protected long duration = 0; // 持續時間 protected String id = "error";protected final static int SIZE = 100000 ;protected static int[] preLoaded = new int[SIZE]; static {Random rand = new Random(47); // 隨機數發生器 for (int i=0; i< SIZE; i++) {preLoaded[i] = rand.nextInt(); // 預加載}}public abstract void accumulate(); // 模擬方法,抽象,由子類實現 public abstract long read(); // 讀取方法 private class Modifier implements Runnable { // 修改器 @Overridepublic void run() { // 模板方法模式,由子類提供實現 for(long i=0; i<cycles; i++) {accumulate(); // 調用模擬方法 }try {barrier.await(); // 屏障阻塞,直到給定數量的線程都等待為止 } catch (Exception e) {throw new RuntimeException(e);}}}private class Reader implements Runnable { // 讀取器 private volatile long value; @Overridepublic void run() {for(long i=0; i<cycles; i++) {value = read(); }try {barrier.await(); // 屏障阻塞,直到給定數量的線程都等待為止 } catch (Exception e) {throw new RuntimeException(e);}}}public void timedTest() { // 時間測試 long start = System.nanoTime(); for (int i=0; i<N; i++) {exec.execute(new Modifier()); // 執行修改器 exec.execute(new Reader()); // 執行讀取器 }try {// 程序中必須有一個 CyclicBarrier, 因為需要確保所有任務在聲明每個測試完成之前都已經完成 barrier.await(); // 屏障阻塞,直到給定數量的線程都等待為止 } catch (Exception e) {throw new RuntimeException(e);}duration = System.nanoTime() - start; // 總時長 System.out.printf("%-13s:%13d\n", id, duration);}public static void report(Accumulator acc1, Accumulator acc2) { // 報告, System.out.printf("%-22s: %.2f\n", acc1.id + "/" + acc2.id, acc1.duration/(double)acc2.duration); } } class BaseLine extends Accumulator { // 基本測試,無任何同步方法 {id = "baseline"; }@Override public void accumulate() {if (index >= SIZE-1) index = 0;value += preLoaded[(index++)%SIZE];}@Overridepublic long read() {return value;} } class SynchronizedTest extends Accumulator { // synchronize同步 測試 {id = "Synchronized"; }@Overridepublic synchronized void accumulate() {if (index >= SIZE-1) index = 0;value += preLoaded[index++];}@Overridepublic long read() {return value;} } class LockTest extends Accumulator { // ReentrantLock可重入鎖同步 測試 {id = "lock";}private Lock lock = new ReentrantLock(); @Overridepublic void accumulate() {lock.lock();try {if (index >= SIZE-1) index = 0;value += preLoaded[index++];} finally {lock.unlock();}}@Overridepublic long read() {lock.lock();try {return value; } finally {lock.unlock(); }} } class AtomicTest extends Accumulator { // Atomic原子類同步 測試 {id = "atomic";}private AtomicInteger index = new AtomicInteger(0);private AtomicLong value = new AtomicLong(0);@Overridepublic void accumulate() {int i = index.getAndIncrement();value.getAndAdd(preLoaded[i%SIZE]);if (++i >= SIZE-1) {index.set(0);}}@Overridepublic long read() {return value.get();} } /* warm up baseline : 12811667 ============================== Cycles : 50000 baseline : 10401913 Synchronized : 18486698 lock : 26550332 atomic : 8931189 Synchronized/baseline : 1.78 lock/baseline : 2.55 atomic/baseline : 0.86 Synchronized/lock : 0.70 Synchronized/atomic : 2.07 lock/atomic : 2.97 ============================== Cycles : 100000 baseline : 18458982 Synchronized : 28172394 lock : 36321361 atomic : 14323233 Synchronized/baseline : 1.53 lock/baseline : 1.97 atomic/baseline : 0.78 Synchronized/lock : 0.78 Synchronized/atomic : 1.97 lock/atomic : 2.54 ============================== Cycles : 200000 baseline : 36408153 Synchronized : 50424697 lock : 71790482 atomic : 28702992 Synchronized/baseline : 1.38 lock/baseline : 1.97 atomic/baseline : 0.79 Synchronized/lock : 0.70 Synchronized/atomic : 1.76 lock/atomic : 2.50 ============================== Cycles : 400000 baseline : 68541253 Synchronized : 103632938 lock : 144097706 atomic : 53405164 Synchronized/baseline : 1.51 lock/baseline : 2.10 atomic/baseline : 0.78 Synchronized/lock : 0.72 Synchronized/atomic : 1.94 lock/atomic : 2.70 ============================== Cycles : 800000 baseline : 137235667 Synchronized : 180808536 lock : 283742763 atomic : 108986327 Synchronized/baseline : 1.32 lock/baseline : 2.07 atomic/baseline : 0.79 Synchronized/lock : 0.64 Synchronized/atomic : 1.66 lock/atomic : 2.60 */代碼解說:程序中有一個CyclicBarrier 循環屏障,因為我們希望確保所有的任務在聲明每個測試完成之前都已經完成了;
【互斥技術總結】
1、Atomic:如果涉及多個Atomic對象,你就有可能會被強制要求放棄這種用法;因為Atomic對象只有在非常簡單的情況下才有用,這些情況通常包括你只有一個要被修改的Atomic對象,并且這個對象獨立于其他所有的對象。更安全的做法:只有在性能方面的需求能夠明確指示時,再替換為 Atomic,否則還是推薦使用 synchronized; (干貨——Atomic類的使用場景)
2、推薦使用 synchronize進行并發控制:因為 synchronize關鍵字所產生的代碼,與Lock所需的 加鎖-try-finally-解鎖慣用方法鎖產生的代碼相比,可讀性提高了很多;所以推薦使用 synchronize。就如我在本書其他地方提到的,代碼被閱讀次數遠多于被編寫的次數。在編程時,與其他人交流相對于與計算機交流而言,要重要得多,因此代碼的可讀性至關重要。因此,從 synchronized 入手,只有在性能調優時才替換為 Lock對象這種做法,具有實際意義的。(干貨——推薦使用 synchronize進行并發控制)
【21.9.2】免鎖容器
1、CopyOnWriteArrayList:寫入將導致創建整個底層數組的副本,而源數組將保留在原地,使得復制的數組在被修改時,讀取操作可以安全執行; CopyOnWriteArrayList好處是當多個迭代器同時遍歷和修改這個列表時,不會拋出 ConcurrentModificationException;CopyOnWriteArraySet 使用了CopyOnWriteArrayList 來實現其免鎖行為;
2、ConcurrentHashMap 與ConcurrentLinkedQueue 使用了類似的技術,允許并發的讀取和寫入,但是容器中只有部分內容而不是整個容器可以被復制和修改。然后,任何修改在完成之前,讀取者仍舊不能看到他們。ConcurrentHashMap 不會拋出 ConcurrentModificationException異常。
3、樂觀鎖: 只要你主要是從免鎖容器中讀取,那么它就會比 synchronized 快很多,因為獲取和釋放鎖的開銷省掉了;
4、比較并發控制的list容器
(干貨——測試并發編程下的list性能: CopyOnWriteArrayList性能 優于? SynchronizedList)
【代碼——Tester】
【代碼——ListComparisons——比較列表】
/*** 測試并發編程下的list性能: CopyOnWriteArrayList性能 優于 SynchronizedList* page 758 */ public class ListComparisons {public static void main(String[] args) {Tester.initMain(null);new SynchronizedArrayListTest(10, 0);new SynchronizedArrayListTest(9, 1);new SynchronizedArrayListTest(5, 5);new CopyOnWriteArrayListTest(10, 0);new CopyOnWriteArrayListTest(9, 1);new CopyOnWriteArrayListTest(5, 5);Tester.exec.shutdown();} } /** List測試類 */ abstract class ListTest extends Tester<List<Integer>> {ListTest(String testId, int nReaders, int nWriters) {super(testId, nReaders, nWriters);}class Reader extends TestTask { // 讀取任務 long result = 0;void test() {for (int i = 0; i < testCycles; i++) {for (int j = 0; j < containerSize; j++) {result += testContainer.get(j);}}}void putResults() {readResult += result;readTime += duration; }}class Writer extends TestTask { // 寫入任務 @Overridevoid test() {for (int i = 0; i < testCycles; i++) {for (int j = 0; j < containerSize; j++) {testContainer.set(i, writeData[j]);}}}@Overridevoid putResults() {writeTime += duration; }}/** 運行讀取任務和寫入任務 */void startReadersAndWriters() {for (int i = 0; i < nReaders; i++) {exec.execute(new Reader());}for (int i = 0; i < nWriters; i++) {exec.execute(new Writer());}} } /** 同步list-SynchronizedList*/ class SynchronizedArrayListTest extends ListTest {List<Integer> containerInitializer() {return Collections.synchronizedList(new ArrayList<Integer>(new CountingIntegerList(containerSize)));}SynchronizedArrayListTest(int nreaders, int nwriters) {super("synched arraylist", nreaders, nwriters);} } /** 同步list-CopyOnWriteArrayList*/ class CopyOnWriteArrayListTest extends ListTest {@OverrideList<Integer> containerInitializer() {/*CopyOnWriteArrayList好處是當多個迭代器同時遍歷和修改這個列表時* ,不會拋出 ConcurrentModificationException;*/ return new CopyOnWriteArrayList<Integer>(new CountingIntegerList(containerSize));} CopyOnWriteArrayListTest(int nreaders, int nwriters) {super("CopyOnWriteArrayListTest", nreaders, nwriters);} } /* type readTime write Time t = null t = null t = null synched arraylist , 10 reader thread, 0 writer thread 86062 0 synched arraylist , 10 reader thread, 0 writer thread 140764 0 synched arraylist , 10 reader thread, 0 writer thread 535339 0 t = null t = null t = null synched arraylist , 9 reader thread, 1 writer thread 238497 20422 readTime + writeTime = 258919 synched arraylist , 9 reader thread, 1 writer thread 188900 4376 readTime + writeTime = 193276 synched arraylist , 9 reader thread, 1 writer thread 192182 3647 readTime + writeTime = 195829 t = null t = null t = null synched arraylist , 5 reader thread, 5 writer thread 86791 74393 readTime + writeTime = 161184 synched arraylist , 5 reader thread, 5 writer thread 605721 540446 readTime + writeTime = 1146167 synched arraylist , 5 reader thread, 5 writer thread 39385 76216 readTime + writeTime = 115601 t = null t = null t = null CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread 63456 0 CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread 48866 0 CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread 44126 0 t = null t = null t = null CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread 30997 35738 readTime + writeTime = 66735 CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread 71475 21516 readTime + writeTime = 92991 CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread 24434 21151 readTime + writeTime = 45585 t = null t = null t = null CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread 19692 680843 readTime + writeTime = 700535 CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread 48503 622496 readTime + writeTime = 670999 CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread 14587 756695 readTime + writeTime = 771282*/代碼解說: synchronized ArrayList 無論讀者和寫入者的數量是多少,都具有大致相同的性能——讀取者與其他讀取者競爭鎖的方式與寫入者相同。但 CopyOnWriteArrayList 在沒有寫入者時,速度會快很多。通過測試,CopyOnWriteArrayList 性能優于 synchronized list,對列表寫入的影響并沒有超過短期同步整個列表的影響。
5、比較并發控制的map容器性能
(干貨——測試并發編程下的Map性能: CurrentHashMap性能優于 synchronizedHashMap)
【21.9.3】樂觀加鎖
1、原理:在執行某項計算時,實際上沒有使用互斥,但在計算完成時,準備更新時,需要使用 compareAndSet的方法。你把舊值和新值一起提交給這個方法,如果舊值與他在Atomic對象中發現的值不一致,那么這個操作就會失敗,這意味著某個其他任務已經于此操作前修改了這個對象。
2、通常情況下,我們使用 synchronized 或 lock來防止多個任務同時修改同一個對象,但這里我們是樂觀的,因為我們保持數據為為鎖定狀態,并希望沒有任何其他任務插入修改他。使用 Atomic替代 synchronized或 Lock,可以獲得性能上的好處;
3、注意:compareAndSet() 方法操作失敗會發生什么? 建議程序做好補償機制;
【代碼——FastSimulation】
【21.9.4】ReadWriteLock 讀寫鎖
1、定義: ReadWriteLock對向數據結構相對不頻繁寫入,但是有多個任務要經常讀取這個數據結構的這類情況進行了優化。
ReadWriteLock 使得你可以同時有多個讀取這,只要他們都不試圖寫入。如果寫鎖已經被其他任務持有,那么任何讀取者都不能訪問,直到這個寫鎖被釋放為止。
2、ReadWriteLock是否能夠提高程序性能是不確定的,取決于數據被讀取與修改的頻率,讀取和寫入操作的時間,有多少線程競爭以及是否在多處理器上運行等因素。
最好的方法就是用實驗來證明是否性能提升。
【代碼——ReaderWriterList】
【21.10】活動對象
1、有一種可替換的方式被稱為活動對象或行動者。之所以稱這些對象是活動的,是因為每個對象都維護著它自己的工作器線程和消息隊列,并且所有對這種對象的請求都將進入隊列排隊,任何時刻都只能運行其中一個。然而,有個活動對象,可以串行化消息而不是方法,這意味著不需要防備一個任務在其循環的中間被中斷這種問題。
2、當你向一個活動對象發送消息時,這條消息會轉變為一個任務,該任務會被插入到這個對象的隊列中,等待在以后的某個時刻運行。Future在實現這種模式時將派上用場。
【看個荔枝】有兩個方法,可以將方法調用排進隊列;
/*** 活動對象演示* page 764 */ public class ActiveObjectDemo {/** 線程池 */ private ExecutorService exec = Executors.newSingleThreadExecutor();private Random rand = new Random(47);/** 暫停方法,睡眠 */private void pause(int factor) {try {TimeUnit.MILLISECONDS.sleep(100+ rand.nextInt(factor));} catch (InterruptedException e) {System.out.println("sleep interrupt");} } /** 調用int方法 */public Future<Integer> calculateInt(final int x, final int y) {return exec.submit(new Callable<Integer>() {public Integer call() {System.out.println("starting x = " + x + ", y = " + y);pause(500);return x+ y ;}});}public Future<Float> calculateFloat(final float x, final float y) {return exec.submit(new Callable<Float>() {public Float call() {System.out.println("starting x = " + x + ", y = " + y);pause(2000);return x+ y; }});}public void shutdown() {exec.shutdown(); }public static void main(String[] args) {ActiveObjectDemo d1 = new ActiveObjectDemo(); /*在計算機中就是當你想要對一塊內存進行修改時,我們不在原有內存塊中進行寫操作* ,而是將內存拷貝一份,在新的內存中進行寫操作* ,寫完之后呢,就將指向原來內存指針指向新的內存,原來的內存就可以被回收掉嘛!*/List<Future<?>> results = new CopyOnWriteArrayList<Future<?>>();for (float f = 0.0f; f < 1.0f; f += 0.2f) {results.add(d1.calculateFloat(f, f));}for (int i = 0; i < 5; i++) {results.add(d1.calculateInt(i, i));}System.out.println("========== all asynch calls made ========== ");int index = 0;while(results.size() >0) { // while 循環,再放一層for循環,因為 可能f.isDone() 為false for (Future<?> f: results) {if (f.isDone()) {try {System.out.println("f.get(" + ++index +") = " + f.get()); } catch (Exception e) {throw new RuntimeException(e);}results.remove(f);}} d1.shutdown(); }} } /* ========== all asynch calls made ========== starting x = 0.0, y = 0.0 f.get(1) = 0.0 starting x = 0.2, y = 0.2 f.get(2) = 0.4 starting x = 0.4, y = 0.4 f.get(3) = 0.8 starting x = 0.6, y = 0.6 f.get(4) = 1.2 starting x = 0.8, y = 0.8 f.get(5) = 1.6 starting x = 0, y = 0 starting x = 1, y = 1 f.get(6) = 0 f.get(7) = 2 starting x = 2, y = 2 f.get(8) = 4 starting x = 3, y = 3 f.get(9) = 6 starting x = 4, y = 4 f.get(10) = 8 */代碼解說:使用 CopyOnWriteArrayList 可以移除為了防止 ConcurrentModificationException而復制List的這種需求;
小結:
小結1:為了能夠在不經意間就可以防止線程之間的耦合,任何傳遞給活動對象方法調用的參數都必須是只讀的其他活動對象,或者是不連續對象。即沒有連接任何其他任務的對象。
小結2;有個活動對象, 你可以干一下事情:
事情1、每個對象都可以擁有自己的工作器線程;
事情2、每個對象都將維護對他自己的域的全部控制權; 這比普通類要嚴格一些,普通類只是擁有防護它們的域的選擇權;
事情3、所有在活動對象之間的通信都將以在這些對象之間的消息形式發生;
事情4、活動對象之間的所有消息都要排隊;
?
【21.11】總結
【21.11.1】java線程進行并發編碼的基礎知識;
1、可以運行多個獨立任務;
2、必須考慮當這些任務關閉時,可能出現的所有問題;
3、任務可能會在共享資源上彼此干涉。互斥(或鎖)是用來防止這種沖突的基本工具;
4、如果任務設計得不夠合理,就有可能會死鎖;
【21.11.2】什么時候使用并發,什么時候應該避免并發非常關鍵,使用并發的原因如下:
1、要處理很多任務,它們交織在一起,應用并發能夠更有效地使用計算機;
2、要能夠更好地組織代碼;
3、要便于用戶使用;
【21.11.3】 線程的好處
1、輕量級的上下文切換:輕量級的線程上下萬切換只需要100條指令,重量級的進程上下文切換需要上千條指令;
2、因為一個給定進程內的所有線程共享相同的內存空間,輕量級的上下文切換只是改變了程序的執行序列和局部變量,而進程切換必須改變所有內存空間;(干貨——線程的上下文切換是輕量級的,進程的上下文切換是重量級的)
【21.11.4】多線程的缺陷
1、等待共享資源的時候性能降低;
2、需要處理線程的額外cpu花費;
3、糟糕的程序設計導致不必要的復雜度;
4、有可能產生一些病態行為,如餓死,競爭,死鎖,活鎖(多個運行各自任務的線程使得整體無法完成);
5、不同平臺導致的不一致性;
?
?
?
?
總結
以上是生活随笔為你收集整理的think-in-java(21)并发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何快速查询ccc证书号
- 下一篇: 吃透 | Elasticsearch f