Day14:内建的 suspend 函式,好函式不用吗? (3)

这是我们内建的 suspend 函式第三篇,让我们看看有哪些吧:


joinAll()

还记得 join() 的功能是什麽吗?join() 可以让目前的 coroutine 暂停直到它完成。简单的示范如下:

fun main() = runBlocking {
    val job = launch {
        delay(100)
        println("inner")
    }
    job.join()
    println("outer")
}

→ 呼叫子 coroutine 的 join() 让原本应该要先列印 outer 在列印 inner 的动作,改成子 coroutine 的内容先执行,再执行下面的 println("outer")

所以 join() 的意思就是让该 job 能够加入至目前的执行顺序中。

joinAll() 则是可以同时执行多个 job 的 join() ,也就是说等效於 jobs.forEach{ it.join()}

suspend fun joinAll(vararg jobs: Job)
suspend fun Collection<Job>.joinAll()

→ joinAll() 有两种格式,一种是使用 vararg ,另一种是 Collection 的。

fun main() = runBlocking {
    val jobs = mutableListOf<Job>()
    repeat(10) {
        jobs.add(launch {
            delay(100)
            println("inner: $it: ${Thread.currentThread().name}")
        })
    }
    jobs.joinAll()
    println("outer: ${Thread.currentThread().name}")
}
inner: 0: main
inner: 1: main
inner: 2: main
inner: 3: main
inner: 4: main
inner: 5: main
inner: 6: main
inner: 7: main
inner: 8: main
inner: 9: main
outer: main

→ 如我们所预期的,会先执行全部的子 coroutine ,完成之後才会做执行外层的。

那麽这个就有衍生一个问题,如果其中一个 Job 发生例外,或被取消了呢?

Job 被取消

我们把上面的范例改成:

fun main() = runBlocking {
    val jobs = mutableListOf<Job>()
    repeat(10) {
        jobs.add(launch {
            withTimeout(500) {
                delay(100L * it)
                println("inner: $it: ${Thread.currentThread().name}")
            }
        })
    }
    jobs.joinAll()
    println("outer: ${Thread.currentThread().name}")
}

子 coroutine 每次呼叫 delay() 的时间会随着次数增加,我们使用 withTimeout(500) 将这段程序码包起来,也就是说当超过 500 毫秒的 job 都会被取消。

所以我们可以试想一下上面这个范例会是怎麽的结果,如果每次延迟 100毫秒 * it,那麽会在 it = 6 的时候开始被取消。

inner: 0: main
inner: 1: main
inner: 2: main
inner: 3: main
inner: 4: main
inner: 5: main
outer: main

Job 发生例外

我们将上面这段稍作修改,

class Day14 {
    private val scope = CoroutineScope(Job() + CoroutineExceptionHandler { _, e -> println(e) })
    suspend fun tryException() {
        val jobs = mutableListOf<Job>()
        repeat(10) {
            jobs.add(scope.launch {
                delay(100L)
                yield()
                println("inner: $it: ${Thread.currentThread().name}")
                if (it == 2) {
                    throw RuntimeException("Incorrect")
                }
            })
        }
        jobs.joinAll()
        println("outer: ${Thread.currentThread().name}")
    }
}

→ 当 it = 2 的时候,就会抛出一个 RuntimeException

在 coroutine 中,我们可以使用 CoroutineExceptionHandler 拦截例外。

inner: 5: DefaultDispatcher-worker-3
inner: 6: DefaultDispatcher-worker-6
inner: 4: DefaultDispatcher-worker-7
inner: 7: DefaultDispatcher-worker-5
inner: 0: DefaultDispatcher-worker-8
inner: 3: DefaultDispatcher-worker-2
inner: 2: DefaultDispatcher-worker-4
inner: 1: DefaultDispatcher-worker-1
inner: 8: DefaultDispatcher-worker-6
inner: 9: DefaultDispatcher-worker-7
java.lang.RuntimeException: Incorrect
outer: main

awaitAll()

