日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

5W字高质量java并发系列详解教程(上)-附PDF下载

發(fā)布時(shí)間:2024/2/28 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 5W字高质量java并发系列详解教程(上)-附PDF下载 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

  • 第一章 java.util.concurrent簡(jiǎn)介
    • 主要的組件
    • Executor
    • ExecutorService
    • ScheduledExecutorService
    • Future
    • CountDownLatch
    • CyclicBarrier
    • Semaphore
    • ThreadFactory
  • 第二章 java并發(fā)中的Synchronized關(guān)鍵詞
    • 為什么要同步
    • Synchronized關(guān)鍵詞
      • Synchronized Instance Methods
      • Synchronized Static Methods
      • Synchronized Blocks
  • 第三章 java中的Volatile關(guān)鍵字使用
    • 什么時(shí)候使用volatile
    • Happens-Before
  • 第四章 wait和sleep的區(qū)別
    • Wait和sleep的區(qū)別
    • 喚醒wait和sleep
  • 第五章 java中Future的使用
    • 創(chuàng)建Future
    • 從Future獲取結(jié)果
    • 取消Future
    • 多線程環(huán)境中運(yùn)行
  • 第六章 java并發(fā)中ExecutorService的使用
    • 創(chuàng)建ExecutorService
    • 為ExecutorService分配Tasks
    • 關(guān)閉ExecutorService
    • Future
    • ScheduledExecutorService
    • ExecutorService和 Fork/Join
  • 第七章 java中Runnable和Callable的區(qū)別
    • 運(yùn)行機(jī)制
    • 返回值的不同
    • Exception處理
  • 第八章 ThreadLocal的使用
    • 在Map中存儲(chǔ)用戶數(shù)據(jù)
    • 在ThreadLocal中存儲(chǔ)用戶數(shù)據(jù)
  • 第九章 java中線程的生命周期
    • java中Thread的狀態(tài)
    • NEW
    • Runnable
    • BLOCKED
    • WAITING
    • TIMED_WAITING
    • TERMINATED
  • 第十章 java中join的使用
  • 第十一章 怎么在java中關(guān)閉一個(gè)thread
  • 第十二章 java中的Atomic類
    • 問(wèn)題背景
    • Lock
    • 使用Atomic
  • 第十三章 java中interrupt,interrupted和isInterrupted的區(qū)別
    • isInterrupted
    • interrupted
    • interrupt
  • 總結(jié)

并發(fā)是java高級(jí)程序員必須要深入研究的話題,從Synchronized到Lock,JDK本身提供了很多優(yōu)秀的并發(fā)類和鎖控制器,靈活使用這些類,可以寫(xiě)出優(yōu)秀的并發(fā)程序,而這些類基本上都是在java.util.concurrent包中的,本文將會(huì)從具體的例子出發(fā),一步一步帶領(lǐng)大家進(jìn)入java高質(zhì)量并發(fā)的世界。

本文PDF下載鏈接concurrent-all-in-one.pdf

本文的例子可以參考https://github.com/ddean2009/learn-java-concurrency/

第一章 java.util.concurrent簡(jiǎn)介

java.util.concurrent包提供了很多有用的類,方便我們進(jìn)行并發(fā)程序的開(kāi)發(fā)。本文將會(huì)做一個(gè)總體的簡(jiǎn)單介紹。

主要的組件

java.util.concurrent包含了很多內(nèi)容, 本文將會(huì)挑選其中常用的一些類來(lái)進(jìn)行大概的說(shuō)明:

  • Executor
  • ExecutorService
  • ScheduledExecutorService
  • Future
  • CountDownLatch
  • CyclicBarrier
  • Semaphore
  • ThreadFactory

Executor

Executor是一個(gè)接口,它定義了一個(gè)execute方法,這個(gè)方法接收一個(gè)Runnable,并在其中調(diào)用Runnable的run方法。

我們看一個(gè)Executor的實(shí)現(xiàn):

public class Invoker implements Executor {@Overridepublic void execute(Runnable r) {r.run();} }

現(xiàn)在我們可以直接調(diào)用該類中的方法:

public void execute() {Executor executor = new Invoker();executor.execute( () -> {log.info("{}", Thread.currentThread().toString());});}

注意,Executor并不一定要求執(zhí)行的任務(wù)是異步的。

ExecutorService

如果我們真正的需要使用多線程的話,那么就需要用到ExecutorService了。

ExecutorService管理了一個(gè)內(nèi)存的隊(duì)列,并定時(shí)提交可用的線程。

我們首先定義一個(gè)Runnable類:

