Day8:结构化并发 (Structured Concurrency)

还记得我们第一个 Coroutine 程序吗?

suspend fun showContents() = coroutineScope {
    launch {
        val token = login(userName, password)
        val content = fetch(token)
        withContext(Dispatchers.Main) {
            showContent(content)
        }
    }
}

launch 里面的 suspend 函式 login() 以及 fetch() 会继承 launch 的 coroutine context,所以会这两个 suspend 函式都会用相同的 coroutine context 来执行,会使用相同的调度器(Dispatchers),由於我们没有指定,所以这边我们会使用 Dispatchers.Default ,也就是背景执行。

Kotlin 的 Coroutine 是采用结构化的并发,在这段程序码的呼叫,会有一段起始点,在最外层的我称为父 Coroutine ,在内层的我称为子 Coroutine。父 Coroutine 会等到所有的子 Coroutine 完成之後才会结束。

所以上面的执行时间,会从第一个 login() 的执行时间到 showContent() 的执行时间。

在上一篇文章中,我们知道我们可以呼叫 Job 的 cancel() 来取消当下的任务,但是如果同时执行多个任务,也就是并发任务,那我们针对某个 Job 呼叫 cancel() 会发生什麽事呢?

取消 Job

我们知道 Kotlin 的 suspend 函式必须要在 Coroutine Scope 内才能执行,有的时候我们可能需要在一个范围内执行多个 Coroutine,如下:

fun main() = runBlocking {
    val job1 = launch {
        delay(100)
        println("Job1 done")
    }

    val job2 = launch {
        delay(1000)
        println("Job2 done")
    }
    println("Outer coroutine done")
}

Concurrency

第一个 launch 耗时 100 毫秒,第二个 launch 耗时 1000 毫秒,如同我们之前提到的,因为这两个 launch 是在 runBlocking 所产生的 Coroutine Scope 内,所以会先执行外层的程序,并且外层会等到子 Coroutine 都完成任务时才结束。

所以上方范例的结果会先显示出 Outer coroutine done ,接着才是 Job1 done以及 Job2 done

假如 job2 所花费的时间已经超出预期,我们可以主动呼叫 job2.cancel() 来把 job2 给停掉。

fun main() = runBlocking {
    val job1 = launch {
        delay(100)
        println("Job1 done")
    }

    val job2 = launch() {
        delay(1000)
        println("Job2 done")
    }
    println("Outer coroutine done")
    delay(300)
    job2.cancel()
}

cancel

→ 只显示了 Outer coroutine done 以及 Job1 done ,job2 则是被取消。

假设所有的子 job 需要被取消呢?我们该怎麽处理?

方法1:呼叫所有 job 的 cancel()

fun main() = runBlocking {
    val job1 = launch {
        delay(1000)
        println("Job1 done")
    }

    val job2 = launch() {
        delay(1000)
        println("Job2 done")
    }
    println("Outer coroutine done")
    delay(300)
    job1.cancel()
    job2.cancel()
}

Cancel all

→ 如同我们前面介绍的,我们可以呼叫 job 的 cancel()来取消 job,所以需要取消所有的 job,就呼叫所有的 cancel()

方法2:取消父 coroutine

用一个 launch 包住这两个 launch让这原本的 coroutine 成为这个 launch 的 子 coroutine,调用父 coroutine 的 cancel()

fun main() = runBlocking {
    val job = launch {
        val job1 = launch {
            delay(1000)
            println("Job1 done")
        }
        val job2 = launch {
            delay(1000)
            println("Job2 done")
        }
    }
    println("Outer coroutine done")
    delay(300)
    job.cancel()
}

→因为 Kotlin 的 coroutine 是有阶层的,当父 coroutine 被取消後,子 coroutine 也会同时被取消。

Nested coroutine

runBlocking 所产生的 Coroutine 为 BlockingCoroutine。

→ 所以我们只要呼叫 CoroutineScope 的 cancel() 就能够取消所有在这个 CoroutineScope 的 Job了。


小结

在结构化并发的架构下,父 Coroutine 会等到全部的子 Coroutine 都结束之後才会结束。而在这样的架构之下,如果没有特别设定子 Coroutine 的 coroutine context,就会继承父 Coroutine 的 context。父 Coroutine 被取消之後,所有的子 Coroutine 也会一并被取消,这样子的设计就不会发生当较高的阶层被取消後,较低的阶层还在运行,然後发生错误。

同样的,如果同一层的 Coroutine 有一个 Job 被取消,在後面尚未执行完成的 Job 也会同时被取消。

参考

(Blog)Structured Concurrency - Roman Elizarov
(Youtube)Structured Concurrency - Roman Elizarov

特别感谢

Kotlin Taiwan User Group
Kotlin 读书会


<<:  D12: 工程师太师了: 第6.5话

>>:  Day 12 - var 、let、const

第二十三天:再探 Gradle Plugin

今天要继续撰写 Gradle Plugin,我们会延续昨天的范例 - 档案差异比对 Plugin。 ...

【Day11】- 递回Recursion

递回(Recursion)的概念是将一个大的问题,分割成许多小问题去解决。而从程序设计角度来看,函式...

DAY 7 Big Data 5Vs – Volume(容量) — Lake House & Database

如果有疑惑说:资料湖与资料仓储该如何选择呢? 其实它们并不冲突。 因为存放的资料不同,可以做的分析也...

Unity与Photon的新手相遇旅途 | Day13-攻击优化

今天的内容为把之前做的攻击有个完整的效果和解决了判断的Bug。 更正EnemyAI程序码的 87为O...

Day 24 | 在flutter 中串接 restful api - MobX的非同步操作

那今天就来让这个非同步资料透过MobX 来串接到画面上: 首先一样建立一个 UsersViewMod...