協程的狀態機
這一章會以下面的代碼為例解析一下協程啟動,掛起以及恢復的流程:
- private suspend fun getId(): String {
- return GlobalScope.async(Dispatchers.IO) {
- delay(1000)
- "hearing"
- }.await()
- }
- private suspend fun getAvatar(id: String): String {
- return GlobalScope.async(Dispatchers.IO) {
- delay(1000)
- "avatar-$id"
- }.await()
- }
- fun main() {
- GlobalScope.launch {
- val id = getId()
- val avatar = getAvatar(id)
- println("${Thread.currentThread().name} - $id - $avatar")
- }
- }
上面 main 方法中,GlobalScope.launch 啟動的協程體在執行到 getId 后,協程體會掛起,直到 getId 返回可用結果,才會 resume launch 協程,執行到 getAvatar 也是同樣的過程。協程內部實現使用狀態機來處理不同的掛起點,將 GlobalScope.launch 協程體字節碼反編譯成 Java 代碼,大致如下(有所刪減):
- BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
- (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
- int label;
- public final Object invokeSuspend(
- Object $result) {
- Object var10000;
- String id;
- label17: {
- CoroutineScope $this$launch;
- switch(this.label) {
- case 0: // a
- ResultKt.throwOnFailure($result);
- $this$launch = this.p$;
- this.label = 1; // label置為1
- var10000 = getId(this);
- if (var10000 == COROUTINE_SUSPENDED) {
- return COROUTINE_SUSPENDED;
- }
- // 若此時已經有結果,則不掛起,直接break
- break;
- case 1: // b
- ResultKt.throwOnFailure($result);
- var10000 = $result;
- break;
- case 2: // d
- id = (String)this.L$1;
- ResultKt.throwOnFailure($result);
- var10000 = $result;
- break label17; // 退出label17
- default:
- throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
- }
- // c
- id = (String)var10000;
- this.L$1 = id; // 將id賦給L$1
- this.label = 2; // label置為2
- var10000 = getAvatar(id, this);
- if (var10000 == COROUTINE_SUSPENDED) {
- return COROUTINE_SUSPENDED;
- }
- }
- // e
- String avatar = (String)var10000;
- String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
- System.out.println(var5);
- return Unit.INSTANCE;
- }
- public final Continuation create(
- Object value,
- Continuation completion) {
- Intrinsics.checkParameterIsNotNull(completion, "completion");
- Function2 var3 = new <anonymous constructor>(completion);
- var3.p$ = (CoroutineScope)value;
- return var3;
- }
- public final Object invoke(Object var1, Object var2) {
- return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
- }
- }
這里我們根據上面的注釋以及字母標簽來看一下執行流程(invokeSuspend 方法會在協程體中的 suspend 函數得到結果后被調用,具體是在哪里被調用的稍后會講到):
- a: launch 協程體剛執行到 getId 方法時,getId 方法的返回值將是 COROUTINE_SUSPENDED, 此時直接 return, 則 launch 協程體中 getId 后面的代碼暫時不會執行,即 launch 協程體被掛起(非阻塞, 該線程依舊會做其它工作)。這里將 label 置為了 1. 而若此時 getId 已經有結果(內部沒有調用 delay 之類的 suspend 函數等),則不掛起,而是直接 break。
- b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 則當 getId 有可用結果返回后,會重新執行 launch 協程體的 invokeSuspend 方法,根據上面的 label==1, 會執行到這里檢查一下 result 沒問題的話就 break, 此時 id 賦值給了 var10000。
- c: 在 a 中若直接 break 或 在 b 中得到 getId 的結果然后 break 后,都會執行到這里,得到 id 的值并把 label 置為2。然后調用 getAvatar 方法,跟 getId 類似,若其返回 COROUTINE_SUSPENDED 則 return,協程被掛起,等到下次 invokeSuspend 被執行,否則離開 label17 接著執行后續邏輯。
- d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 則當 getAvatar 有可用結果返回后會重新調用 launch 協程體的 invokeSuspend 方法,此時根據 label==2 來到這里并取得之前的 id 值,檢驗 result(即avatar),然后break label17。
- e: c 中直接返回了可用結果 或 d 中 break label17 后,launch 協程體中的 suspend 函數都執行完畢了,這里會執行剩下的邏輯。
suspend 函數不會阻塞線程,且 suspend 函數不一定會掛起協程,如果相關調用的結果已經可用,則繼續運行而不掛起,例如 async{} 返回值 Deferred 的結果已經可用時,await()掛起函數可以直接返回結果,不用再掛起協程。
這一節看了一下 launch 協程體反編譯成 Java 后的代碼邏輯,關于 invokeSuspend 是何時怎么被調用的,將會在下面講到。
協程的創建與啟動
這一節以 CoroutineScope.launch {} 默認參數為例,從源碼角度看看 Kotlin 協程是怎樣創建與啟動的:
- public fun CoroutineScope.launch(
- context: CoroutineContext = EmptyCoroutineContext,
- start: CoroutineStart = CoroutineStart.DEFAULT,
- block: suspend CoroutineScope.() -> Unit
- ): Job {
- val newContext = newCoroutineContext(context)
- val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true)
- coroutine.start(start, coroutine, block)
- return coroutine
- }
- // AbstractCoroutine.kt
- // receiver: StandaloneCoroutine
- // block: suspend StandaloneCoroutine.() -> Unit
- // private open class StandaloneCoroutine(...) : AbstractCoroutine<Unit>(...) {}
- // public abstract class AbstractCoroutine<in T>(...) : JobSupport(active), Job, Continuation<T>, CoroutineScope {}
- public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
- // 調用 CoroutineStart 中的 invoke 方法
- start(block, receiver, this)
- }
- public enum class CoroutineStart {
- // block - StandaloneCoroutine.() -> Unit
- // receiver - StandaloneCoroutine
- // completion - StandaloneCoroutine<Unit>
- public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
- when (this) {
- // 根據 start 參數的類型調用不同的方法
- DEFAULT -> block.startCoroutineCancellable(receiver, completion)
- ATOMIC -> block.startCoroutine(receiver, completion)
- UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
- LAZY -> Unit // will start lazily
- }
- }
接下來看看 startCoroutineCancellable 方法:
- // receiver - StandaloneCoroutine
- // completion - StandaloneCoroutine<Unit>
- internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
- runSafely(completion) {
- createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
- }
createCoroutineUnintercepted 方法創建了一個 Continuation 類型(協程)的實例,即創建了一個協程:
- public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
- receiver: R, completion: Continuation<T>
- ): Continuation<Unit> {
- return if (this is BaseContinuationImpl) create(receiver, completion) else // ...
- }
調用的是 (suspend (R) -> T) 的 createCoroutineUnintercepted 方法,(suspend (R) -> T) 就是協程體。直接看上面示例代碼中 GlobalScope.launch 編譯后的字節碼,可以發現 CoroutineScope.launch 傳入的 lambda 表達式被編譯成了繼承 SuspendLambda 的子類:
- final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2
其繼承關系為: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation, 因此走 create(receiver, completion) 方法,從上面反編譯出的 Java 代碼可以看到 create 方法創建了一個 Continuation 實例,再看一下 Kotlin 代碼編譯后的字節碼(包名已省略):
- public final create(Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
- // ...
- NEW Main$main$1
從上面可以看到,create 方法創建了 Main$main$1 實例,而其繼承自 SuspendLambda, 因此 create 方法創建的 Continuation 是一個 SuspendLambda 對象。
即 createCoroutineUnintercepted 方法創建了一個 SuspendLambda 實例。然后看看 intercepted 方法:
- public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
- // 如果是ContinuationImpl類型,則調用intercepted方法,否則返回自身
- // 這里的 this 是 Main$main$1 實例 - ContinuationImpl的子類
- (this as? ContinuationImpl)?.intercepted() ?: this
- // ContinuationImpl
- public fun intercepted(): Continuation<Any?> =
- // context[ContinuationInterceptor]是 CoroutineDispatcher 實例
- // 需要線程調度 - 返回 DispatchedContinuation,其 continuation 參數值為 SuspendLambda
- // 不需要線程調度 - 返回 SuspendLambda
- intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }
- // CoroutineDispatcher
- // continuation - SuspendLambda -> ContinuationImpl -> BaseContinuationImpl
- public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
- DispatchedContinuation(this, continuation)
接下來看看 resumeCancellableWith 是怎么啟動協程的,這里還涉及到Dispatchers線程調度的邏輯:
- internal class DispatchedContinuation<in T>(
- val dispatcher: CoroutineDispatcher,
- val continuation: Continuation<T>
- ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
- public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
- // 進行線程調度,最后也會執行到continuation.resumeWith方法
- is DispatchedContinuation -> resumeCancellableWith(result)
- // 直接執行continuation.resumeWith方法
- else -> resumeWith(result)
- }
- inline fun resumeCancellableWith(result: Result<T>) {
- val state = result.toState()
- // 判斷是否需要線程調度
- if (dispatcher.isDispatchNeeded(context)) {
- _state = state
- resumeMode = MODE_CANCELLABLE
- // 需要調度則先進行調度
- dispatcher.dispatch(context, this)
- } else {
- executeUnconfined(state, MODE_CANCELLABLE) {
- if (!resumeCancelled()) {
- // 不需要調度則直接在當前線程執行協程
- resumeUndispatchedWith(result)
- }
- }
- }
- }
- inline fun resumeUndispatchedWith(result: Result<T>) {
- withCoroutineContext(context, countOrElement) {
- continuation.resumeWith(result)
- }
- }
- }
- 當需要線程調度時,則在調度后會調用 DispatchedContinuation.continuation.resumeWith 來啟動協程,其中 continuation 是 SuspendLambda 實例;
- 當不需要線程調度時,則直接調用 SuspendLambda.resumeWith 來啟動協程。
resumeWith 方法調用的是父類 BaseContinuationImpl 中的 resumeWith 方法:
- internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
- public final override fun resumeWith(result: Result<Any?>) {
- // ...
- val outcome = invokeSuspend(param)
- // ...
- }
- }
因此,協程的啟動是通過 BaseContinuationImpl.resumeWith 方法調用到了子類 SuspendLambda.invokeSuspend 方法,然后通過狀態機來控制順序運行。
協程的掛起和恢復
Kotlin 編譯器會為 協程體 生成繼承自 SuspendLambda 的子類,協程的真正運算邏輯都在其 invokeSuspend 方法中。上一節介紹了 launch 是怎么創建和啟動協程的,在這一節我們再看看當協程代碼執行到 suspend 函數后,協程是怎么被掛起的 以及 當 suspend 函數執行完成得到可用結果后是怎么恢復協程的。
Kotlin 協程的內部實現使用了 Kotlin 編譯器的一些編譯技術,當 suspend 函數被調用時,都有一個隱式的參數額外傳入,這個參數是 Continuation 類型,封裝了協程 resume 后執行的代碼邏輯。
- private suspend fun getId(): String {
- return GlobalScope.async(Dispatchers.IO) {
- delay(1000)
- "hearing"
- }.await()
- }
- // Decompile成Java
- final Object getId(
- Continuation $completion) {
- // ...
- }
其中傳入的 $completion 參數,從上一節可以看到是調用 getId 方法所在的協程體對象,也就是一個 SuspendLambda 對象。Continuation的定義如下:
- public interface Continuation<in T> {
- public val context: CoroutineContext
- public fun resumeWith(result: Result<T>)
- }
將 getId 方法編譯后的字節碼反編譯成 Java 代碼如下(為便于閱讀,刪減及修改了部分代碼):
- final Object getId(
- Continuation $completion) {
- // 新建與啟動協程
- return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
- int label;
- public final Object invokeSuspend(
- Object $result) {
- switch(this.label) {
- case 0:
- ResultKt.throwOnFailure($result);
- this.label = 1;
- if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {
- return COROUTINE_SUSPENDED;
- }
- break;
- case 1:
- ResultKt.throwOnFailure($result);
- break;
- default:
- throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
- }
- return "hearing";
- }
- // ...
- }), 2, (Object)null).await($completion); // 調用 await() suspend 函數
- }
結合協程的狀態機一節,當上面的 launch 協程體執行到 getId 方法時, 會根據其返回值是否為 COROUTINE_SUSPENDED 來決定是否掛起,由于 getId 的邏輯是通過 async 啟動一個新的協程,協程體內調用了 suspend delay 方法,然后通過 await suspend 函數等待結果,當 async 協程沒完成時, await 會返回 COROUTINE_SUSPENDED, 因此 launch 協程體的 invokeSuspend 方法直接 return COROUTINE_SUSPENDED 值執行完成,此時 launch 啟動的協程處于掛起狀態但不阻塞所處線程,而 async 啟動的協程開始執行。
我們看一下 async 的源碼:
- public fun <T> CoroutineScope.async(...): Deferred<T> {
- val newContext = newCoroutineContext(context)
- val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else
- DeferredCoroutine<T>(newContext, active = true)
- coroutine.start(start, coroutine, block)
- return coroutine
- }
默認情況下,上面的 coroutine 取 DeferredCoroutine 實例,于是我們看一下其 await 方法以及在 async 協程執行完成后,是怎么恢復 launch 協程的:
- private open class DeferredCoroutine<T>(
- parentContext: CoroutineContext, active: Boolean
- ) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
- override suspend fun await(): T = awaitInternal() as T
- }
- // JobSupport
- internal suspend fun awaitInternal(): Any? {
- while (true) { // lock-free loop on state
- val state = this.state
- if (state !is Incomplete) {
- // 已經完成,則直接返回結果
- if (state is CompletedExceptionally) { // Slow path to recover stacktrace
- recoverAndThrow(state.cause)
- }
- return state.unboxState()
- }
- // 不需要重試時直接break,執行awaitSuspend
- if (startInternal(state) >= 0) break
- }
- return awaitSuspend() // slow-path
- }
- // suspendCoroutineUninterceptedOrReturn: 獲取當前協程,且掛起當前協程(返回COROUTINE_SUSPENDED)或不掛起直接返回結果
- private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
- val cont = AwaitContinuation(uCont.intercepted(), this)
- cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
- cont.getResult()
- }
上面 awaitInternal 的大致邏輯是當掛起函數已經有結果時則直接返回,否則掛起父協程,然后 invokeOnCompletion 方法將 ResumeAwaitOnCompletion 插入一個隊列(state.list)中,源碼就不再貼出了。接著看看在 async 執行完成后是怎么調用 ResumeAwaitOnCompletion 來 resume 被掛起的協程的。注意:不要繞進 async 協程體中 delay 是怎么掛起和恢復 async 協程的這一邏輯,我們不需要關注這一層!
接著 async 協程的執行往下看,從前面可知它會調用 BaseContinuationImpl.resumeWith 方法來執行協程邏輯,我們詳細看一下這個方法,在這里會執行該協程的 invokeSuspend 函數:
- internal abstract class BaseContinuationImpl(
- public val completion: Continuation<Any?>?
- ) : Continuation<Any?>, CoroutineStackFrame, Serializable {
- public final override fun resumeWith(result: Result<Any?>) {
- var current = this
- var param = result
- while (true) {
- with(current) {
- val completion = completion!! // fail fast when trying to resume continuation without completion
- val outcome: Result<Any?> =
- try {// 調用 invokeSuspend 方法執行協程邏輯
- val outcome = invokeSuspend(param)
- // 協程掛起時返回的是 COROUTINE_SUSPENDED,即協程掛起時,resumeWith 執行結束
- // 再次調用 resumeWith 時協程掛起點之后的代碼才能繼續執行
- if (outcome === COROUTINE_SUSPENDED) return
- Result.success(outcome)
- } catch (exception: Throwable) {
- Result.failure(exception)
- }
- releaseIntercepted() // this state machine instance is terminating
- if (completion is BaseContinuationImpl) {
- // unrolling recursion via loop
- current = completion
- param = outcome
- } else {
- // top-level completion reached -- invoke and return
- completion.resumeWith(outcome)
- return
- }
- }
- }
- }
- }
我們從上面的源碼可以看到,在 createCoroutineUnintercepted 方法中創建的 SuspendLambda 實例是 BaseContinuationImpl 的子類對象,其 completion 參數為下:
- launch: if (isLazy) LazyStandaloneCoroutine else StandaloneCoroutine
- async: if (isLazy) LazyDeferredCoroutine else DeferredCoroutine
上面這幾個類都是 AbstractCoroutine 的子類。而根據 completion 的類型會執行不同的邏輯:
- BaseContinuationImpl: 執行協程邏輯
- 其它: 調用 resumeWith 方法,處理協程的狀態,協程掛起后的恢復即與它有關
在上面的例子中 async 啟動的協程,它也會調用其 invokeSuspend 方法執行 async 協程邏輯,假設 async 返回的結果已經可用時,即非 COROUTINE_SUSPENDED 值,此時 completion 是 DeferredCoroutine 對象,因此會調用 DeferredCoroutine.resumeWith 方法,然后返回,父協程的恢復邏輯便是在這里。
- // AbstractCoroutine
- public final override fun resumeWith(result: Result<T>) {
- val state = makeCompletingOnce(result.toState())
- if (state === COMPLETING_WAITING_CHILDREN) return
- afterResume(state)
- }
在 makeCompletingOnce 方法中,會根據 state 去處理協程狀態,并執行上面插入 state.list 隊列中的 ResumeAwaitOnCompletion.invoke 來恢復父協程,必要的話還會把 async 的結果給它,具體代碼實現太多就不貼了,不是本節的重點。直接看 ResumeAwaitOnCompletion.invoke 方法:
- private class ResumeAwaitOnCompletion<T>(
- job: JobSupport, private val continuation: CancellableContinuationImpl<T>
- ) : JobNode<JobSupport>(job) {
- override fun invoke(cause: Throwable?) {
- val state = job.state
- assert { state !is Incomplete }
- if (state is CompletedExceptionally) {
- // Resume with with the corresponding exception to preserve it
- continuation.resumeWithException(state.cause)
- } else {
- // resume 被掛起的協程
- continuation.resume(state.unboxState() as T)
- }
- }
- }
這里的 continuation 就是 launch 協程體,也就是 SuspendLambda 對象,于是 invoke 方法會再一次調用到 BaseContinuationImpl.resumeWith 方法,接著調用 SuspendLambda.invokeSuspend, 然后根據 label 取值繼續執行接下來的邏輯!
suspendCoroutineUninterceptedOrReturn
接下來我們看一下怎么將一個基于回調的方法改造成一個基于協程的 suspend 方法,要實現這個需求,重點在于 suspendCoroutineUninterceptedOrReturn 方法,根據注釋,這個方法的作用是: Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension. 即獲取當前協程的實例,并且掛起當前協程或不掛起直接返回結果。函數定義如下:
- public suspend inline fun <T> suspendCoroutineUninterceptedOrReturn(crossinline block: (Continuation<T>) -> Any?): T {
- // ...
- }
根據 block 的返回值,有兩種情況:
- 如果 block 返回 COROUTINE_SUSPENDED, 意味著 suspend 函數會掛起當前協程而不會立即返回結果。這種情況下, block 中的 Continuation 需要在結果可用后調用 Continuation.resumeWith 來 resume 協程。
- 如果 block 返回的 T 是 suspend 函數的結果,則協程不會被掛起, block 中的 Continuation 不會被調用。
調用 Continuation.resumeWith 會直接在調用者的線程 resume 協程,而不會經過 CoroutineContext 中可能存在的 ContinuationInterceptor。建議使用更安全的 suspendCoroutine 方法,在其 block 中可以同步或在異步線程調用 Continuation.resume 和 Continuation.resumeWithException:
- public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T {
- contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
- return suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
- // 調用攔截器
- val safe = SafeContinuation(c.intercepted())
- block(safe)
- safe.getOrThrow()
- }
- }
此外除了 suspendCoroutine 方法,還有 suspendCancellableCoroutine, suspendAtomicCancellableCoroutine, suspendAtomicCancellableCoroutineReusable 等方法都可以用來將異步回調的方法封裝成 suspend 函數。
下面來看一個例子來介紹怎么將異步回調函數封裝成 suspend 函數:
- class NetFetcher {
- // 將下面的 request 方法封裝成 suspend 方法
- suspend fun requestSuspend(id: Int): String = suspendCoroutine { continuation ->
- request(id, object : OnResponseListener {
- override fun onResponse(response: String) {
- continuation.resume(response)
- }
- override fun onError(error: String) {
- continuation.resumeWithException(Exception(error))
- }
- })
- }
- fun request(id: Int, listener: OnResponseListener) {
- Thread.sleep(5000)
- if (id % 2 == 0) {
- listener.onResponse("success")
- } else {
- listener.onError("error")
- }
- }
- interface OnResponseListener {
- fun onResponse(response: String)
- fun onError(error: String)
- }
- }
- object Main {
- fun main() {
- requestByCoroutine()
- }
- // 使用回調
- private fun requestByCallback() {
- NetFetcher().request(21, object : NetFetcher.OnResponseListener {
- override fun onResponse(response: String) {
- println("result = $response")
- }
- override fun onError(error: String) {
- println("result = $error")
- }
- })
- }
- // 使用協程
- private fun requestByCoroutine() {
- GlobalScope.launch(Dispatchers.Main) {
- val result = withContext(Dispatchers.IO) {
- try {
- NetFetcher().requestSuspend(22)
- } catch (e: Exception) {
- e.message
- }
- }
為加深理解,再介紹一下 Kotlin 提供的兩個借助 suspendCancellableCoroutine 實現的掛起函數: delay & yield。
delay
delay 方法借助了 suspendCancellableCoroutine 方法來掛起協程:
- public suspend fun delay(timeMillis: Long) {
- if (timeMillis <= 0) return // don't delay
- return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
- cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
- }
- }
- override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
- postDelayed(Runnable {
- with(continuation) { resumeUndispatched(Unit) }
- }, timeMillis)
- }
可以看出這里 delay 的邏輯類似于 Handle 機制,將 resumeUndispatched 封裝的 Runnable 放到一個隊列中,在延遲的時間到達便會執行 resume 恢復協程。
yield
yield 方法作用是掛起當前協程,這樣可以讓該協程所在線程運行其他邏輯,當其他協程執行完成或也調用 yield 讓出執行權時,之前的協程可以恢復執行。
- launch(Dispatchers.Main) {
- repeat(3) {
- println("job1 $it")
- yield()
- }
- }
- launch(Dispatchers.Main) {
- repeat(3) {
- println("job2 $it")
- yield()
- }
- }
- // output
- job1 0
- job2 0
- job1 1
- job2 1
- job1 2
- job2 2
看一下 yield 的源碼:
- public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
- val context = uCont.context
- // 如果協程沒有調度器,或者像 Unconfined 一樣沒有進行調度則直接返回
- val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
- if (cont.dispatcher.isDispatchNeeded(context)) {
- // this is a regular dispatcher -- do simple dispatchYield
- cont.dispatchYield(context, Unit)
- } else {
- // This is either an "immediate" dispatcher or the Unconfined dispatcher
- // ...
- }
- COROUTINE_SUSPENDED
- }
- // DispatchedContinuation
- internal fun dispatchYield(context: CoroutineContext, value: T) {
- _state = value
- resumeMode = MODE_CANCELLABLE
- dispatcher.dispatchYield(context, this)
- }
- public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block)
可知 dispatchYield 會調用到 dispatcher.dispatch 方法將協程分發到調度器隊列中,這樣線程可以執行其他協程,等到調度器再次執行到該協程時,會 resume 該協程。
總結
通過上面協程的工作原理解析,可以從源碼中發現 Kotlin 中的協程存在著三層包裝:
- 第一層包裝: launch & async 返回的 Job, Deferred 繼承自 AbstractCoroutine, 里面封裝了協程的狀態,提供了 cancel 等接口;
- 第二層包裝: 編譯器生成的 SuspendLambda 子類,封裝了協程的真正執行邏輯,其繼承關系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 參數就是第一層包裝實例;
- 第三層包裝: DispatchedContinuation, 封裝了線程調度邏輯,它的 continuation 參數就是第二層包裝實例。
這三層包裝都實現了 Continuation 接口,通過代理模式將協程的各層包裝組合在一起,每層負責不同的功能,如下圖:
原文地址:https://ljd1996.github.io/2021/05/19/Kotlin%E7%AC%94%E8%AE%B0%E4%B9%8B%E5%8D%8F%E7%A8%8B%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86/?utm_source=tuicool&utm_medium=referral