public class Task implements Runnable {@Overridepublic void run() {// task details} }

我們可以通過(guò)Executors來(lái)方便的創(chuàng)建ExecutorService:

ExecutorService executor = Executors.newFixedThreadPool(10);

上面創(chuàng)建了一個(gè)ThreadPool, 我們也可以創(chuàng)建單線程的ExecutorService:

ExecutorService executor =Executors.newSingleThreadExecutor();

我們這樣提交task:

public void execute() { executor.submit(new Task()); }

因?yàn)镋xecutorService維持了一個(gè)隊(duì)列,所以它不會(huì)自動(dòng)關(guān)閉, 我們需要調(diào)用executor.shutdown() 或者executor.shutdownNow()來(lái)關(guān)閉它。

如果想要判斷ExecutorService中的線程在收到shutdown請(qǐng)求后是否全部執(zhí)行完畢,可以調(diào)用如下的方法:

try {executor.awaitTermination( 5l, TimeUnit.SECONDS );} catch (InterruptedException e) {e.printStackTrace();}

ScheduledExecutorService

ScheduledExecutorService和ExecutorService很類似,但是它可以周期性的執(zhí)行任務(wù)。

我們這樣創(chuàng)建ScheduledExecutorService:

ScheduledExecutorService executorService= Executors.newSingleThreadScheduledExecutor();

executorService的schedule方法,可以傳入Runnable也可以傳入Callable:

Future<String> future = executorService.schedule(() -> {// ...return "Hello world";}, 1, TimeUnit.SECONDS);ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {// ...}, 1, TimeUnit.SECONDS);

還有兩個(gè)比較相近的方法:

scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )

兩者的區(qū)別是前者的period是以任務(wù)開(kāi)始時(shí)間來(lái)計(jì)算的,后者是以任務(wù)結(jié)束時(shí)間來(lái)計(jì)算。

Future

Future用來(lái)獲取異步執(zhí)行的結(jié)果。可以調(diào)用cancel(boolean mayInterruptIfRunning) 方法來(lái)取消線程的執(zhí)行。

我們看下怎么得到一個(gè)Future對(duì)象:

public void invoke() {ExecutorService executorService = Executors.newFixedThreadPool(10);Future<String> future = executorService.submit(() -> {// ...Thread.sleep(10000l);return "Hello world";}); }

我們看下怎么獲取Future的結(jié)果:

if (future.isDone() && !future.isCancelled()) {try {str = future.get();} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} }

future還可以接受一個(gè)時(shí)間參數(shù),超過(guò)指定的時(shí)間,將會(huì)報(bào)TimeoutException。

try {future.get(10, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) {e.printStackTrace(); }

CountDownLatch

CountDownLatch是一個(gè)并發(fā)中很有用的類,CountDownLatch會(huì)初始化一個(gè)counter,通過(guò)這個(gè)counter變量,來(lái)控制資源的訪問(wèn)。我們會(huì)在后面的文章詳細(xì)介紹。

CyclicBarrier

CyclicBarrier和CountDownLatch很類似。CyclicBarrier主要用于多個(gè)線程互相等待的情況,可以通過(guò)調(diào)用await() 方法等待,知道達(dá)到要等的數(shù)量。

public class Task implements Runnable {private CyclicBarrier barrier;public Task(CyclicBarrier barrier) {this.barrier = barrier;}@Overridepublic void run() {try {LOG.info(Thread.currentThread().getName() + " is waiting");barrier.await();LOG.info(Thread.currentThread().getName() + " is released");} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}} public void start() {CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {// ...LOG.info("All previous tasks are completed");});Thread t1 = new Thread(new Task(cyclicBarrier), "T1"); Thread t2 = new Thread(new Task(cyclicBarrier), "T2"); Thread t3 = new Thread(new Task(cyclicBarrier), "T3"); if (!cyclicBarrier.isBroken()) { t1.start(); t2.start(); t3.start(); } }

Semaphore

Semaphore包含了一定數(shù)量的許可證,通過(guò)獲取許可證,從而獲得對(duì)資源的訪問(wèn)權(quán)限。通過(guò) tryAcquire()來(lái)獲取許可,如果獲取成功,許可證的數(shù)量將會(huì)減少。

一旦線程release()許可,許可的數(shù)量將會(huì)增加。

我們看下怎么使用:

static Semaphore semaphore = new Semaphore(10);public void execute() throws InterruptedException {LOG.info("Available permit : " + semaphore.availablePermits());LOG.info("Number of threads waiting to acquire: " + semaphore.getQueueLength());if (semaphore.tryAcquire()) {try {// ...}finally {semaphore.release();}}}

ThreadFactory

ThreadFactory可以很方便的用來(lái)創(chuàng)建線程:

public class ThreadFactoryUsage implements ThreadFactory {private int threadId;private String name;public ThreadFactoryUsage(String name) {threadId = 1;this.name = name;}@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, name + "-Thread_" + threadId);log.info("created new thread with id : " + threadId +" and name : " + t.getName());threadId++;return t;} }

第二章 java并發(fā)中的Synchronized關(guān)鍵詞

如果在多線程的環(huán)境中,我們經(jīng)常會(huì)遇到資源競(jìng)爭(zhēng)的情況,比如多個(gè)線程要去同時(shí)修改同一個(gè)共享變量,這時(shí)候,就需要對(duì)資源的訪問(wèn)方法進(jìn)行一定的處理,保證同一時(shí)間只有一個(gè)線程訪問(wèn)。

java提供了synchronized關(guān)鍵字,方便我們實(shí)現(xiàn)上述操作。

為什么要同步

我們舉個(gè)例子,我們創(chuàng)建一個(gè)類,提供了一個(gè)setSum的方法:

public class SynchronizedMethods {private int sum = 0;public void calculate() {setSum(getSum() + 1);} }

如果我們?cè)诙嗑€程的環(huán)境中調(diào)用這個(gè)calculate方法:

@Testpublic void givenMultiThread_whenNonSyncMethod() throws InterruptedException {ExecutorService service = Executors.newFixedThreadPool(3);SynchronizedMethods summation = new SynchronizedMethods();IntStream.range(0, 1000).forEach(count -> service.submit(summation::calculate));service.shutdown();service.awaitTermination(1000, TimeUnit.MILLISECONDS);assertEquals(1000, summation.getSum());}

按照上面的方法,我們預(yù)計(jì)要返回1000, 但是實(shí)際上基本不可能得到1000這個(gè)值,因?yàn)樵诙嗑€程環(huán)境中,對(duì)同一個(gè)資源進(jìn)行同時(shí)操作帶來(lái)的不利影響。

那我們?cè)趺床拍軌蚪ň€程安全的環(huán)境呢?

Synchronized關(guān)鍵詞

java提供了多種線程安全的方法,本文主要講解Synchronized關(guān)鍵詞,Synchronized關(guān)鍵詞可以有很多種形式:

  • Instance methods
  • Static methods
  • Code blocks

當(dāng)我們使用synchronized時(shí),java會(huì)在相應(yīng)的對(duì)象上加鎖,從而在同一個(gè)對(duì)象等待鎖的方法都必須順序執(zhí)行,從而保證了線程的安全。

Synchronized Instance Methods

Synchronized關(guān)鍵詞可以放在實(shí)例方法的前面:

public synchronized void synchronisedCalculate() {setSum(getSum() + 1);}

看下調(diào)用結(jié)果:

@Test public void givenMultiThread_whenMethodSync() {ExecutorService service = Executors.newFixedThreadPool(3);SynchronizedMethods method = new SynchronizedMethods();IntStream.range(0, 1000).forEach(count -> service.submit(method::synchronisedCalculate));service.awaitTermination(1000, TimeUnit.MILLISECONDS);assertEquals(1000, method.getSum()); }

這里synchronized將會(huì)鎖住該方法的實(shí)例對(duì)象,多個(gè)線程中只有獲得該實(shí)例對(duì)象鎖的線程才能夠執(zhí)行。

Synchronized Static Methods

Synchronized關(guān)鍵詞也可以用在static方法前面:

public static synchronized void syncStaticCalculate() {staticSum = staticSum + 1;}

Synchronized放在static方法前面和實(shí)例方法前面鎖住的對(duì)象不同。放在static方法前面鎖住的對(duì)象是這個(gè)Class本身,因?yàn)橐粋€(gè)Class在JVM中只會(huì)存在一個(gè),所以不管有多少該Class的實(shí)例,在同一時(shí)刻只會(huì)有一個(gè)線程可以執(zhí)行該放方法。

@Testpublic void givenMultiThread_whenStaticSyncMethod() throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();IntStream.range(0, 1000).forEach(count ->service.submit(SynchronizedMethods::syncStaticCalculate));service.shutdown();service.awaitTermination(100, TimeUnit.MILLISECONDS);assertEquals(1000, SynchronizedMethods.staticSum);}

Synchronized Blocks

有時(shí)候,我們可能不需要Synchronize整個(gè)方法,而是同步其中的一部分,這時(shí)候,我們可以使用Synchronized Blocks:

public void performSynchronizedTask() {synchronized (this) {setSum(getSum() + 1);}}

我們看下怎么測(cè)試:

@Testpublic void givenMultiThread_whenBlockSync() throws InterruptedException {ExecutorService service = Executors.newFixedThreadPool(3);SynchronizedMethods synchronizedBlocks = new SynchronizedMethods();IntStream.range(0, 1000).forEach(count ->service.submit(synchronizedBlocks::performSynchronizedTask));service.shutdown();service.awaitTermination(100, TimeUnit.MILLISECONDS);assertEquals(1000, synchronizedBlocks.getSum());}

上面我們同步的是實(shí)例,如果在靜態(tài)方法中,我們也可以同步class:

public static void performStaticSyncTask(){synchronized (SynchronizedMethods.class) {staticSum = staticSum + 1;}}

我們看下怎么測(cè)試:

@Testpublic void givenMultiThread_whenStaticSyncBlock() throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();IntStream.range(0, 1000).forEach(count ->service.submit(SynchronizedMethods::performStaticSyncTask));service.shutdown();service.awaitTermination(100, TimeUnit.MILLISECONDS);assertEquals(1000, SynchronizedMethods.staticSum);}

第三章 java中的Volatile關(guān)鍵字使用

在本文中,我們會(huì)介紹java中的一個(gè)關(guān)鍵字volatile。 volatile的中文意思是易揮發(fā)的,不穩(wěn)定的。那么在java中使用是什么意思呢?

我們知道,在java中,每個(gè)線程都會(huì)有個(gè)自己的內(nèi)存空間,我們稱之為working memory。這個(gè)空間會(huì)緩存一些變量的信息,從而提升程序的性能。當(dāng)執(zhí)行完某個(gè)操作之后,thread會(huì)將更新后的變量更新到主緩存中,以供其他線程讀寫(xiě)。

因?yàn)樽兞看嬖趙orking memory和main memory兩個(gè)地方,那么就有可能出現(xiàn)不一致的情況。 那么我們就可以使用Volatile關(guān)鍵字來(lái)強(qiáng)制將變量直接寫(xiě)到main memory,從而保證了不同線程讀寫(xiě)到的是同一個(gè)變量。

什么時(shí)候使用volatile

那么我們什么時(shí)候使用volatile呢?當(dāng)一個(gè)線程需要立刻讀取到另外一個(gè)線程修改的變量值的時(shí)候,我們就可以使用volatile。我們來(lái)舉個(gè)例子:

public class VolatileWithoutUsage {private int count = 0;public void incrementCount() {count++;}public int getCount() {return count;} }

這個(gè)類定義了一個(gè)incrementCount()方法,會(huì)去更新count值,我們接下來(lái)在多線程環(huán)境中去測(cè)試這個(gè)方法:

@Testpublic void testWithoutVolatile() throws InterruptedException {ExecutorService service= Executors.newFixedThreadPool(3);VolatileWithoutUsage volatileWithoutUsage=new VolatileWithoutUsage();IntStream.range(0,1000).forEach(count ->service.submit(volatileWithoutUsage::incrementCount) );service.shutdown();service.awaitTermination(1000, TimeUnit.MILLISECONDS);assertEquals(1000,volatileWithoutUsage.getCount() );}

運(yùn)行一下,我們會(huì)發(fā)現(xiàn)結(jié)果是不等于1000的。

java.lang.AssertionError: Expected :1000 Actual :999

這是因?yàn)槎嗑€程去更新同一個(gè)變量,我們?cè)谏掀恼乱蔡岬搅?#xff0c;這種情況可以通過(guò)加Synchronized關(guān)鍵字來(lái)解決。

