协程的创建
先来看一个简单的示例代码:
fun main() {
/**
* 创建协程
* 一个 suspend Lambda 可以通过 createCoroutine 方法创建一个协程
*/
val continuation = suspend {
println("suspend 匿名函数")
fun1()
5
}.createCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
println("协程执行完毕,result = $result")
}
})
/**
* 启动协程
*/
continuation.resume(Unit)
}
suspend fun fun1() = suspendCoroutine<Unit> {
thread {
Thread.sleep(1000)
println("suspend fun1")
it.resume(Unit)
}
}
在这个例子中,我们通过 createCoroutine
创建了一个协程。它接收一个 Continuation<T>
参数,用来接收协程执行完毕的结果,并返回一个 Continuation<Unit>
,我们可以通过调用它的 resume(Unit)
来启动协程。
源码中 createCoroutine
的定义如下:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit>
可以看到,这个函数是 suspend function
的扩展函数,作用就是将 suspend { ... }
这样的协程体编译成协程框架可识别的对象。
挂起与恢复
我们通过一个 suspend Lambda 去创建一个协程,这个 suspend Lambda 就是协程具体执行的逻辑,或者称之为协程体。
suspend { ... }
其实会被编译成一个 SuspendLambda
的实现类(位于 kotlin.coroutines.jvm.internal
包),它继承自 ContinuationImpl
(基类是 BaseContinuationImpl
),并实现了核心的 invokeSuspend()
方法。
这个方法就是协程的“状态机”,它会根据 label
字段决定从哪一步继续执行:
int label;
public final Object invokeSuspend(Object $result) {
Object suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
System.out.println("suspend 匿名函数");
Continuation cont = (Continuation)this;
this.label = 1;
if (StartCoroutineKt.fun1(cont) == suspended) {
return suspended; // 挂起
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return Boxing.boxInt(5);
}
可以看到:
label = 0
:表示初次进入协程,执行第一段逻辑,遇到fun1()
时可能挂起。label = 1
:表示从fun1()
恢复后继续往下执行。每一个
suspend
调用点,都会编译成一个case
,挂起点就是switch
状态机的跳转点。
为什么能恢复?
因为挂起函数本质上接收一个 Continuation
参数,当 fun1
内部调用 it.resume(Unit)
时,会触发外部 Continuation
的 resumeWith
,重新进入 invokeSuspend()
,并根据 label
跳到上次挂起的位置。
Kotlin 的基类 BaseContinuationImpl
就实现了这种恢复逻辑:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param) // 继续执行状态机
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted()
if (completion is BaseContinuationImpl) {
// 尾递归优化,继续向上层恢复
current = completion
param = outcome
} else {
// 到达顶层,返回结果
completion.resumeWith(outcome)
return
}
}
}
}
}
协程的启动
回到最开始的 createCoroutine
。它返回的其实是一个 SafeContinuation
,这是对 suspend { ... }
编译后实现类的装饰。SafeContinuation
内部保存了一个 delegate
,就是前面提到的 SuspendLambda
实现类。
当我们调用 resume(Unit)
时,会进入 SafeContinuation.resumeWith()
:
public actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read
when {
cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
delegate.resumeWith(result) // 启动协程体
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}
也就是说:
首次调用时,
SafeContinuation
会把调用转发给delegate
,也就是SuspendLambda
的resumeWith
。后者再调用
invokeSuspend()
,从状态机的 case 0 开始执行协程体。如果中途遇到挂起点,就返回
COROUTINE_SUSPENDED
并保存状态;恢复时再继续执行下一个 case。
到这里,Kotlin 协程的 “创建 → 挂起 → 恢复 → 启动” 的完整闭环就串起来了。