Flow 是属於 coroutine 范围项目,coroutine 中一个重要的特点可以轻易的切换执行绪,在 Flow 也有这项功能吗?
我们知道要在 Coroutine 中使用不同的执行绪,我们需要切换不同的 Dispatchers ,使用不同的 Dispatchers Coroutine 会在其内部帮我们切换到适当的执行绪/执行绪池。而我们在使用 Dispatchers 的时机点有两个,一个是使用 Coroutine builder (ex: launch, async),另一种方法则是使用 withContext
。
假设我们想与 launch、async 一样在建立的时候就把 Dispatchers 带入,那麽 flow 的 builder 应该就要包含 CoroutineContext 的参数。
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
不过很可惜的,并没有这个参数在其中,所以我们没有办法在建立 flow 的时候同时带入 Dispatchers。
如果第一条路不通,我们试试看第二条路如何?
我们尝试在 flow 里面使用 withContext
将 emit 包起来。
class Day20 {
fun flow(): Flow<Int> = flow {
withContext(Dispatchers.Default) {
repeat(10) {
emit(it)
}
}
}
}
fun main() = runBlocking {
val day20 = Day20()
day20.flow().collect { println(it) }
println("done")
}
尝试执行之後,喷出错误了
错误的内容:Flow invariant is violated。
讯息还告诉我们请使用
flowOn
替代。
我们将上面的范例改成
class Day20 {
fun flow(): Flow<Int> = flow {
repeat(10) {
emit(it)
}
}.flowOn(Dispatchers.Default)
}
再次执行,
0
1
2
3
4
5
6
7
8
9
done
OH~YEAH 通过了。
我们来研究一下 flowOn ,首先我们先看的是它的签名
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
没错,这边的确就如我们在前面所说的,必须要带入 CoroutineContext。所以如果我们必须要在 flow 中切换执行绪,那麽就必须要使用 flowOn
。
flowOn 除了在建立 flow 的时候可以使用外,我们观察 flowOn 的定义,嗯,它应该也能在 Terminal operators 之前呼叫,因为它的回传值是 Flow
将上方的范例改回,并把 flowOn 加到 collect 之前
class Day20 {
fun flow(): Flow<Int> = flow {
repeat(10) {
emit(it)
}
}
}
fun main() = runBlocking {
val day20 = Day20()
val flow = day20.flow()
flow.map {
println("map: ${Thread.currentThread().name}")
it * 3
}
.flowOn(Dispatchers.Default)
.filter {
println("filter: ${Thread.currentThread().name}")
it % 2 == 0
}
.flowOn(Dispatchers.IO)
.collect {
println("collect: ${Thread.currentThread().name}")
println(it)
}
println("done")
}
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
map: DefaultDispatcher-worker-2
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
collect: main
0
collect: main
6
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
filter: DefaultDispatcher-worker-1
collect: main
12
filter: DefaultDispatcher-worker-1
collect: main
18
collect: main
24
done
跟我们所想的一样,我们可以在 Terminal operator 之前使用 flowOn,而 flowOn 只会对它之前的操作有作用,如上方的 map 以及 filter 都是属於不同的 DefaultDispatchers-worker,然後在 collect 又回到 main。
如果我们没有在 flow 里面呼叫 flowOn 来更改 Dispatchers ,那麽它预设是会使用外层 coroutine 的 Dispatchers。如下:
class Day20 {
val scope = CoroutineScope(Job() + Dispatchers.Default)
...
fun flow2() = scope.launch {
flow {
println("emit: ${Thread.currentThread().name}")
emit(10)
}.collect {
println("collect: ${Thread.currentThread().name} $it")
}
}
}
fun main() = runBlocking {
val day20 = Day20()
day20.flow2()
delay(100)
println("done")
}
emit: DefaultDispatcher-worker-1
collect: DefaultDispatcher-worker-1 10
done
flowOn 的 执行绪是从呼叫它的点带入,预设的执行绪是跟外层相同,如果我们希望能够在使用 flow 的时候更改执行绪,那麽我们就必须要使用 flowOn。
flowOn 它带入的参数一样是 CoroutineContext,所以我们可以带入 Dispatchers,根据不同的 Dispatchers, Flow 会切换至不同的执行绪/执行绪池来执行。 flowOn 除了可以在建立 flow 的时候使用,我们也可以在 Terminal operators 之前呼叫,它会修改在它之前的所有操作的执行绪。使用 flowOn 我们就可以轻松切换执行绪,而不会发生错误。
<<: [Day12] - Django REST Framework 专案建立
我选了python当作主要开发语言 因为我以前有用过python而且很潮 框架部分我选比较主流的Dj...
前言 我们经常会说换位思考,也就是说,身为工程师,如果能清楚知道管理阶级的想法,就能聪明有效率的做事...
我们知道写程序有个阶段就是一个输入、运算处理、输出 网页是由HTML、CSS、Javascript三...
每个人的学习方法各有差异。 这只是我的选择。 Why v-model 在 S3E5 | Vue.j...
请问有没有剪辑软件自带文字转语音(GOOGLE小姐或SIRI小姐)配音功能?不用另外录制。 ...