那么是不是我們加上Volatile關(guān)鍵字后就可以解決這個(gè)問(wèn)題了呢?

public class VolatileFalseUsage {private volatile int count = 0;public void incrementCount() {count++;}public int getCount() {return count;}}

上面的類中,我們加上了關(guān)鍵字Volatile,我們?cè)贉y(cè)試一下:

@Testpublic void testWithVolatileFalseUsage() throws InterruptedException {ExecutorService service= Executors.newFixedThreadPool(3);VolatileFalseUsage volatileFalseUsage=new VolatileFalseUsage();IntStream.range(0,1000).forEach(count ->service.submit(volatileFalseUsage::incrementCount) );service.shutdown();service.awaitTermination(5000, TimeUnit.MILLISECONDS);assertEquals(1000,volatileFalseUsage.getCount() );}

運(yùn)行一下,我們會(huì)發(fā)現(xiàn)結(jié)果還是錯(cuò)誤的:

java.lang.AssertionError: Expected :1000 Actual :992 ~~為什么呢? 我們先來(lái)看下count++的操作,count++可以分解為三步操作,1. 讀取count的值,2.給count加1, 3.將count寫(xiě)回內(nèi)存。添加Volatile關(guān)鍵詞只能夠保證count的變化立馬可見(jiàn),而不能保證1,2,3這三個(gè)步驟的總體原子性。 要實(shí)現(xiàn)總體的原子性還是需要用到類似Synchronized的關(guān)鍵字。下面看下正確的用法:~~~java public class VolatileTrueUsage {private volatile int count = 0;public void setCount(int number) {count=number;}public int getCount() {return count;} } @Testpublic void testWithVolatileTrueUsage() throws InterruptedException {VolatileTrueUsage volatileTrueUsage=new VolatileTrueUsage();Thread threadA = new Thread(()->volatileTrueUsage.setCount(10));threadA.start();Thread.sleep(100);Thread reader = new Thread(() -> {int valueReadByThread = volatileTrueUsage.getCount();assertEquals(10, valueReadByThread);});reader.start();}

Happens-Before

從java5之后,volatile提供了一個(gè)Happens-Before的功能。Happens-Before 是指當(dāng)volatile進(jìn)行寫(xiě)回主內(nèi)存的操作時(shí),會(huì)將之前的非volatile的操作一并寫(xiě)回主內(nèi)存。

public class VolatileHappenBeforeUsage {int a = 0;volatile boolean flag = false;public void writer() {a = 1; // 1 線程A修改共享變量flag = true; // 2 線程A寫(xiě)volatile變量} }

上面的例子中,a是一個(gè)非volatile變量,flag是一個(gè)volatile變量,但是由于happens-before的特性,a 將會(huì)表現(xiàn)的和volatile一樣。

第四章 wait和sleep的區(qū)別

在本篇文章中,我們將會(huì)討論一下java中wait()和sleep()方法的區(qū)別。并討論一下怎么使用這兩個(gè)方法。

Wait和sleep的區(qū)別

wait() 是Object中定義的native方法:

public final native void wait(long timeout) throws InterruptedException;

所以每一個(gè)類的實(shí)例都可以調(diào)用這個(gè)方法。wait()只能在synchronized block中調(diào)用。它會(huì)釋放synchronized時(shí)加在object上的鎖。

sleep()是定義Thread中的native靜態(tài)類方法:

public static native void sleep(long millis) throws InterruptedException;

所以Thread.sleep()可以在任何情況下調(diào)用。Thread.sleep()將會(huì)暫停當(dāng)前線程,并且不會(huì)釋放任何鎖資源。

我們先看一下一個(gè)簡(jiǎn)單的wait使用:

@Slf4j public class WaitUsage {private static Object LOCK = new Object();public static void WaitExample() throws InterruptedException {synchronized (LOCK) {LOCK.wait(1000);log.info("Object '" + LOCK + "' is woken after" +" waiting for 1 second");}} }

再看一下sleep的使用:

@Slf4j public class SleepUsage {public static void sleepExample() throws InterruptedException {Thread.sleep(1000);log.info("Thread '" + Thread.currentThread().getName() +"' is woken after sleeping for 1 second");} }

喚醒wait和sleep

sleep()方法自帶sleep時(shí)間,時(shí)間過(guò)后,Thread會(huì)自動(dòng)被喚醒。
或者可以通過(guò)調(diào)用interrupt()方法來(lái)中斷。

相比而言wait的喚醒會(huì)比較復(fù)雜,我們需要調(diào)用notify() 和 notifyAll()方法來(lái)喚醒等待在特定wait object上的線程。

notify()會(huì)根據(jù)線程調(diào)度的機(jī)制選擇一個(gè)線程來(lái)喚醒,而notifyAll()會(huì)喚醒所有等待的線程,由這些線程重新?tīng)?zhēng)奪資源鎖。

wait,notity通常用在生產(chǎn)者和消費(fèi)者情形,我們看下怎么使用:

@Slf4j public class WaitNotifyUsage {private int count =0;public void produceMessage() throws InterruptedException {while(true) {synchronized (this) {while (count == 5) {log.info("count == 5 , wait ....");wait();}count++;log.info("produce count {}", count);notify();}}}public void consumeMessage() throws InterruptedException {while (true) {synchronized (this) {while (count == 0) {log.info("count == 0, wait ...");wait();}log.info("consume count {}", count);count--;notify();}}} }

看下怎么調(diào)用:

@Testpublic void testWaitNotifyUsage() throws InterruptedException{WaitNotifyUsage waitNotifyUsage=new WaitNotifyUsage();ExecutorService executorService=Executors.newFixedThreadPool(4);executorService.submit(()-> {try {waitNotifyUsage.produceMessage();} catch (InterruptedException e) {e.printStackTrace();}});executorService.submit(()-> {try {waitNotifyUsage.consumeMessage();} catch (InterruptedException e) {e.printStackTrace();}});Thread.sleep(50000);}

第五章 java中Future的使用

Future是java 1.5引入的一個(gè)interface,可以方便的用于異步結(jié)果的獲取。 本文將會(huì)通過(guò)具體的例子講解如何使用Future。

創(chuàng)建Future

正如上面所說(shuō),Future代表的是異步執(zhí)行的結(jié)果,意思是當(dāng)異步執(zhí)行結(jié)束之后,返回的結(jié)果將會(huì)保存在Future中。

那么我們什么時(shí)候會(huì)用到Future呢? 一般來(lái)說(shuō),當(dāng)我們執(zhí)行一個(gè)長(zhǎng)時(shí)間運(yùn)行的任務(wù)時(shí),使用Future就可以讓我們暫時(shí)去處理其他的任務(wù),等長(zhǎng)任務(wù)執(zhí)行完畢再返回其結(jié)果。

經(jīng)常會(huì)使用到Future的場(chǎng)景有:1. 計(jì)算密集場(chǎng)景。2. 處理大數(shù)據(jù)量。3. 遠(yuǎn)程方法調(diào)用等。

接下來(lái)我們將會(huì)使用ExecutorService來(lái)創(chuàng)建一個(gè)Future。

<T> Future<T> submit(Callable<T> task);

上面是ExecutorService中定義的一個(gè)submit方法,它接收一個(gè)Callable參數(shù),并返回一個(gè)Future。

我們用一個(gè)線程來(lái)計(jì)算一個(gè)平方運(yùn)算:

private ExecutorService executor= Executors.newSingleThreadExecutor();public Future<Integer> calculate(Integer input) {return executor.submit(() -> {System.out.println("Calculating..."+ input);Thread.sleep(1000);return input * input;});}