无独有偶,coroutine 也有针对所有的 Deferred 一起呼叫的 awaitAll()

public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>
suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>

→ awaitAll() 有两种格式,一种是使用 vararg ,另一种是 Collection 的。

awaitAll() 会输出一个 List,其中 T 是 Deferred 的回传值。

下面的范例,将两个 async 利用 awaitAll() 计算。

fun main() = runBlocking{
	val deferred = mutableListOf(
	        async {
	            delay(100)
	            println("inner: async1")
	            1
	        },
	        async {
	            delay(150)
	            println("inner: async2")
	            2
	        }
	    )
	    val result = deferred.awaitAll().sum()
	    println("outer: $result")
}

→ 由於 awaitAll() 回传的是 List,所以如果想要把所有 async 里面的值合成一个在输出,就要看你要怎麽合成。如上例,我使用 sum() 来将 awaitAll() 输出的所有整数加总起来。所以最後的结果是:

inner: async1
inner: async2
outer: 3

例外处理

如同 await() ,我们一样是使用 try-catch 做例外处理。

class Day14 {
    private val scope = CoroutineScope(Job()) 

    suspend fun tryAsyncException(): Int {
        val deferred = mutableListOf(
            scope.async {
                delay(100)
                println("inner: async1")
                1
            },
            scope.async {
                delay(150)
                println("inner: async2")
                throw RuntimeException("Incorrect")
                2
            }
        )
        return deferred.awaitAll().sum()
    }
}

fun main() = runBlocking{
	val day14 = Day14()

	try {
        day14.tryAsyncException()
    } catch (e: RuntimeException) {
        println(e)
    }
    println("outer")
}
inner: async1
inner: async2
java.lang.RuntimeException: Incorrect
outer

cancelAndJoin()

还记得上一篇的范例吗?

fun main() = runBlocking {
    val job = launch {
        println("inside: ${Thread.currentThread().name}")
        withContext(Dispatchers.Default) {
            delay(200)
            println("Default: ${Thread.currentThread().name}")
        }
    }
    delay(100)
    job.cancel()
    job.join()
    println("outer: ${Thread.currentThread().name}")
}

→在这边我们使用了 job.cancel() 接着是 job.join()

coroutine 提供了一个更简单 suspend 函式,那就是

cancelAndJoin()

public suspend fun Job.cancelAndJoin() {
    cancel()
    return join()
}

所以我们可以将上面的范例改成:

fun main() = runBlocking {
    val job = launch {
        println("inside: ${Thread.currentThread().name}")
        withContext(Dispatchers.Default) {
            delay(200)
            println("Default: ${Thread.currentThread().name}")
        }
    }
    delay(100)
    job.cancelAndJoin()
    println("outer: ${Thread.currentThread().name}")
}
inside: main
outer: main

小结

coroutine 不愧是 Kotlin 团队开发的,为了让我们的呼叫更精简,有了很多复合的函式,了解这些函式可以让我们的程序更简洁一些。

参考资料

joinAll

awaitAll

cancelAndJoin

特别感谢

Kotlin Taiwan User Group

Kotlin 读书会


<<:  入门魔法 - 变数宣告 let、const、var

>>:  【5】超参数 Batch size 与 Learning rate 的关系实验

Day 14:凯撒密码之Shifting Letters

在开始今天题目之前,先来认识一下凯撒密码 (Caesar cipher) 凯撒密码是一种替换加密技术...

[Day24]-开发GUI程序使用tkinter

建立视窗 视窗元件配置管理员 Pack() 方法 Drid() 方法 功能纽 button 变数类...

[DAY8]制作容器(七)

会发生css路径的问题可能是因为override的部分没有设定好,所以再重作一个container ...

[EXCEL]危险的SUMIFS函数

前言 SUMIFS函数从EXCEL 2010开始出现之後,我很庆幸可以少写一些阵列公式。 这个函数可...

IT铁人DAY 28-Observer 观察者模式

  今天要学习的是观察者模式,它主要的作用是设定一个订阅机制,当被订阅的物件有发生事件时就会去通知所...