深入理解 Kotlin Coroutine (一)
原文鏈接:https://github.com/enbandari/Kotlin-Tutorials
本文主要介紹 Kotlin Coroutine 的基礎 API,有關 Kotlinx.Coroutine 的內容,我們將在下一期給大家介紹。由于本人水平有限,如果大家有什么異議,歡迎直接拋出來跟我討論。
1. 什么是 Coroutine
Coroutine 被翻譯成了“協程”,意思就是要各個子任務協作運行的意思,所以大家一下就明白了它被創造出來是要解決異步問題的。
我們寫 Java 的程序員,對線程更熟悉一些。線程是比進程更小一級的運行單位,它的調度由操作系統來完成,所以我們只管 new Thread 和 start,至于什么時候 run,什么時候 run 完,我們都沒辦法預見。
Thread t = new Thread(task); t.start();盡管有諸多不可控的因素,不過我們可以肯定的是起了一個新的線程并啟動它之后,當前線程并不會受到阻塞。如果大家再往深處想想,CPU 在任意時刻運行什么進程及其線程,是操作系統決定的,但歸根結底一個單線程的 CPU 在任一時刻只能運行一個任務。
那么協程呢?協程的調度是應用層完成的,比如我們說 Lua 支持協程,那么各個協程如何運行,這一調度工作實際上是 Lua 自己的虛擬機來完成的。這個調度與線程調度有著比較大的差別,線程調度是搶占式調度,很有可能線程 A 運行得美滋滋的,線程 B 突然把 CPU 搶過來,跟 A 說“你給我下去吧你”,于是線程 A 只能干瞪眼沒辦法;而協程的調度是非搶占式的,目前常見的各種支持協程的語言實現中都有 yield 關鍵字,它有“妥協、退讓”的意思,如果一個協程執行到一段代碼需要歇會兒,那么它將把執行權讓出來,如果它不這么做,沒人跟它搶。
在 接觸 Kotlin 的協程之前呢,我們先給大家看一個 Lua 的例子,比較直觀:
function foo(a) print("foo", a) return coroutine.yield(2 * a) end co = coroutine.create(function ( a, b ) print("co-body", a, b) local r = foo(a + 1) print("co-body", r) local r, s = coroutine.yield(a + b, a - b) print("co-body", r, s) return b, "end" end) print("main", coroutine.resume(co, 1, 10)) print("main", coroutine.resume(co, "r")) print("main", coroutine.resume(co, "x", "y")) print("main", coroutine.resume(co, "x", "y"))運行結果如下:
co-body 1 10 foo 2 main true 4 co-body r main true 11 -9 co-body x y main true 10 end main false cannot resume dead coroutine首先定義了一個 foo 函數,然后創建 coroutine,創建了之后還需要調用 resume 才能執行協程,運行過程是謙讓的,是交替的:
圖中數字表示第n次
協程為我們的程序提供了一種暫停的能力,就好像狀態機,只有等到下一次輸入,它才做狀態轉移。顯然,用協程來描述一個狀態機是再合適不過的了。
也許大家對 lua 的語法不是很熟悉,不過沒關系,上面的例子只需要知道大概是在干什么就行:這例子就好像,main 和 Foo 在交替干活,有點兒像 A B 兩個人分工協作,A 干一會兒 B 來,B 干一會兒,再讓 A 來一樣。如果我們用線程來描述這個問題,那么可能會用到很多回調,相信寫 Js 的兄弟聽到這兒要感到崩潰了,因為 Js 的代碼寫著寫著就容易回調滿天飛,業務邏輯的實現越來越抽象,可讀性越來越差;而用協程的話,就好像一個很平常的同步操作一樣,一點兒異步任務的感覺都沒有。
我們前面提到的協程的非搶占調度方式,以及這個交替執行代碼的例子,基本上可以說明協程實際上致力于用同步一樣的代碼來完成異步任務的運行。
一句話,有了協程,你的異步程序看起來就像同步代碼一樣。
2. Kotlin 協程初體驗
Kotlin 1.1 對協程的基本支持都在 Kotlin 標準庫當中,主要涉及兩個類和幾個包級函數和擴展方法:
CoroutineContext,協程的上下文,這個上下文可以是多個的組合,組合的上下文可以通過 key 來獲取。EmptyCoroutineContext 是一個空實現,沒有任何功能,如果我們在使用協程時不需要上下文,那么我們就用這個對象作為一個占位即可。上下文這個東西,不管大家做什么應用,總是能遇到,比如 Android 里面的 Context,JSP 里面的 PageContext 等等,他們扮演的角色都大同小異:資源管理,數據持有等等,協程的上下文也基本上是如此。
Continuation,顧名思義,繼續、持續的意思。我們前面說過,協程提供了一種暫停的能力,可繼續執行才是最終的目的,Continuation 有兩個方法,一個是 resume,如果我們的程序沒有任何異常,那么直接調用這個方法并傳入需要返回的值;另一個是 resumeWithException,如果我們的程序出了異常,那我們可以通過調用這個方法把異常傳遞出去。
- 協程的基本操作,包括創建、啟動、暫停和繼續,繼續的操作在 Continuation 當中,剩下的三個都是包級函數或擴展方法:
這幾個類和函數其實與我們前面提到的 Lua 的協程 API 非常相似,都是協程最基礎的 API。
除此之外,Kotlin 還增加了一個關鍵字:suspend,用作修飾會被暫停的函數,被標記為 suspend 的函數只能運行在協程或者其他 suspend 函數當中。
好,介紹完這些基本概念,讓我們來看一個例子:
fun main(args: Array<String>) { log("before coroutine") //啟動我們的協程 asyncCalcMd5("test.zip") { log("in coroutine. Before suspend.") //暫停我們的線程,并開始執行一段耗時操作 val result: String = suspendCoroutine { continuation -> log("in suspend block.") continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) log("after resume.") } log("in coroutine. After suspend. result = $result") } log("after coroutine") } /** * 上下文,用來存放我們需要的信息,可以靈活的自定義 */ class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){ companion object Key : CoroutineContext.Key<FilePath> } fun asyncCalcMd5(path: String, block: suspend () -> Unit) { val continuation = object : Continuation<Unit> { override val context: CoroutineContext get() = FilePath(path) override fun resume(value: Unit) { log("resume: $value") } override fun resumeWithException(exception: Throwable) { log(exception.toString()) } } block.startCoroutine(continuation) } fun calcMd5(path: String): String{ log("calc md5 for $path.") //暫時用這個模擬耗時 Thread.sleep(1000) //假設這就是我們計算得到的 MD5 值 return System.currentTimeMillis().toString() }這段程序在模擬計算文件的 Md5 值。我們知道,文件的 Md5 值計算是一項耗時操作,所以我們希望啟動一個協程來處理這個耗時任務,并在任務運行結束時打印出來計算的結果。
我們先來一段一段分析下這個示例:
/** * 上下文,用來存放我們需要的信息,可以靈活的自定義 */ class FilePath(val path: String): AbstractCoroutineContextElement(FilePath){ companion object Key : CoroutineContext.Key<FilePath> }我們在計算過程中需要知道計算哪個文件的 Md5,所以我們需要通過上下文把這個路徑傳入協程當中。如果有多個數據,也可以一并添加進去,在運行當中,我們可以通過 Continuation 的實例拿到上下文,進而獲取到這個路徑:
continuation.context[FilePath]!!.path接著,我們再來看下 Continuation:
val continuation = object : Continuation<Unit> { override val context: CoroutineContext get() = FilePath(path) override fun resume(value: Unit) { log("resume: $value") } override fun resumeWithException(exception: Throwable) { log(exception.toString()) } }我們除了給定了 FilePath 這樣一個上下文之外就是簡單的打了幾行日志,比較簡單。這里傳入的 Continuation 當中的 resume 和 resumeWithException 只有在協程最終執行完成后才會被調用,這一點需要注意一下,也正是因為如此,startCoroutine 把它叫做 completion:
public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>那么下面我們看下最關鍵的這段代碼:
asyncCalcMd5("test.zip") { log("in coroutine. Before suspend.") //暫停我們的協程,并開始執行一段耗時操作 val result: String = suspendCoroutine { continuation -> log("in suspend block.") continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) log("after resume.") } log("in coroutine. After suspend. result = $result") }suspendCoroutine 這個方法將外部的代碼執行權拿走,并轉入傳入的 Lambda 表達式中,而這個表達式當中的操作就對應異步的耗時操作了,在這里我們“計算”出了 Md5 值,接著調用 continuation.resume 將結果傳了出去,傳給了誰呢?傳給了 suspendCoroutine 的返回值也即 result,這時候協程繼續執行,打印 result 結束。
下面就是運行結果了:
2017-01-30T06:43:52.284Z [main] before coroutine 2017-01-30T06:43:52.422Z [main] in coroutine. Before suspend. 2017-01-30T06:43:52.423Z [main] in suspend block. 2017-01-30T06:43:52.423Z [main] calc md5 for test.zip. 2017-01-30T06:43:53.426Z [main] after resume. 2017-01-30T06:43:53.427Z [main] in coroutine. After suspend. result = 1485758633426 2017-01-30T06:43:53.427Z [main] resume: 1485758633426 2017-01-30T06:43:53.427Z [main] after coroutine細心的讀者肯定一看就發現,所謂的異步操作是怎么個異步法?從日志上面看,明明上面這段代碼就是順序執行的嘛,不然 after coroutine 這句日志為什么非要等到最后才打印?
還有,整個程序都只運行在了主線程上,我們的日志足以說明這一點了,根本沒有異步嘛。難道說協程就是一個大騙子??
3. 實現異步
這一部分我們就要回答上一節留下的問題。不過在此之前,我們再來回顧一下協程存在的意義:讓異步代碼看上去像同步代碼,直接自然易懂。至于它如何做到這一點,可能各家的語言實現各有不同,但協程給人的感覺更像是底層并發 API(比如線程)的語法糖。當然,如果你愿意,我們通常所謂的線程也可以被稱作操作系統級 API 的語法糖了吧,畢竟各家語言對于線程的實現也各有不同,這個就不是我們今天要討論的內容了。
不管怎么樣,你只需要知道,協程的異步需要依賴比它更底層的 API 支持,那么在 Kotlin 當中,這個所謂的底層 API 就非線程莫屬了。
知道了這一點,我們就要考慮想辦法來把前面的示例完善一下了。
首先我們實例化一個線程池:
private val executor = Executors.newSingleThreadScheduledExecutor { Thread(it, "scheduler") }接著我們把計算 Md5 的部分交給線程池去運行:
asyncCalcMd5("test.zip") { log("in coroutine. Before suspend.") //暫停我們的線程,并開始執行一段耗時操作 val result: String = suspendCoroutine { continuation -> log("in suspend block.") executor.submit { continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) log("after resume.") } } log("in coroutine. After suspend. result = $result") executor.shutdown() }那么結果呢?
2017-01-30T07:18:04.496Z [main] before coroutine 2017-01-30T07:18:04.754Z [main] in coroutine. Before suspend. 2017-01-30T07:18:04.757Z [main] in suspend block. 2017-01-30T07:18:04.765Z [main] after coroutine 2017-01-30T07:18:04.765Z [scheduler] calc md5 for test.zip. 2017-01-30T07:18:05.769Z [scheduler] in coroutine. After suspend. result = 1485760685768 2017-01-30T07:18:05.769Z [scheduler] resume: 1485760685768 2017-01-30T07:18:05.769Z [scheduler] after resume.我們看到在協程被暫停的那一刻,協程外面的代碼被執行了。一段時間之后,協程被繼續執行,打印結果。
截止到現在,我們用協程來實現異步操作的功能已經實現。
你可能要問,如果我們想要完成異步操作,直接用線程池加回調豈不更直接簡單,為什么要用協程呢,搞得代碼這么讓人費解不說,也沒有變的很簡單啊。
說的對,如果我們實際當中把協程的代碼都寫成這樣,肯定會被蛋疼死,我前面展示給大家的,是 Kotlin 標準庫當中最為基礎的 API,看起來非常的原始也是理所應當的,如果我們對其加以封裝,那效果肯定大不一樣。
除此之外,在高并發的場景下,多個協程可以共享一個或者多個線程,性能可能會要好一些。舉個簡單的例子,一臺服務器有 1k 用戶與之連接,如果我們采用類似于 Tomcat 的實現方式,一個用戶開一個線程去處理請求,那么我們將要開 1k 個線程,這算是個不小的數目了;而我們如果使用協程,為每一個用戶創建一個協程,考慮到同一時刻并不是所有用戶都需要數據傳輸,因此我們并不需要同時處理所有用戶的請求,那么這時候可能只需要幾個專門的 IO 線程和少數來承載用戶請求對應的協程的線程,只有當用戶有數據傳輸事件到來的時候才去相應,其他時間直接掛起,這種事件驅動的服務器顯然對資源的消耗要小得多。
4. 進一步封裝
這一節的內容較多的參考了 Kotlin 官方的 https://github.com/Kotlin/kotlin-coroutines“>Coroutine Example,里面有更多的例子,大家可以參考學習。
4.1 異步
剛才那個示例讓我們感覺到,寫個協程調用異步代碼實在太原始了,所以我們決定對它做一下封裝。如果我們能在調用 suspendCoroutine 的時候直接把后面的代碼攔截,并切到線程池當中執行,那么我們就不用每次自己搞一個線程池來做這事兒了,嗯,讓我們研究下有什么辦法可以做到這一點。
攔截…怎么攔截呢?
public interface ContinuationInterceptor : CoroutineContext.Element { companion object Key : CoroutineContext.Key<ContinuationInterceptor> public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> }我們發現,Kotlin 的協程 API 當中提供了這么一個攔截器,可以把協程的操作攔截,傳入的是原始的 Continuation,返回的是我們經過線程切換的 Continuation,這樣就可以實現我們的目的了。
open class Pool(val pool: ForkJoinPool) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { override fun <T> interceptContinuation(continuation: Continuation<T>) : Continuation<T> = PoolContinuation(pool, //下面這段代碼是要查找其他攔截器,并保證能調用它們的攔截方法 continuation.context.fold(continuation, { cont, element -> if (element != this@Pool && element is ContinuationInterceptor) element.interceptContinuation(cont) else cont })) } private class PoolContinuation<T>( val pool: ForkJoinPool, val continuation: Continuation<T> ) : Continuation<T> by continuation { override fun resume(value: T) { if (isPoolThread()) continuation.resume(value) else pool.execute { continuation.resume(value) } } override fun resumeWithException(exception: Throwable) { if (isPoolThread()) continuation.resumeWithException(exception) else pool.execute { continuation.resumeWithException(exception) } } fun isPoolThread(): Boolean = (Thread.currentThread() as? ForkJoinWorkerThread)?.pool == pool }這個 Pool 是什么鬼?我們讓它繼承 AbstractCoroutineContextElement 表明它其實就是我們需要的上下文。實際上這個上下文可以給任意協程使用,于是我們再定義一個 object:
object CommonPool : Pool(ForkJoinPool.commonPool())有了這個,我們就可以把沒加線程池的版本改改了:
fun main(args: Array<String>) { log("before coroutine") //啟動我們的協程 asyncCalcMd5("test.zip") { ... } log("after coroutine") //加這句的原因是防止程序在協程運行完之前停止 CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS) } ... fun asyncCalcMd5(path: String, block: suspend () -> String) { val continuation = object : Continuation<String> { override val context: CoroutineContext //注意這個寫法,上下文可以通過 + 來組合使用 get() = FilePath(path) + CommonPool ... } block.startCoroutine(continuation) } ...那么運行結果呢?
2017-01-30T09:13:11.183Z [main] before coroutine 2017-01-30T09:13:11.334Z [main] after coroutine 2017-01-30T09:13:11.335Z [ForkJoinPool.commonPool-worker-1] in coroutine. Before suspend. 2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] in suspend block. 2017-01-30T09:13:11.337Z [ForkJoinPool.commonPool-worker-1] calc md5 for test.zip. 2017-01-30T09:13:12.340Z [ForkJoinPool.commonPool-worker-1] after resume. 2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] in coroutine. After suspend. result = 1485767592340 2017-01-30T09:13:12.341Z [ForkJoinPool.commonPool-worker-1] resume: 1485767592340我們看到程序已經非常完美的實現異步調用。顯然,這種寫法要比線程池回調的寫法看上去順理成章得多。
4.2 啟動協程
在討論完異步的封裝后,有人肯定還是會提出新問題:啟動協程的寫法是不是有點兒啰嗦了啊?沒錯,每次構造一個 Continuation,也沒干多少事兒,實在沒什么必要,干脆封裝一個通用的版本得了:
class StandaloneCoroutine(override val context: CoroutineContext): Continuation<Unit> { override fun resume(value: Unit) {} override fun resumeWithException(exception: Throwable) { //處理異常 val currentThread = Thread.currentThread() currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception) } }這樣就好辦了,我們每次啟動協程只需要針對當前協程提供特定的上下文即可,那么我們是不是再把啟動的那個函數改改呢?
fun launch(context: CoroutineContext, block: suspend () -> Unit) = block.startCoroutine(StandaloneCoroutine(context))有了這個,我們前面的代碼就可以進一步修改:
fun main(args: Array<String>) { log("before coroutine") //啟動我們的協程 launch(FilePath("test.zip") + CommonPool) { log("in coroutine. Before suspend.") //暫停我們的線程,并開始執行一段耗時操作 val result: String = suspendCoroutine { continuation -> log("in suspend block.") continuation.resume(calcMd5(continuation.context[FilePath]!!.path)) log("after resume.") } log("in coroutine. After suspend. result = $result") } log("after coroutine") CommonPool.pool.awaitTermination(10000, TimeUnit.MILLISECONDS) } /** * 上下文,用來存放我們需要的信息,可以靈活的自定義 */ class FilePath(val path: String) : AbstractCoroutineContextElement(Key) { companion object Key : CoroutineContext.Key<FilePath> } fun calcMd5(path: String): String { log("calc md5 for $path.") //暫時用這個模擬耗時 Thread.sleep(1000) //假設這就是我們計算得到的 MD5 值 return System.currentTimeMillis().toString() }運行結果自然也沒什么好說的。
4.3 暫停協程
暫停協程這塊兒也太亂了,看著莫名其妙的,能不能直白一點兒呢?其實我們的代碼不過是想要獲取 Md5 的值,所以如果能寫成下面這樣就好了:
val result = calcMd5(continuation.context[FilePath]!!.path).await()毋庸置疑,這肯定是可以的。想一下,有哪個類可以支持我們直接阻塞線程,等到獲取到結果之后再返回呢?當然是 Future 了。
suspend fun <T> CompletableFuture<T>.await(): T { return suspendCoroutine { continuation -> whenComplete { result, e -> if (e == null) continuation.resume(result) else continuation.resumeWithException(e) } } }我們干脆就直接給 CompletableFuture 定義一個擴展方法,當中只是用來掛起協程,并在結果拿到之后繼續執行協程。這樣,我們的代碼可以進一步修改:
fun main(args: Array<String>) { log("before coroutine") //啟動我們的協程 val coroutineContext = FilePath("test.zip") + CommonPool launch(coroutineContext) { log("in coroutine. Before suspend.") //暫停我們的線程,并開始執行一段耗時操作 val result: String = calcMd5(coroutineContext[FilePath]!!.path).await() log("in coroutine. After suspend. result = $result") } log("after coroutine") CommonPool.pool.awaitTermination(10, TimeUnit.SECONDS) } fun calcMd5(path: String): CompletableFuture<String> = CompletableFuture.supplyAsync { log("calc md5 for $path.") //暫時用這個模擬耗時 Thread.sleep(1000) //假設這就是我們計算得到的 MD5 值 System.currentTimeMillis().toString() } ... 省略掉一些沒有修改的代碼 ...4.4 帶有 Receiver 的協程
不知道大家注意到沒有, 4.3 的代碼中有個地方比較別扭:
val coroutineContext = FilePath("test.zip") + CommonPool launch(coroutineContext) { ... //在協程內部想要訪問上下文居然需要用到外部的變量 val result: String = calcMd5(coroutineContext[FilePath]!!.path).await() ... }在協程內部想要訪問上下文居然需要用到外部的變量。這個上下文畢竟是協程自己的,自己居然沒有辦法直接獲取到,一點兒都不自然。
其實這也不是沒有辦法,startCoroutine 其實還有一個帶 receiver 的版本:
public fun <R, T> (suspend R.() -> T).startCoroutine( receiver: R, completion: Continuation<T>也就是說,我們不僅可以傳入一個獨立的函數作為協程的代碼塊,還可以將一個對象的方法傳入,也就是說,我們完全可以在啟動協程的時候為它指定一個 receiver:
fun <T> launch( receiver: T, context: CoroutineContext, block: suspend T.() -> Unit) = block.startCoroutine(receiver, StandaloneCoroutine(context))我們修改了 launch,加入了 receiver,于是我們的代碼也可以這么改:
val coroutineContext = FilePath("test.zip") + CommonPool //需要傳入 receiver launch(coroutineContext, coroutineContext) { ... //注意下面直接用 this 來獲取路徑 val result: String = calcMd5(this[FilePath]!!.path).await() ... }如果你覺得絕大多數情況下 receiver 都會是上下文那么上面的代碼還可以接著簡化:
fun launchWithContext( context: CoroutineContext, block: suspend CoroutineContext.() -> Unit) = launch(context, context, block) launchWithContext(FilePath("test.zip") + CommonPool) { log("in coroutine. Before suspend.") //暫停我們的線程,并開始執行一段耗時操作 val result: String = calcMd5(this[FilePath]!!.path).await() log("in coroutine. After suspend. result = $result") }截止到現在,我們對最初的代碼做了各種封裝,這些封裝后的代碼可以在各種場景下直接使用,于是我們的協程代碼也得到了大幅簡化。另外,不知道大家有沒有注意到,協程當中異常的處理也要比直接用線程寫回調的方式容易的多,我們只需要在 Continuation 當中覆寫 resumeWithException 方法就可以做到這一點。
5. 拿來主義:Kotlinx.Coroutine
https://github.com/Kotlin/kotlinx.coroutines“>Kotlinx.Coroutine 是官方單獨發出來的一個 Coroutine 的庫,這個庫為什么沒有隨著標準庫一并發出來,想必大家從其包名就能略窺一二:kotlinx.coroutines.experimental,experimental,還處于試驗階段。不過既然敢隨著 1.1 Beta 一并發出來,也說明后面的大方向不會太遠,大家可以直接開始嘗試其中的 API 了。
應該說,Kotlinx.Coroutine 做的事情跟我們在上一節做的事情是相同的,只不過它在這個方向上面走的更遠。有關它的一些用法和細節,我們將在下一期給大家介紹。
6. 小結
本文主要對 Kotlin 1.1Beta 標準庫的 Coroutine API 做了介紹,也給出了相應的示例向大家展示 Coroutine 能為我們帶來什么。
協程是干什么的?是用來讓異步代碼更具表現力的。如果運用得當,它將讓我們免于回調嵌套之苦,并發加鎖之痛,使我們能夠利用我們有限的時間寫出更有魅力的程序。
總結
以上是生活随笔為你收集整理的深入理解 Kotlin Coroutine (一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kotlin极简教程:第5章 集合类
- 下一篇: 深入理解 Kotlin coroutin