submit需要接受一個(gè)Callable參數(shù),Callable需要實(shí)現(xiàn)一個(gè)call方法,并返回結(jié)果。這里我們使用lamaba表達(dá)式來(lái)簡(jiǎn)化這一個(gè)流程。

從Future獲取結(jié)果

上面我們創(chuàng)建好了Future,接下來(lái)我們看一下怎么獲取到Future的值。

FutureUsage futureUsage=new FutureUsage();Future<Integer> futureOne = futureUsage.calculate(20);while(!futureOne.isDone()) {System.out.println("Calculating...");Thread.sleep(300);}Integer result = futureOne.get();

首先我們通過(guò)Future.isDone() 來(lái)判斷這個(gè)異步操作是否執(zhí)行完畢,如果完畢我們就可以直接調(diào)用futureOne.get()來(lái)獲得Futre的結(jié)果。

這里futureOne.get()是一個(gè)阻塞操作,會(huì)一直等待異步執(zhí)行完畢才返回結(jié)果。

如果我們不想等待,future提供了一個(gè)帶時(shí)間的方法:

Integer result = futureOne.get(500, TimeUnit.MILLISECONDS);

如果在等待時(shí)間結(jié)束的時(shí)候,Future還有返回,則會(huì)拋出一個(gè)TimeoutException。

取消Future

如果我們提交了一個(gè)異步程序,但是想取消它, 則可以這樣:

uture<Integer> futureTwo = futureUsage.calculate(4);boolean canceled = futureTwo.cancel(true);

Future.cancel(boolean) 傳入一個(gè)boolean參數(shù),來(lái)選擇是否中斷正在運(yùn)行的task。

如果我們cancel之后,再次調(diào)用get()方法,則會(huì)拋出CancellationException。

多線程環(huán)境中運(yùn)行

如果有兩個(gè)計(jì)算任務(wù),先看下在單線程下運(yùn)行的結(jié)果。

Future<Integer> future1 = futureUsage.calculate(10);Future<Integer> future2 = futureUsage.calculate(100);while (!(future1.isDone() && future2.isDone())) {System.out.println(String.format("future1 is %s and future2 is %s",future1.isDone() ? "done" : "not done",future2.isDone() ? "done" : "not done"));Thread.sleep(300);}Integer result1 = future1.get();Integer result2 = future2.get();System.out.println(result1 + " and " + result2);

因?yàn)槲覀兺ㄟ^(guò)Executors.newSingleThreadExecutor()來(lái)創(chuàng)建的單線程池。所以運(yùn)行結(jié)果如下:

Calculating...10 future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done Calculating...100 future1 is done and future2 is not done future1 is done and future2 is not done future1 is done and future2 is not done 100 and 10000

如果我們使用Executors.newFixedThreadPool(2)來(lái)創(chuàng)建一個(gè)多線程池,則可以得到如下的結(jié)果:

calculating...10 calculating...100 future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done 100 and 10000

第六章 java并發(fā)中ExecutorService的使用

ExecutorService是java中的一個(gè)異步執(zhí)行的框架,通過(guò)使用ExecutorService可以方便的創(chuàng)建多線程執(zhí)行環(huán)境。

本文將會(huì)詳細(xì)的講解ExecutorService的具體使用。

創(chuàng)建ExecutorService

通常來(lái)說(shuō)有兩種方法來(lái)創(chuàng)建ExecutorService。

第一種方式是使用Executors中的工廠類方法,例如:

ExecutorService executor = Executors.newFixedThreadPool(10);

除了newFixedThreadPool方法之外,Executors還包含了很多創(chuàng)建ExecutorService的方法。

第二種方法是直接創(chuàng)建一個(gè)ExecutorService, 因?yàn)镋xecutorService是一個(gè)interface,我們需要實(shí)例化ExecutorService的一個(gè)實(shí)現(xiàn)。

這里我們使用ThreadPoolExecutor來(lái)舉例:

ExecutorService executorService =new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());

為ExecutorService分配Tasks

ExecutorService可以執(zhí)行Runnable和Callable的task。其中Runnable是沒(méi)有返回值的,而Callable是有返回值的。我們分別看一下兩種情況的使用:

Runnable runnableTask = () -> {try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace();} };Callable<String> callableTask = () -> {TimeUnit.MILLISECONDS.sleep(300);return "Task's execution"; };

將task分配給ExecutorService,可以通過(guò)調(diào)用xecute(), submit(), invokeAny(), invokeAll()這幾個(gè)方法來(lái)實(shí)現(xiàn)。

execute() 返回值是void,他用來(lái)提交一個(gè)Runnable task。

executorService.execute(runnableTask);

submit() 返回值是Future,它可以提交Runnable task, 也可以提交Callable task。 提交Runnable的有兩個(gè)方法:

<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);

第一個(gè)方法在返回傳入的result。第二個(gè)方法返回null。

再看一下callable的使用:

Future<String> future = executorService.submit(callableTask);

invokeAny() 將一個(gè)task列表傳遞給executorService,并返回其中的一個(gè)成功返回的結(jié)果。

String result = executorService.invokeAny(callableTasks);

invokeAll() 將一個(gè)task列表傳遞給executorService,并返回所有成功執(zhí)行的結(jié)果:

List<Future<String>> futures = executorService.invokeAll(callableTasks);

關(guān)閉ExecutorService

如果ExecutorService中的任務(wù)運(yùn)行完畢之后,ExecutorService不會(huì)自動(dòng)關(guān)閉。它會(huì)等待接收新的任務(wù)。如果需要關(guān)閉ExecutorService, 我們需要調(diào)用shutdown() 或者 shutdownNow() 方法。

shutdown() 會(huì)立即銷毀ExecutorService,它會(huì)讓ExecutorServic停止接收新的任務(wù),并等待現(xiàn)有任務(wù)全部執(zhí)行完畢再銷毀。

executorService.shutdown();

shutdownNow()并不保證所有的任務(wù)都被執(zhí)行完畢,它會(huì)返回一個(gè)未執(zhí)行任務(wù)的列表:

List<Runnable> notExecutedTasks = executorService.shutdownNow();

oracle推薦的最佳關(guān)閉方法是和awaitTermination一起使用:

executorService.shutdown();try {if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();}

先停止接收任務(wù),然后再等待一定的時(shí)間讓所有的任務(wù)都執(zhí)行完畢,如果超過(guò)了給定的時(shí)間,則立刻結(jié)束任務(wù)。

Future

submit() 和 invokeAll() 都會(huì)返回Future對(duì)象。之前的文章我們已經(jīng)詳細(xì)講過(guò)了Future。 這里就只列舉一下怎么使用:

Future<String> future = executorService.submit(callableTask); String result = null; try {result = future.get(); } catch (InterruptedException | ExecutionException e) {e.printStackTrace(); }

ScheduledExecutorService

ScheduledExecutorService為我們提供了定時(shí)執(zhí)行任務(wù)的機(jī)制。

我們這樣創(chuàng)建ScheduledExecutorService:

ScheduledExecutorService executorService= Executors.newSingleThreadScheduledExecutor();

executorService的schedule方法,可以傳入Runnable也可以傳入Callable:

