《Java高并发编程详解:多线程与架构设计》笔记(一)
目錄
序言
?
線程的查看
線程生命周期
線程的構造函數
JVM內存結構
守護線程
Thread API
線程的關閉
異常退出
進程假死
線程安全與數據同步
死鎖原因
線程間通信
同步阻塞和異步阻塞
單線程間通信
多線程通信
自定義顯式鎖BooleanLock
ThreadGroup
Hook線程以及捕獲線程執行異常
線程池原理以及自定義線程池
總結
序言
當個人的發展遇到了瓶頸,想一想是不是因為自己的基礎不夠扎實,或者是之前之前看過的書沒有理解。靜下心來回過頭看看書,看會不會有新的理解,畢竟有些東西自己沒怎么用很容易忘,我個人也認為基礎還是比較重要的東西,看視頻會是一個比較容易接受的方式,不過看書才最能鞏固基礎,畢竟書本上的內容才是最成體系的。
該篇是汪文君2018年第一版的《Java高并發編程詳解:多線程與架構設計》一書的第一章至第八章的筆記。
?
線程的查看
1、使用Jconsole或Jstack命令來查看JVM線程。Jconsole是JDK自帶的可視化界面,如下圖,
Jstack命令查看對應PID(使用命令ps -ef查看進程),如下圖我寫了一個死鎖的demo,使用jstack 進程號可以看到情況。
線程生命周期
線程NEW狀態:當用關鍵字new創建一個Thread對象時,線程為New狀態。
線程RUNNABLE狀態:當線程調用start方法進入RUNNABLE狀態。RUNNABLE狀態只能意外終止或者進入RUNNING狀態。
線程RUNNING狀態:一旦CPU通過輪詢或者其他方式從可執行隊列中選中了該線程,該線程才真正地執行自己的邏輯代碼,進入RUNNING狀態。調用yield方法放棄CPU執行權進入RUNNABLE狀態。
線程BLOCKED狀態:比如從RUNNING狀態調用了sleep或者wait方法加入waitSet中;競爭鎖資源而加入到阻塞隊列;阻塞的IO操作進入阻塞狀態。
線程TERMINATED狀態:JDK不推薦使用stop方法或者意外死亡(JVM Crash),一般線程正常結束生命周期。
線程的構造函數
線程的構造函數:如果一個線程沒有顯式的指定ThreadGroup則它和父線程同屬一個ThreadGroup。
棧內存通過xss參數設置,線程的構造函數中stacksize越大表明線程內方法遞歸調用深度就越深,stacksize越小則代表創建的線程數量越多。
JVM內存結構
?
堆內存不變,棧內存越大,可創建的線程數量越小。這是由于虛擬機棧內存線程私有,每一個線程都會占有指定的內存大小,Java進程的內存大小=堆內存+線程數*棧內存。
JVM可創建多少個線程與堆內存、棧內存有關,線程數量 = ,其中MaxProcessMemory是最大地址空間,HeapMemory是JVM堆內存,ReservedOsMemory是系統保留內存(一般136M)。
守護線程
一般用于處理后臺工作,如JDK垃圾回收線程。正常情況下,JVM中沒有一個非守護線程,則JVM的進程會退出。守護線程具備自動結束聲明周期的特性。
設置守護線程只需調用Thread.setDaemon(true)方法即可,它常用作執行一些后臺任務,當需要關閉某些線程的時候,或者退出JVM進程的時候,一些線程能夠自動關閉,這時采用守護線程。
Thread API
?
線程的關閉
JDK有一個過期(Deprecated)方法stop,早已不推薦使用,保留是為了兼容舊服務。stop方法存在的問題是關閉線程時可能不會釋放掉monitor的鎖,所以強烈不推薦使用。關閉線程有以下幾種方法:
1、線程結束生命周期:線程正常運行結束(生命周期結束)。
2、捕獲中斷信號關閉線程:線程中循環執行某個任務,如心跳檢查。通過檢查線程interrupt的標識來決定是否退出。
3、使用volatile開關控制:由于線程的interrupt標識很有可能被擦除,或者邏輯單元不會調用任何可中斷方法,使用volatile修飾的開發flag關閉線程是一種常用做法。
public class FlagThreadExit {static class MyTask extends Thread{private volatile boolean closed = false;@Overridepublic void run(){System.out.println("I will start work");while(!closed && !isInterrupted()){//working}System.out.println("I will be exiting.");}public void close(){this.closed = true;this.interrupt();}}public static void main(String[] args) throws InterruptedException {MyTask t = new MyTask();t.start();TimeUnit.MINUTES.sleep(1);System.out.println("System will be shutdown.");t.close();} }異常退出
在線程執行單元中,不允許拋checked異常(無論Thread.run方法還是Runnable的run方法),如果需要捕獲的話將checked異常封裝成unchecked異常(RuntimeException)拋出而結束線程生命周期。
進程假死
假死的絕大部分原因是某個線程阻塞了,或者出現死鎖的情況。使用jstack、jconsole、jvisualvm工具診斷。
線程安全與數據同步
多個線程同時對同一份資源進行訪問(讀寫操作)時,保證多個線程訪問到的數據一致,出現不一致的原因是由于線程的執行是由CPU時間片輪詢調度的。
通過synchronized關鍵字可以防止線程干擾和內存一致性錯誤,synchronized關鍵字的具體表現如下:
- synchronzed關鍵字提供了鎖的機制,能確保共享變量的互斥訪問。
- synchroinzed關鍵字包含monitor enter和monitor exit兩個JVM指令,它能保證在任何時候任何線程執行到monitor enter成功之前都必須從主內存中獲取數據,而不是從緩存中,在monitor exit成功之后,共享變量被更新后的值必須刷入主內存。
- synchronized指令嚴格遵守happends-before規則,一個monitor exit指令之前必須要有一個monitor enter。
舉個簡單栗子,創建5個線程,每個線程持有鎖1分鐘,如下,
package com.hust.zhang.threadSafe;import java.util.concurrent.TimeUnit;public class Mutex {private final static Object MUTEX = new Object();public void accessResource() {synchronized (MUTEX) {try {TimeUnit.MINUTES.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {final Mutex mutex = new Mutex();for (int i = 0; i < 5; i++) {new Thread(mutex::accessResource).start();}} }Jconsole中可以看到當前持有鎖的線程為Thread-0,且線程狀態為TIMED_WAITING狀態。
?
使用JDK命令javap對編譯后的class文件進行反匯編可以看到monitor enter和monitor exit成對出現,且滿足happen-before原則。每個對象都有一個監視器鎖(monitor),被占用就會處于鎖定狀態,若已占有該monitor,重新進入(monitor enter),則進入數+1。?
使用synchronized需要注意的幾個地方:
?
死鎖原因
線程間通信
同步阻塞和異步阻塞
同步阻塞消息處理缺點:客戶端等待時間過長會陷入阻塞;吞吐量不高;頻繁創建開啟與銷毀;業務高峰系統性能低。
異步非阻塞消息處理:優勢明顯,但也存在缺陷,如客戶端再次調用接口方法仍然需要進行查詢(可通過異步回調接口解決)。
單線程間通信
服務器端與客戶端通過事件隊列進行通信的case比較好的方式就是使用通知機制:創建一個事件隊列,有事件則通知工作線程開始工作,沒有則工作線程休息并等待通知。下面就是這樣的case。
事件隊列:
package com.hust.zhang.conn;import java.util.LinkedList;import static java.lang.Thread.currentThread;public class EventQueue {private int max;public EventQueue(int num) {this.max = num;}public EventQueue() {this(DEFAULT_MAX_EVENT);}//object類是所有類的父類static class Event {}private final LinkedList<Event> eventQueue = new LinkedList<>();private final static int DEFAULT_MAX_EVENT = 10;public void offer(Event event) {synchronized (eventQueue) {//當共享資源eventQueue隊列達到上限,調用eventQueue的wait方法使當前線程進入wait set中并釋放monitor的鎖if (eventQueue.size() >= max) {try {console("the queue is full.");/*** wait方法:* 1、可中斷,一旦調用wait方法進入阻塞狀態,其他線程是可以使用interrupt方法將其打斷。* 2、執行某個對象的wait方法后,加入與之對應的wait set中,每一個對象的monitor都有一個與之關聯的wait set。* 3、必須在同步方法中使用wait和notify,因為執行wait和notify前提條件是必須持有同步方法的monitor所有權。否則會出現IllegalMonitorStateException。* */eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}console("the event is submitted");eventQueue.addLast(event);eventQueue.notify();}}public Event take() {synchronized (eventQueue) {if (eventQueue.isEmpty()) {try {console("the queue is empty");//eventQueue是Event類的集合,調用的是父類Object的wait方法eventQueue.wait();} catch (InterruptedException e) {e.printStackTrace();}}Event event = eventQueue.removeFirst();//notify喚醒在此對象監視器monitor上等待的單個線程this.eventQueue.notify();console("the event " + event + " is handled.");return event;}}private void console(String message) {System.out.printf("%s:%s\n", currentThread().getName(), message);}}模擬服務者和消費者的兩個線程:
package com.hust.zhang.conn;import java.util.concurrent.TimeUnit;public class EventClient {public static void main(String[] args) {final EventQueue eventQueue = new EventQueue();new Thread(() -> {for (; ; ) {eventQueue.offer(new EventQueue.Event());}}, "Producer").start();new Thread(() -> {for (; ; ) {eventQueue.take();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}, "Consumer").start();} }多線程通信
上面的case中Producer很快提交了10個Event數據,此時隊列已滿,然后執行eventQueue的wait方法進入阻塞狀態,Consumer線程由于要處理數據,花費1秒處理其中的一條數據,然后通知Producer線程可以繼續提交數據了,如此循環。
但是上面的case如果有多個線程同時take或offer上面的程序就會出現數據不一致的問題,當eventQueue元素為空時,兩個線程執行take方法分別調用wait方法進入阻塞,另一個offer線程執行addLast方法后喚醒了其中一個阻塞的線程,該線程順利消費了一個元素之后恰巧再次喚醒了一個take線程,這時導致執行空LinkedList的removeFirst方法。所以再在上面做了一定的優化,判斷eventQuque隊列滿或空變成了輪詢隊列條件(if -> while),喚醒在此對象監視器monitor等待的單個線程變成喚醒在此對象監視器monitor等待的所有線程(notify -> notifyAll)。這樣改進可以防止多個線程同時take或offer造成的線程安全問題。
自定義顯式鎖BooleanLock
synchronized提供的是一種排他式的數據同步機制,某個線程在獲取monitor lock的時候可能會被阻塞,而這種阻塞有兩個很明顯的缺陷:
下面是一個缺陷分析的case。
package com.hust.zhang.synchronizedAnalysis;import java.util.concurrent.TimeUnit;public class SynchronizedDefect {public synchronized void syncMethod() {try {//阻塞時間長無法控制TimeUnit.HOURS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws InterruptedException {SynchronizedDefect defect = new SynchronizedDefect();Thread t1 = new Thread(defect::syncMethod, "T1");//make sure the t1 startt1.start();TimeUnit.MICROSECONDS.sleep(2);//T2因爭搶monitor的鎖而進入阻塞狀態,無法中斷Thread t2 = new Thread(defect::syncMethod, "T2");t2.start();//雖然可以設置中斷標識,但是無法被中斷TimeUnit.MICROSECONDS.sleep(2);t2.interrupt();System.out.println("t2.isInterrupt: " + t2.isInterrupted()); //trueSystem.out.println("t1.state: " + t1.getState()); //TIMED_WAITINGSystem.out.println("t2.state: " + t2.getState()); //BLOCKED} }上面的case可以看到線程t2因為爭搶monitor的鎖而進入阻塞狀態,對其調用interrupt方法只會設置中斷標識,線程一直處于阻塞狀態無法被中斷。但如果是休眠中的線程(Thread.sleep),調用interrupt方法會中斷該線程并拋出InterruptException異常。
所以這里采用自定義顯式鎖BooleanLock,demo如下,
鎖接口:
package com.hust.zhang.synchronizedAnalysis;import java.util.List; import java.util.concurrent.TimeoutException;public interface Lock {//永遠阻塞,除非獲取到了鎖,方法可以被中斷void lock() throws InterruptedException;//增加超時功能void lock(long mills) throws InterruptedException, TimeoutException;//鎖的釋放void unlock();//獲取當前哪些線程被阻塞List<Thread> getBlockedThreads(); }自定義顯式鎖實現類:
package com.hust.zhang.synchronizedAnalysis;import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.TimeoutException;import static java.lang.System.currentTimeMillis; import static java.lang.Thread.currentThread;public class BooleanLock implements Lock {//當前擁有鎖的線程private Thread currentThread;//boolean開關,true代表該鎖被某個線程獲得,false代表當前鎖沒有被哪個線程獲得或者已經釋放private boolean locked = false;//存儲哪些線程在獲取當前線程時進入阻塞狀態private final List<Thread> blockedList = new ArrayList<>();@Overridepublic void lock() throws InterruptedException {//同步代碼塊synchronized (this) {//當前鎖被某線程獲得,則該線程加入阻塞隊列,并使當前線程wait釋放對this monitor的所有權while (locked) {blockedList.add(currentThread());this.wait();}//如果當前線程沒有被其他線程獲得,則該線程會從阻塞隊列中刪除自己(如未進入阻塞隊列刪除也不會有影響)blockedList.remove(currentThread());//locked開關設為truethis.locked = true;//記錄獲取鎖的線程this.currentThread = currentThread();}}@Overridepublic void lock(long mills) throws InterruptedException, TimeoutException {//同步代碼塊synchronized (this) {//如果mills不合法,則默認調用lock方法,拋出異常也是一個比較好的做法if (mills <= 0) {this.lock();} else {long remainingMills = mills;long endMills = currentTimeMillis() + remainingMills;while (locked) {//如果remainingMills<=0,則表示當前線程被其他線程喚醒或者在指定的wait時間到之后還沒有獲得鎖if (remainingMills <= 0) throw new TimeoutException("can not get the lock during " + mills);if (!blockedList.contains(currentThread)) blockedList.add(currentThread());//等待remainingMills的毫秒數,該值最開始由其他線程傳入,但多次wait過程中會重新計算this.wait(remainingMills);//重新計算remainingMillsremainingMills = endMills - currentTimeMillis();}//獲得該鎖,并且從block隊列中刪除當前線程,將locked的狀態設置為true,并且指定獲得鎖的線程就是當前線程blockedList.remove(currentThread());this.locked = true;this.currentThread = currentThread();}}}@Overridepublic void unlock() {synchronized (this) {//判斷當前線程是否為獲取鎖的那個線程,只有加了鎖的線程才有資格進行解鎖if (currentThread == currentThread()) {this.locked = false;//Optional類是一個可以為null的容器對象。ifPresent方法可以接受接口段或lambda表達式Optional.of(currentThread().getName() + "release the lock.").ifPresent(System.out::println);//通知其他在wait set中的線程,大家可以嘗試搶鎖了this.notifyAll();}}}@Overridepublic List<Thread> getBlockedThreads() {//重構收發Encapsulate Collection(封裝集群)將參數中的List返回一個不可修改的Listreturn Collections.unmodifiableList(blockedList);} }測試類:
package com.hust.zhang.synchronizedAnalysis;import java.util.concurrent.TimeUnit; import java.util.stream.IntStream;import static java.lang.Thread.currentThread; import static java.util.concurrent.ThreadLocalRandom.current;public class BooleanLockTest {private final Lock lock = new BooleanLock();public void synMethod() throws InterruptedException {lock.lock();try {int randomInt = current().nextInt(10);System.out.println(currentThread() + "get the lock.");TimeUnit.SECONDS.sleep(randomInt);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {BooleanLockTest test = new BooleanLockTest();IntStream.range(0, 10).mapToObj(i -> new Thread(() -> {try {test.synMethod();} catch (InterruptedException e) {e.printStackTrace();}})).forEach(Thread::start);} }ThreadGroup
默認情況下,新的線程都會被加入到main線程的group中。
ThreadGroup currentGroup = Thread.currentThread().getThreadGroup();ThreadGroup group1 = new ThreadGroup("Group1");System.out.println(group1.getParent() == currentGroup); //trueThreadGroup group2 = new ThreadGroup(group1, "Group2"); //trueSystem.out.println(group2.getParent() == group1);ThreadGroup中的enumerate方法會將ThreadGroup中的active線程全部復制到Thread數組中。
package com.hust.zhang.threadGroup;import java.util.concurrent.TimeUnit;public class ThreadGroupEnumerateThreads {public static void main(String[] args) throws InterruptedException {ThreadGroup myGroup = new ThreadGroup("MyGroup");Thread thread = new Thread(myGroup, () -> {while (true) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}, "MyThread");thread.start();TimeUnit.MICROSECONDS.sleep(2);ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();Thread[] list = new Thread[mainGroup.activeCount()];/*** enumerate方法獲取的線程僅僅是預估值,并不能百分之百的保證當前group的活躍線程數,* 比如在調用復制之后,某個線程結束了生命周期或者新的線程加入進來,都會導致數據的不準確。*/int recurseSize = mainGroup.enumerate(list);System.out.println("主線程組活躍線程數 = " + recurseSize); //3//遞歸recurse設置為false,myGroup中的線程不會包含在內recurseSize = mainGroup.enumerate(list, false); //2System.out.println(recurseSize);} }enumerate也可以復制ThreadGroup線程組,如下
package com.hust.zhang.threadGroup;import java.util.concurrent.TimeUnit;public class ThreadGroupEnumerateThreadGroup {public static void main(String[] args) throws InterruptedException {ThreadGroup myGroup1 = new ThreadGroup("MyGroup1");ThreadGroup myGroup2 = new ThreadGroup(myGroup1, "MyGroup2");TimeUnit.MICROSECONDS.sleep(2);ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();ThreadGroup[] list = new ThreadGroup[mainGroup.activeGroupCount()];int recurseSize = mainGroup.enumerate(list);System.out.println("主線程組活躍子線程組數 = " + recurseSize); //2recurseSize = mainGroup.enumerate(list, false);System.out.println(recurseSize); //1} }注意事項:
Hook線程以及捕獲線程執行異常
Hook線程也被成為鉤子。Thread類中,處理運行時異常的API總共四個:
- setUncaughtExceptionHandler方法:為某個特定線程指定UncaughtExceptionHandler。
- setDefaultUncaughtExceptionHandler方法:設置全局的UncaughtExceptionHandler。
- getUncaughtExceptionHandler方法:獲取特定線程的UncaughtExceptionHandler。
- getDefaultUncaughtExceptionHandler方法:獲取全局的UncaughtExceptionHandler。
UncaughtExceptionHandler是一個FuncationalInterface,只有一個抽象方法,該回調接口會被Thread中的dispatchUncaughtException方法調用。
下面就是一個UncaughtExceptionHandler的栗子,設置的回調接口將獲得該異常信息并打印出來
package com.hust.zhang.hook;import java.util.concurrent.TimeUnit;public class CaptureThreadException {public static void main(String[] args) {Thread.setDefaultUncaughtExceptionHandler((t, e) -> {System.out.println(t.getName() + "occur exception");e.printStackTrace();});final Thread thread = new Thread(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}//這里出現unchecked異常System.out.println(1 / 0);}, "Test-thread");thread.start();} }Hook線程實戰:在開發中為了防止某個程序被重復啟動,在進程啟動的時候創建一個lock文件,進程收到中斷信息的時候會刪除這個lock文件。在mysql服務器、zookeeper、kafka等系統中都能看到lock文件的存在。下面模擬一個防止重復啟動的程序。
package com.hust.zhang.hook;import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; import java.util.Set; import java.util.concurrent.TimeUnit;public class PreventDuplicated {private final static String LOCK_PATH = "/Users/kaizhang/workspace/hust-zhang/locks";private final static String LOCK_FILE = ".lock";private final static String PERMISSIONS = "rw-------";public static void main(String[] args) throws IOException {//注入hook線程,在程序退出時刪除lock文件Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.out.println("The program received kill SIGNAL.");getLockFile().toFile().delete();}));//檢查是否存在.lock文件checkRunning();//模擬當前程序運行for (; ; ) {try {TimeUnit.MICROSECONDS.sleep(1);System.out.println("program is running.");} catch (InterruptedException e) {e.printStackTrace();}}}private static void checkRunning() throws IOException {Path path = getLockFile();if (path.toFile().exists()) throw new RuntimeException("The program already running.");Set<PosixFilePermission> perms = PosixFilePermissions.fromString(PERMISSIONS);Files.createFile(path, PosixFilePermissions.asFileAttribute(perms));}private static Path getLockFile() {return Paths.get(LOCK_PATH, LOCK_FILE);} }啟動后,會在目錄路徑下生成一個.lock文件(checkRunning方法),
kill pid或者kill -9?pid命令后,JVM收到中斷信息,并且啟動hook線程刪除.lock文件,這個大家可以下去自己實操一下。需要注意的是下面幾點:
線程池原理以及自定義線程池
說到線程池之前的一篇文章也寫到阿里禁止直接使用JUC(JDK的Java Utilities Concurrent)中的ExecutorService創建線程池。線程池用來異步執行線程任務,主要原理圖如下,
然后本章主要是結合前面基礎寫了一個比較簡單的ThreadPool,告訴我們是采用什么樣的思路去開發線程池,線程池也存在著很多問題,這里就不貼出來了,直接可以看ThreadPoolExecutor類實現線程池的源碼,下面放一個圖,可以看到ThreadPoolExecutor是考慮到了線程安全問題的,Worker內部類繼承自AQS實現,幾個線程池參數都使用了volatile關鍵字修飾確保線程可見性和禁止指令重排序。
?
?
總結
本書前面八章內容會讓大家對多線程有基本的認識,這是后面內容的基礎。小伙伴們加油!
總結
以上是生活随笔為你收集整理的《Java高并发编程详解:多线程与架构设计》笔记(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sql 约束语句
- 下一篇: Java回调函数实例