协程的创建

先来看一个简单的示例代码:

fun main() {
    /**
     * 创建协程
     * 一个 suspend Lambda 可以通过 createCoroutine 方法创建一个协程
     */
    val continuation = suspend {
        println("suspend 匿名函数")
        fun1()
        5
    }.createCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext

        override fun resumeWith(result: Result<Int>) {
            println("协程执行完毕,result = $result")
        }
    })

    /**
     * 启动协程
     */
    continuation.resume(Unit)
}

suspend fun fun1() = suspendCoroutine<Unit> {
    thread {
        Thread.sleep(1000)
        println("suspend fun1")
        it.resume(Unit)
    }
}

在这个例子中,我们通过 createCoroutine 创建了一个协程。它接收一个 Continuation<T> 参数,用来接收协程执行完毕的结果,并返回一个 Continuation<Unit>,我们可以通过调用它的 resume(Unit) 来启动协程。

源码中 createCoroutine 的定义如下:

public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit>

可以看到,这个函数是 suspend function 的扩展函数,作用就是将 suspend { ... } 这样的协程体编译成协程框架可识别的对象。

挂起与恢复

我们通过一个 suspend Lambda 去创建一个协程,这个 suspend Lambda 就是协程具体执行的逻辑,或者称之为协程体

suspend { ... } 其实会被编译成一个 SuspendLambda 的实现类(位于 kotlin.coroutines.jvm.internal 包),它继承自 ContinuationImpl(基类是 BaseContinuationImpl),并实现了核心的 invokeSuspend() 方法。

这个方法就是协程的“状态机”,它会根据 label 字段决定从哪一步继续执行:

int label;

public final Object invokeSuspend(Object $result) {
    Object suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch (this.label) {
       case 0:
          ResultKt.throwOnFailure($result);
          System.out.println("suspend 匿名函数");
          Continuation cont = (Continuation)this;
          this.label = 1;
          if (StartCoroutineKt.fun1(cont) == suspended) {
             return suspended; // 挂起
          }
          break;
       case 1:
          ResultKt.throwOnFailure($result);
          break;
       default:
          throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    return Boxing.boxInt(5);
}

可以看到:

  1. label = 0:表示初次进入协程,执行第一段逻辑,遇到 fun1() 时可能挂起。

  2. label = 1:表示从 fun1() 恢复后继续往下执行。

  3. 每一个 suspend 调用点,都会编译成一个 case,挂起点就是 switch 状态机的跳转点。

为什么能恢复?

因为挂起函数本质上接收一个 Continuation 参数,当 fun1 内部调用 it.resume(Unit) 时,会触发外部 ContinuationresumeWith,重新进入 invokeSuspend(),并根据 label 跳到上次挂起的位置。

Kotlin 的基类 BaseContinuationImpl 就实现了这种恢复逻辑:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) {
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param) // 继续执行状态机
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted()
                if (completion is BaseContinuationImpl) {
                    // 尾递归优化,继续向上层恢复
                    current = completion
                    param = outcome
                } else {
                    // 到达顶层,返回结果
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

协程的启动

回到最开始的 createCoroutine。它返回的其实是一个 SafeContinuation,这是对 suspend { ... } 编译后实现类的装饰。SafeContinuation 内部保存了一个 delegate,就是前面提到的 SuspendLambda 实现类。

当我们调用 resume(Unit) 时,会进入 SafeContinuation.resumeWith()

public actual override fun resumeWith(result: Result<T>) {
    while (true) { // lock-free loop
        val cur = this.result // atomic read
        when {
            cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
            cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                delegate.resumeWith(result) // 启动协程体
                return
            }
            else -> throw IllegalStateException("Already resumed")
        }
    }
}

也就是说:

  • 首次调用时,SafeContinuation 会把调用转发给 delegate,也就是 SuspendLambdaresumeWith

  • 后者再调用 invokeSuspend(),从状态机的 case 0 开始执行协程体。

  • 如果中途遇到挂起点,就返回 COROUTINE_SUSPENDED 并保存状态;恢复时再继续执行下一个 case。

到这里,Kotlin 协程的 “创建 → 挂起 → 恢复 → 启动” 的完整闭环就串起来了。