Future<String> future = executorService.schedule(() -> {// ...return "Hello world";}, 1, TimeUnit.SECONDS);ScheduledFuture<?> scheduledFuture = executorService.schedule(() -> {// ...}, 1, TimeUnit.SECONDS);

還有兩個(gè)比較相近的方法:

scheduleAtFixedRate( Runnable command, long initialDelay, long period, TimeUnit unit )scheduleWithFixedDelay( Runnable command, long initialDelay, long delay, TimeUnit unit )

兩者的區(qū)別是前者的period是以任務(wù)開(kāi)始時(shí)間來(lái)計(jì)算的,后者是以任務(wù)結(jié)束時(shí)間來(lái)計(jì)算。

ExecutorService和 Fork/Join

java 7 引入了Fork/Join框架。 那么兩者的區(qū)別是什么呢?

ExecutorService可以由用戶來(lái)自己控制生成的線程,提供了對(duì)線程更加細(xì)粒度的控制。而Fork/Join則是為了讓任務(wù)更加快速的執(zhí)行完畢。

第七章 java中Runnable和Callable的區(qū)別

在java的多線程開(kāi)發(fā)中Runnable一直以來(lái)都是多線程的核心,而Callable是java1.5添加進(jìn)來(lái)的一個(gè)增強(qiáng)版本。

本文我們會(huì)詳細(xì)探討Runnable和Callable的區(qū)別。

運(yùn)行機(jī)制

首先看下Runnable和Callable的接口定義:

@FunctionalInterface public interface Runnable {/*** When an object implementing interface <code>Runnable</code> is used* to create a thread, starting the thread causes the object's* <code>run</code> method to be called in that separately executing* thread.* <p>* The general contract of the method <code>run</code> is that it may* take any action whatsoever.** @see java.lang.Thread#run()*/public abstract void run(); } @FunctionalInterface public interface Callable<V> {/*** Computes a result, or throws an exception if unable to do so.** @return computed result* @throws Exception if unable to compute a result*/V call() throws Exception; }

Runnable需要實(shí)現(xiàn)run()方法,Callable需要實(shí)現(xiàn)call()方法。

我們都知道要自定義一個(gè)Thread有兩種方法,一是繼承Thread,而是實(shí)現(xiàn)Runnable接口,這是因?yàn)門(mén)hread本身就是一個(gè)Runnable的實(shí)現(xiàn):

class Thread implements Runnable {/* Make sure registerNatives is the first thing <clinit> does. */private static native void registerNatives();static {registerNatives();}...

所以Runnable可以通過(guò)Runnable和之前我們介紹的ExecutorService 來(lái)執(zhí)行,而Callable則只能通過(guò)ExecutorService 來(lái)執(zhí)行。

返回值的不同

根據(jù)上面兩個(gè)接口的定義,Runnable是不返還值的,而Callable可以返回值。

如果我們都通過(guò)ExecutorService來(lái)提交,看看有什么不同:

  • 使用runnable
public void executeTask() {ExecutorService executorService = Executors.newSingleThreadExecutor();Future future = executorService.submit(()->log.info("in runnable!!!!"));executorService.shutdown();}
  • 使用callable
public void executeTask() {ExecutorService executorService = Executors.newSingleThreadExecutor();Future future = executorService.submit(()->{log.info("in callable!!!!");return "callable";});executorService.shutdown();}

雖然我們都返回了Future,但是runnable的情況下Future將不包含任何值。

Exception處理

Runnable的run()方法定義沒(méi)有拋出任何異常,所以任何的Checked Exception都需要在run()實(shí)現(xiàn)方法中自行處理。

Callable的Call()方法拋出了throws Exception,所以可以在call()方法的外部,捕捉到Checked Exception。我們看下Callable中異常的處理。

public void executeTaskWithException(){ExecutorService executorService = Executors.newSingleThreadExecutor();Future future = executorService.submit(()->{log.info("in callable!!!!");throw new CustomerException("a customer Exception");});try {Object object= future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();e.getCause();}executorService.shutdown();}

上面的例子中,我們?cè)贑allable中拋出了一個(gè)自定義的CustomerException。

這個(gè)異常會(huì)被包含在返回的Future中。當(dāng)我們調(diào)用future.get()方法時(shí),就會(huì)拋出ExecutionException,通過(guò)e.getCause(),就可以獲取到包含在里面的具體異常信息。

第八章 ThreadLocal的使用

ThreadLocal主要用來(lái)為當(dāng)前線程存儲(chǔ)數(shù)據(jù),這個(gè)數(shù)據(jù)只有當(dāng)前線程可以訪問(wèn)。

在定義ThreadLocal的時(shí)候,我們可以同時(shí)定義存儲(chǔ)在ThreadLocal中的特定類型的對(duì)象。

ThreadLocal<Integer> threadLocalValue = new ThreadLocal<>();

上面我們定義了一個(gè)存儲(chǔ)Integer的ThreadLocal對(duì)象。

要存儲(chǔ)和獲取ThreadLocal中的對(duì)象也非常簡(jiǎn)單,使用get()和set()即可:

threadLocalValue.set(1); Integer result = threadLocalValue.get();

我可以將ThreadLocal看成是一個(gè)map,而當(dāng)前的線程就是map中的key。

除了new一個(gè)ThreadLocal對(duì)象,我們還可以通過(guò):

public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {return new SuppliedThreadLocal<>(supplier);}

ThreadLocal提供的靜態(tài)方法withInitial來(lái)初始化一個(gè)ThreadLocal。

ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 1);

withInitial需要一個(gè)Supplier對(duì)象,通過(guò)調(diào)用Supplier的get()方法獲取到初始值。

要想刪除ThreadLocal中的存儲(chǔ)數(shù)據(jù),可以調(diào)用:

threadLocal.remove();

下面我通過(guò)兩個(gè)例子的對(duì)比,來(lái)看一下使用ThreadLocal的好處。

在實(shí)際的應(yīng)用中,我們通常會(huì)需要為不同的用戶請(qǐng)求存儲(chǔ)不同的用戶信息,一般來(lái)說(shuō)我們需要構(gòu)建一個(gè)全局的Map,來(lái)根據(jù)不同的用戶ID,來(lái)存儲(chǔ)不同的用戶信息,方便在后面獲取。

在Map中存儲(chǔ)用戶數(shù)據(jù)

我們先看下如果使用全局的Map該怎么用:

public class SharedMapWithUserContext implements Runnable {public static Map<Integer, Context> userContextPerUserId= new ConcurrentHashMap<>();private Integer userId;private UserRepository userRepository = new UserRepository();public SharedMapWithUserContext(int i) {this.userId=i;}@Overridepublic void run() {String userName = userRepository.getUserNameForUserId(userId);userContextPerUserId.put(userId, new Context(userName));} }

這里我們定義了一個(gè)static的Map來(lái)存取用戶信息。

再看一下怎么使用:

@Testpublic void testWithMap(){SharedMapWithUserContext firstUser = new SharedMapWithUserContext(1);SharedMapWithUserContext secondUser = new SharedMapWithUserContext(2);new Thread(firstUser).start();new Thread(secondUser).start();assertEquals(SharedMapWithUserContext.userContextPerUserId.size(), 2);}

在ThreadLocal中存儲(chǔ)用戶數(shù)據(jù)

如果我們要在ThreadLocal中使用可以這樣:

public class ThreadLocalWithUserContext implements Runnable {private static ThreadLocal<Context> userContext= new ThreadLocal<>();private Integer userId;private UserRepository userRepository = new UserRepository();public ThreadLocalWithUserContext(int i) {this.userId=i;}@Overridepublic void run() {String userName = userRepository.getUserNameForUserId(userId);userContext.set(new Context(userName));System.out.println("thread context for given userId: "+ userId + " is: " + userContext.get());}}

測(cè)試代碼如下:

public class ThreadLocalWithUserContextTest {@Testpublic void testWithThreadLocal(){ThreadLocalWithUserContext firstUser= new ThreadLocalWithUserContext(1);ThreadLocalWithUserContext secondUser= new ThreadLocalWithUserContext(2);new Thread(firstUser).start();new Thread(secondUser).start();} }

運(yùn)行之后,我們可以得到下面的結(jié)果:

thread context for given userId: 1 is: com.flydean.Context@411734d4 thread context for given userId: 2 is: com.flydean.Context@1e9b6cc

不同的用戶信息被存儲(chǔ)在不同的線程環(huán)境中。

注意,我們使用ThreadLocal的時(shí)候,一定是我們可以自由的控制所創(chuàng)建的線程。如果在ExecutorService環(huán)境下,就最好不要使用ThreadLocal,因?yàn)樵贓xecutorService中,線程是不可控的。

第九章 java中線程的生命周期

線程是java中繞不過(guò)去的一個(gè)話題, 今天本文將會(huì)詳細(xì)講解java中線程的生命周期,希望可以給大家一些啟發(fā)。

java中Thread的狀態(tài)

java中Thread有6種狀態(tài),分別是:

  • NEW - 新創(chuàng)建的Thread,還沒(méi)有開(kāi)始執(zhí)行
  • RUNNABLE - 可運(yùn)行狀態(tài)的Thread,包括準(zhǔn)備運(yùn)行和正在運(yùn)行的。
  • BLOCKED - 正在等待資源鎖的線程
  • WAITING - 正在無(wú)限期等待其他線程來(lái)執(zhí)行某個(gè)特定操作
  • TIMED_WAITING - 在一定的時(shí)間內(nèi)等待其他線程來(lái)執(zhí)行某個(gè)特定操作
  • TERMINATED - 線程執(zhí)行完畢
  • 我們可以用一個(gè)圖來(lái)直觀的表示:

    JDK代碼中的定義如下:

    public enum State {/*** Thread state for a thread which has not yet started.*/NEW,/*** Thread state for a runnable thread. A thread in the runnable* state is executing in the Java virtual machine but it may* be waiting for other resources from the operating system* such as processor.*/RUNNABLE,/*** Thread state for a thread blocked waiting for a monitor lock.* A thread in the blocked state is waiting for a monitor lock* to enter a synchronized block/method or* reenter a synchronized block/method after calling* {@link Object#wait() Object.wait}.*/BLOCKED,/*** Thread state for a waiting thread.* A thread is in the waiting state due to calling one of the* following methods:* <ul>* <li>{@link Object#wait() Object.wait} with no timeout</li>* <li>{@link #join() Thread.join} with no timeout</li>* <li>{@link LockSupport#park() LockSupport.park}</li>* </ul>** <p>A thread in the waiting state is waiting for another thread to* perform a particular action.** For example, a thread that has called <tt>Object.wait()</tt>* on an object is waiting for another thread to call* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on* that object. A thread that has called <tt>Thread.join()</tt>* is waiting for a specified thread to terminate.*/WAITING,/*** Thread state for a waiting thread with a specified waiting time.* A thread is in the timed waiting state due to calling one of* the following methods with a specified positive waiting time:* <ul>* <li>{@link #sleep Thread.sleep}</li>* <li>{@link Object#wait(long) Object.wait} with timeout</li>* <li>{@link #join(long) Thread.join} with timeout</li>* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>* </ul>*/TIMED_WAITING,/*** Thread state for a terminated thread.* The thread has completed execution.*/TERMINATED;}

    NEW

    NEW 表示線程創(chuàng)建了,但是還沒(méi)有開(kāi)始執(zhí)行。我們看一個(gè)NEW的例子:

    public class NewThread implements Runnable{public static void main(String[] args) {Runnable runnable = new NewThread();Thread t = new Thread(runnable);log.info(t.getState().toString());}@Overridepublic void run() {} }

    上面的代碼將會(huì)輸出:

    NEW

    Runnable

    Runnable表示線程正在可執(zhí)行狀態(tài)。包括正在運(yùn)行和準(zhǔn)備運(yùn)行兩種。

    為什么這兩種都叫做Runnable呢?我們知道在多任務(wù)環(huán)境中,CPU的個(gè)數(shù)是有限的,所以任務(wù)都是輪循占有CPU來(lái)處理的,JVM中的線程調(diào)度器會(huì)為每個(gè)線程分配特定的執(zhí)行時(shí)間,當(dāng)執(zhí)行時(shí)間結(jié)束后,線程調(diào)度器將會(huì)釋放CPU,以供其他的Runnable線程執(zhí)行。

    我們看一個(gè)Runnable的例子:

    public class RunnableThread implements Runnable {@Overridepublic void run() {}public static void main(String[] args) {Runnable runnable = new RunnableThread();Thread t = new Thread(runnable);t.start();log.info(t.getState().toString());} }

    上面的代碼將會(huì)輸出:

    RUNNABLE

    BLOCKED

    BLOCKED表示線程正在等待資源鎖,而目前該資源正在被其他線程占有。

    我們舉個(gè)例子:

    public class BlockThread implements Runnable {@Overridepublic void run() {loopResource();}public static synchronized void loopResource() {while(true) {//無(wú)限循環(huán)}}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new BlockThread());Thread t2 = new Thread(new BlockThread());t1.start();t2.start();Thread.sleep(1000);log.info(t1.getState().toString());log.info(t2.getState().toString());System.exit(0);} }

    上面的例子中,由于t1是無(wú)限循環(huán),將會(huì)一直占有資源鎖,導(dǎo)致t2無(wú)法獲取資源鎖,從而位于BLOCKED狀態(tài)。

    我們會(huì)得到如下結(jié)果:

    12:40:11.710 [main] INFO com.flydean.BlockThread - RUNNABLE 12:40:11.713 [main] INFO com.flydean.BlockThread - BLOCKED

    WAITING

    WAITING 狀態(tài)表示線程正在等待其他的線程執(zhí)行特定的操作。有三種方法可以導(dǎo)致線程處于WAITTING狀態(tài):

  • object.wait()
  • thread.join()
  • LockSupport.park()
  • 其中1,2方法不需要傳入時(shí)間參數(shù)。

    我們看下使用的例子:

    public class WaitThread implements Runnable{public static Thread t1;@Overridepublic void run() {Thread t2 = new Thread(()->{try {Thread.sleep(10000);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("Thread interrupted", e);}log.info("t1"+t1.getState().toString());});t2.start();try {t2.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("Thread interrupted", e);}log.info("t2"+t2.getState().toString());}public static void main(String[] args) {t1 = new Thread(new WaitThread());t1.start();} }

    在這個(gè)例子中,我們調(diào)用的t2.join(),這會(huì)使調(diào)用它的t1線程處于WAITTING狀態(tài)。

    我們看下輸出結(jié)果:

    12:44:12.958 [Thread-1] INFO com.flydean.WaitThread - t1 WAITING 12:44:12.964 [Thread-0] INFO com.flydean.WaitThread - t2 TERMINATED

    TIMED_WAITING

    TIMED_WAITING狀態(tài)表示在一個(gè)有限的時(shí)間內(nèi)等待其他線程執(zhí)行特定的某些操作。

    java中有5中方式來(lái)達(dá)到這種狀態(tài):

  • thread.sleep(long millis)
  • wait(int timeout) 或者 wait(int timeout, int nanos)
  • thread.join(long millis)
  • LockSupport.parkNanos
  • LockSupport.parkUntil
  • 我們舉個(gè)例子:

    public class TimedWaitThread implements Runnable{@Overridepublic void run() {try {Thread.sleep(5000);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("Thread interrupted", e);}}public static void main(String[] args) throws InterruptedException {TimedWaitThread obj1 = new TimedWaitThread();Thread t1 = new Thread(obj1);t1.start();// The following sleep will give enough time for ThreadScheduler// to start processing of thread t1Thread.sleep(1000);log.info(t1.getState().toString());} }

    上面的例子中我們調(diào)用了Thread.sleep(5000)來(lái)讓線程處于TIMED_WAITING狀態(tài)。

    看下輸出:

    12:58:02.706 [main] INFO com.flydean.TimedWaitThread - TIMED_WAITING

    那么問(wèn)題來(lái)了,TIMED_WAITING和WAITTING有什么區(qū)別呢?

    TIMED_WAITING如果在給定的時(shí)間內(nèi)沒(méi)有等到其他線程的特定操作,則會(huì)被喚醒,從而進(jìn)入爭(zhēng)奪資源鎖的隊(duì)列,如果能夠獲取到鎖,則會(huì)變成Runnable狀態(tài),如果獲取不到鎖,則會(huì)變成BLOCKED狀態(tài)。

    TERMINATED

    TERMINATED表示線程已經(jīng)執(zhí)行完畢。我們看下例子:

    public class TerminatedThread implements Runnable{@Overridepublic void run() {}public static void main(String[] args) throws InterruptedException {Thread t1 = new Thread(new TerminatedThread());t1.start();// The following sleep method will give enough time for// thread t1 to completeThread.sleep(1000);log.info(t1.getState().toString());} }

    輸出結(jié)果:

    13:02:38.868 [main] INFO com.flydean.TerminatedThread - TERMINATED

    第十章 java中join的使用

    join()應(yīng)該是我們?cè)趈ava中經(jīng)常會(huì)用到的一個(gè)方法,它主要是將當(dāng)前線程置為WAITTING狀態(tài),然后等待調(diào)用的線程執(zhí)行完畢或被interrupted。

    join()是Thread中定義的方法,我們看下他的定義:

    /*** Waits for this thread to die.** <p> An invocation of this method behaves in exactly the same* way as the invocation** <blockquote>* {@linkplain #join(long) join}{@code (0)}* </blockquote>** @throws InterruptedException* if any thread has interrupted the current thread. The* <i>interrupted status</i> of the current thread is* cleared when this exception is thrown.*/public final void join() throws InterruptedException {join(0);}

    我們看下join是怎么使用的,通常我們需要在線程A中調(diào)用線程B.join():

    public class JoinThread implements Runnable{public int processingCount = 0;JoinThread(int processingCount) {this.processingCount = processingCount;log.info("Thread Created");}@Overridepublic void run() {log.info("Thread " + Thread.currentThread().getName() + " started");while (processingCount > 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {log.info("Thread " + Thread.currentThread().getName() + " interrupted");}processingCount--;}log.info("Thread " + Thread.currentThread().getName() + " exiting");}@Testpublic void joinTest()throws InterruptedException {Thread t2 = new Thread(new JoinThread(1));t2.start();log.info("Invoking join");t2.join();log.info("Returned from join");log.info("t2 status {}",t2.isAlive());} }

    我們?cè)谥骶€程中調(diào)用了t2.join(),則主線程將會(huì)等待t2執(zhí)行完畢,我們看下輸出結(jié)果:

    06:17:14.775 [main] INFO com.flydean.JoinThread - Thread Created 06:17:14.779 [main] INFO com.flydean.JoinThread - Invoking join 06:17:14.779 [Thread-0] INFO com.flydean.JoinThread - Thread Thread-0 started 06:17:15.783 [Thread-0] INFO com.flydean.JoinThread - Thread Thread-0 exiting 06:17:15.783 [main] INFO com.flydean.JoinThread - Returned from join 06:17:15.783 [main] INFO com.flydean.JoinThread - t2 status false

    當(dāng)線程已經(jīng)執(zhí)行完畢或者還沒(méi)開(kāi)始執(zhí)行的時(shí)候,join()將會(huì)立即返回:

    Thread t1 = new SampleThread(0); t1.join(); //returns immediately

    join還有兩個(gè)帶時(shí)間參數(shù)的方法:

    public final void join(long millis) throws InterruptedException public final void join(long millis,int nanos) throws InterruptedException

    如果在給定的時(shí)間內(nèi)調(diào)用的線程沒(méi)有返回,則主線程將會(huì)繼續(xù)執(zhí)行:

    @Testpublic void testJoinTimeout()throws InterruptedException {Thread t3 = new Thread(new JoinThread(10));t3.start();t3.join(1000);log.info("t3 status {}", t3.isAlive());}

    上面的例子將會(huì)輸出:

    06:30:58.159 [main] INFO com.flydean.JoinThread - Thread Created 06:30:58.163 [Thread-0] INFO com.flydean.JoinThread - Thread Thread-0 started 06:30:59.172 [main] INFO com.flydean.JoinThread - t3 status true

    Join()還有個(gè)happen-before的特性,這就是如果thread t1調(diào)用 t2.join(), 那么當(dāng)t2返回時(shí),所有t2的變動(dòng)都會(huì)t1可見(jiàn)。

    之前我們講volatile關(guān)鍵詞的時(shí)候也提到了這個(gè)happen-before規(guī)則。 我們看下例子:

    @Testpublic void testHappenBefore() throws InterruptedException {JoinThread t4 = new JoinThread(10);t4.start();// not guaranteed to stop even if t4 finishes.do {log.info("inside the loop");Thread.sleep(1000);} while ( t4.processingCount > 0);}

    我們運(yùn)行下,可以看到while循環(huán)一直在進(jìn)行中,即使t4中的變量已經(jīng)變成了0。

    所以如果我們需要在這種情況下使用的話,我們需要用到j(luò)oin(),或者其他的同步機(jī)制。

    第十一章 怎么在java中關(guān)閉一個(gè)thread

    我們經(jīng)常需要在java中用到thread,我們知道thread有一個(gè)start()方法可以開(kāi)啟一個(gè)線程。那么怎么關(guān)閉這個(gè)線程呢?

    有人會(huì)說(shuō)可以用Thread.stop()方法。但是這個(gè)方法已經(jīng)被廢棄了。

    根據(jù)Oracle的官方文檔,Thread.stop是不安全的。因?yàn)檎{(diào)用stop方法的時(shí)候,將會(huì)釋放它獲取的所有監(jiān)視器鎖(通過(guò)傳遞ThreadDeath異常實(shí)現(xiàn))。如果有資源該監(jiān)視器鎖所保護(hù)的話,就可能會(huì)出現(xiàn)數(shù)據(jù)不一致的異常。并且這種異常很難被發(fā)現(xiàn)。 所以現(xiàn)在已經(jīng)不推薦是用Thread.stop方法了。

    那我們還有兩種方式來(lái)關(guān)閉一個(gè)Thread。

  • Flag變量
  • 如果我們有一個(gè)無(wú)法自動(dòng)停止的Thread,我們可以創(chuàng)建一個(gè)條件變量,通過(guò)不斷判斷該變量的值,來(lái)決定是否結(jié)束該線程的運(yùn)行。

    public class KillThread implements Runnable {private Thread worker;private final AtomicBoolean running = new AtomicBoolean(false);private int interval;public KillThread(int sleepInterval) {interval = sleepInterval;}public void start() {worker = new Thread(this);worker.start();}public void stop() {running.set(false);}public void run() {running.set(true);while (running.get()) {try {Thread.sleep(interval);} catch (InterruptedException e){Thread.currentThread().interrupt();log.info("Thread was interrupted, Failed to complete operation");}// do something here}log.info("finished");}public static void main(String[] args) {KillThread killThread= new KillThread(1000);killThread.start();killThread.stop();}}

    上面的例子中,我們通過(guò)定義一個(gè)AtomicBoolean 的原子變量來(lái)存儲(chǔ)Flag標(biāo)志。

    我們將會(huì)在后面的文章中詳細(xì)的講解原子變量。

  • 調(diào)用interrupt()方法
  • 通過(guò)調(diào)用interrupt()方法,將會(huì)中斷正在等待的線程,并拋出InterruptedException異常。

    根據(jù)Oracle的說(shuō)明,如果你想自己處理這個(gè)異常的話,需要reasserts出去,注意,這里是reasserts而不是rethrows,因?yàn)橛行┣闆r下,無(wú)法rethrow這個(gè)異常,我們需要這樣做:

    Thread.currentThread().interrupt();

    這將會(huì)reasserts InterruptedException異常。

    看下我們第二種方法怎么調(diào)用:

    public class KillThread implements Runnable {private Thread worker;private final AtomicBoolean running = new AtomicBoolean(false);private int interval;public KillThread(int sleepInterval) {interval = sleepInterval;}public void start() {worker = new Thread(this);worker.start();}public void interrupt() {running.set(false);worker.interrupt();}public void stop() {running.set(false);}public void run() {running.set(true);while (running.get()) {try {Thread.sleep(interval);} catch (InterruptedException e){Thread.currentThread().interrupt();log.info("Thread was interrupted, Failed to complete operation");}// do something here}log.info("finished");}public static void main(String[] args) {KillThread killThread= new KillThread(1000);killThread.start();killThread.interrupt();} }

    上面的例子中,當(dāng)線程在Sleep中時(shí),調(diào)用了interrupt方法,sleep會(huì)退出,并且拋出InterruptedException異常。

    第十二章 java中的Atomic類

    問(wèn)題背景

    在多線程環(huán)境中,我們最常遇到的問(wèn)題就是變量的值進(jìn)行同步。因?yàn)樽兞啃枰诙嗑€程中進(jìn)行共享,所以我們必須需要采用一定的同步機(jī)制來(lái)進(jìn)行控制。

    通過(guò)之前的文章,我們知道可以采用Lock的機(jī)制,當(dāng)然也包括今天我們講的Atomic類。

    下面我們從兩種方式來(lái)分別介紹。

    Lock

    在之前的文章中,我們也講了同步的問(wèn)題,我們?cè)倩仡櫼幌隆?如果定義了一個(gè)計(jì)數(shù)器如下:

    public class Counter {int counter;public void increment() {counter++;}}

    如果是在單線程環(huán)境中,上面的代碼沒(méi)有任何問(wèn)題。但是如果在多線程環(huán)境中,counter++將會(huì)得到不同的結(jié)果。

    因?yàn)殡m然counter++看起來(lái)是一個(gè)原子操作,但是它實(shí)際上包含了三個(gè)操作:讀數(shù)據(jù),加一,寫(xiě)回?cái)?shù)據(jù)。

    我們之前的文章也講了,如何解決這個(gè)問(wèn)題:

    public class LockCounter {private volatile int counter;public synchronized void increment() {counter++;} }

    通過(guò)加synchronized,保證同一時(shí)間只會(huì)有一個(gè)線程去讀寫(xiě)counter變量。

    通過(guò)volatile,保證所有的數(shù)據(jù)直接操作的主緩存,而不使用線程緩存。

    這樣雖然解決了問(wèn)題,但是性能可能會(huì)受影響,因?yàn)閟ynchronized會(huì)鎖住整個(gè)LockCounter實(shí)例。

    使用Atomic

    通過(guò)引入低級(jí)別的原子化語(yǔ)義命令(比如compare-and-swap (CAS)),從而能在保證效率的同時(shí)保證原子性。

    一個(gè)標(biāo)準(zhǔn)的CAS包含三個(gè)操作:

  • 將要操作的內(nèi)存地址M。
  • 現(xiàn)有的變量A。
  • 新的需要存儲(chǔ)的變量B。
  • CAS將會(huì)先比較A和M中存儲(chǔ)的值是否一致,一致則表示其他線程未對(duì)該變量進(jìn)行修改,則將其替換為B。 否則不做任何操作。

    使用CAS可以不用阻塞其他的線程,但是我們需要自己處理好當(dāng)更新失敗的情況下的業(yè)務(wù)邏輯處理情況。

    Java提供了很多Atomic類,最常用的包括AtomicInteger, AtomicLong, AtomicBoolean, 和 AtomicReference.

    其中的主要方法:

  • get() – 直接中主內(nèi)存中讀取變量的值,類似于volatile變量。
  • set() – 將變量寫(xiě)回主內(nèi)存。類似于volatile變量。
  • lazySet() – 延遲寫(xiě)回主內(nèi)存。一種常用的情景是將引用重置為null的情況。
  • compareAndSet() – 執(zhí)行CAS操作,成功返回true,失敗返回false。
  • weakCompareAndSet() – 比較弱的CAS操作,不同的是它不執(zhí)行happens-before操作,從而不保證能夠讀取到其他變量最新的值。
  • 我們看下怎么用:

    public class AtomicCounter {private final AtomicInteger counter = new AtomicInteger(0);public int getValue() {return counter.get();}public void increment() {while(true) {int existingValue = getValue();int newValue = existingValue + 1;if(counter.compareAndSet(existingValue, newValue)) {return;}}} }

    第十三章 java中interrupt,interrupted和isInterrupted的區(qū)別

    前面的文章我們講到了調(diào)用interrupt()來(lái)停止一個(gè)Thread,本文將會(huì)詳細(xì)講解java中三個(gè)非常相似的方法interrupt,interrupted和isInterrupted。

    isInterrupted

    首先看下最簡(jiǎn)單的isInterrupted方法。isInterrupted是Thread類中的一個(gè)實(shí)例方法:

    public boolean isInterrupted() {return isInterrupted(false);}

    通過(guò)調(diào)用isInterrupted()可以判斷實(shí)例線程是否被中斷。

    它的內(nèi)部調(diào)用了isInterrupted(false)方法:

    /*** Tests if some Thread has been interrupted. The interrupted state* is reset or not based on the value of ClearInterrupted that is* passed.*/private native boolean isInterrupted(boolean ClearInterrupted);

    這個(gè)方法是個(gè)native方法,接收一個(gè)是否清除Interrupted標(biāo)志位的參數(shù)。

    我們可以看到isInterrupted()傳入的參數(shù)是false,這就表示isInterrupted()只會(huì)判斷是否被中斷,而不會(huì)清除中斷狀態(tài)。

    interrupted

    interrupted是Thread中的一個(gè)類方法:

    public static boolean interrupted() {return currentThread().isInterrupted(true);}

    我們可以看到,interrupted()也調(diào)用了isInterrupted(true)方法,不過(guò)它傳遞的參數(shù)是true,表示將會(huì)清除中斷標(biāo)志位。

    注意,因?yàn)閕nterrupted()是一個(gè)類方法,調(diào)用isInterrupted(true)判斷的是當(dāng)前線程是否被中斷。注意這里currentThread()的使用。

    interrupt

    前面兩個(gè)是判斷是否中斷的方法,而interrupt()就是真正觸發(fā)中斷的方法。

    我們先看下interrupt的定義:

    public void interrupt() {if (this != Thread.currentThread())checkAccess();synchronized (blockerLock) {Interruptible b = blocker;if (b != null) {interrupt0(); // Just to set the interrupt flagb.interrupt(this);return;}}interrupt0();}

    從定義我們可以看到interrupt()是一個(gè)實(shí)例方法。

    它的工作要點(diǎn)有下面4點(diǎn):

  • 如果當(dāng)前線程實(shí)例在調(diào)用Object類的wait(),wait(long)或wait(long,int)方法或join(),join(long),join(long,int)方法,或者在該實(shí)例中調(diào)用了Thread.sleep(long)或Thread.sleep(long,int)方法,并且正在阻塞狀態(tài)中時(shí),則其中斷狀態(tài)將被清除,并將收到InterruptedException。

  • 如果此線程在InterruptibleChannel上的I / O操作中處于被阻塞狀態(tài),則該channel將被關(guān)閉,該線程的中斷狀態(tài)將被設(shè)置為true,并且該線程將收到j(luò)ava.nio.channels.ClosedByInterruptException異常。

  • 如果此線程在java.nio.channels.Selector中處于被被阻塞狀態(tài),則將設(shè)置該線程的中斷狀態(tài)為true,并且它將立即從select操作中返回。

  • 如果上面的情況都不成立,則設(shè)置中斷狀態(tài)為true。

  • 我們來(lái)舉個(gè)例子:

    @Slf4j public class InterruptThread extends Thread {@Overridepublic void run() {for (int i = 0; i < 1000; i++) {log.info("i= {}", (i+1));log.info("call inside thread.interrupted(): {}", Thread.interrupted());}}@Testpublic void testInterrupt(){InterruptThread thread=new InterruptThread();thread.start();thread.interrupt();//test isInterruptedlog.info("first call isInterrupted(): {}", thread.isInterrupted());log.info("second call isInterrupted(): {}", thread.isInterrupted());//test interrupted()log.info("first call outside thread.interrupted(): {}", Thread.interrupted());log.info("second call outside thread.interrupted() {}:", Thread.interrupted());log.info("thread is alive : {}",thread.isAlive() );} }

    輸出結(jié)果如下:

    13:07:17.804 [main] INFO com.flydean.InterruptThread - first call isInterrupted(): true 13:07:17.808 [main] INFO com.flydean.InterruptThread - second call isInterrupted(): true13:07:17.808 [Thread-1] INFO com.flydean.InterruptThread - call inside thread.interrupted(): true 13:07:17.808 [Thread-1] INFO com.flydean.InterruptThread - call inside thread.interrupted(): false13:07:17.808 [main] INFO com.flydean.InterruptThread - first call outside thread.interrupted(): false 13:07:17.809 [main] INFO com.flydean.InterruptThread - second call outside thread.interrupted() false

    上面的例子中,兩次調(diào)用thread.isInterrupted()的值都是true。

    在線程內(nèi)部調(diào)用Thread.interrupted(), 只有第一次返回的是ture,后面返回的都是false,這表明第一次被重置了。

    在線程外部,因?yàn)椴](méi)有中斷外部線程,所以返回的值一直都是false。

    總結(jié)

    本文介紹了java并發(fā)系列文章1到14章,因?yàn)槲募拗?#xff0c;剩下的章節(jié)將會(huì)在
    5W字高質(zhì)量java并發(fā)系列詳解教程(下) 進(jìn)行介紹,敬請(qǐng)期待!

    本文的例子https://github.com/ddean2009/learn-java-concurrency/

    本文PDF下載鏈接concurrent-all-in-one.pdf

    歡迎關(guān)注我的公眾號(hào):程序那些事,更多精彩等著您!
    更多內(nèi)容請(qǐng)?jiān)L問(wèn) www.flydean.com

    總結(jié)

    以上是生活随笔為你收集整理的5W字高质量java并发系列详解教程(上)-附PDF下载的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。