Day19:Flow 准备好输出了吗?使用 Terminal operators 产生结果吧。

Flow 经过 Intermediate operators 将资料经过处理之後,最後一步则是要把资料输出,而将资料输出则是要透过 Terminal operators。

而除了我们在之前所介绍的 collect 外,Terminal operators 还有其他的成员。那麽,本篇文章将一一介绍这些 Terminal operators。

Terminal operators (终端运算子)

首先,Terminal operators 我这边先暂时翻译成「终端运算子」,如果有更好的建议可以跟我说喔。

这边一样使用这个简单的范例:

fun flow(): Flow<Int> = flow {
      println("Flow started")
      repeat(10){
        delay(100)
        emit(it)
    }
}

collect

我们在前面的范例中, collect 使用蛮多次的,我们看一下它的签名:

inline suspend fun <T> Flow<T>.collect(crossinline action: suspend (T) -> Unit)

在 collect 的参数中,只有一个 actioin: suspend(T) -> Unit ,在调用 collect 的时候可以同时执行这里面的内容。这边的 action 是一个 suspend 函式。

使用范例如下:

我们可以直接使用 collect 来把 Flow 里面所有的资料拿出来做最後的处理。

fun main() = runBlocking {
    val flow = flow()
    flow.collect { value -> println(value)}
}

另外,有另外一种 collect 的方式,定义如下:

suspend fun Flow<*>.collect()

从上面的定义得知,这种的 collect 函式不需要带任何的参数。当我们调用 collect 的时候,一样会把 Flow 上的所有资料按照我们的设定来处理。

但是,这个函式是没有回传值的,那麽我们该如何使用这个函式呢?

Flow 提供了四个函式 onStartonEachonCompletecatch 。这几个函式能够在执行 collect 之前执行,其实它们也是 Intermediate operators ,将上面的范例改成:

fun main() = runBlocking {
    val flow = flow()
		flow.onEach { println(it) }
        .onCompletion { println("done") }
        .collect()
}
Flow started
0
1
2
3
4
5
6
7
8
9
10
done

我们看一下这几个函式:

onStart

在 Flow 的最前面执行,也就是说当执行 collect 的时候,那麽就会执行 onStart

这边的重点是,不管 onStart 的顺序,它都是会在第一个执行。

public fun <T> Flow<T>.onStart(
  action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
fun main() = runBlocking {
    val flow = flow()
		flow.onStart{ println("start") }
        .onEach { println(it) }
        .onCompletion { println("done") }
        .collect()
}
fun main() = runBlocking {
    val flow = flow()
		flow.onEach { println(it) }
        .onCompletion { println("done") }
				.onStart{ println("start") }
        .collect()
}

这两段都会输出:

start
Flow started
0
1
2
3
4
5
6
7
8
9
10
done

onCompletion

与 onStart 相反,onCompletion 是会在所有的动作执行完毕之後才会呼叫的。

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

在 onCompletion 中,action 的型别为 suspend FlowCollector<T>.(cause: Throwable?) -> Unit

所以我们也可以在 onCompletion 中 使用 emit

fun main() = runBlocking {
    val flow = flow()
		flow.onStart{ println("start") }
        .onEach { println(it) }
        .onCompletion { emit("done") }
        .collect{ println(it) }
}
start
Flow started
0
1
2
3
4
5
6
7
8
9
10
done

onEach:它是用来走访每一个 Flow 的元素的,如同我们上方的范例

这边要特别注意的, onEach 会根据摆放的位置会接收到不同的元素,所以摆放的位置可能会影响其结果。

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

范例1: onEach 在 map 前方

fun main() = runBlocking {
    val flow = flow()
		flow.onEach { println(it) }
        .map { it * 3 }
        .collect()
}
Flow started
0
1
2
3
4
5
6
7
8
9

范例2: onEach 在 map 後方


fun main() = runBlocking {
    val flow = flow()
		flow.map { it * 3 }
				.onEach { println(it) }
        .collect()
}
Flow started
0
3
6
9
12
15
18
21
24
27

从这两个结果我们可以得知,onEach 会根据摆放的位置而有不同的结果,在每个 Intermediate operator 执行过後, Flow 里面的内容就会更改,所以使用 onEach 也会有不同的值。

catch:用来捕捉异常状态的函式。

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> kotlin.Unit): Flow<T>

直接看一下这个范例:

fun main() = runBlocking {
    val flow = flow()
		flow.map { it * 3 }
				.onEach { 
						println(it)
						if(it>10) throw RuntimeException("large than 10")
				 }
        .collect()
}

在这个范例中,我们在 onEach 的地方加上了 if(it>10) throw RuntimeException("large than 10") 也就是说,当 Flow 里面的元素大於 10 的时候,就会喷 RuntimeException。如果我们直接执行这段程序码,会发现的确在 12 的时候抛出了 RuntimeException,而程序也就中断了。

Flow started
0
3
6
9
12
Exception in thread "main" java.lang.RuntimeException: large than 10
...

我们可以使用 catch 来捕捉异常,如下:

fun main() = runBlocking {
    val flow = flow()
		flow.map { it * 3 }
				.onEach { 
						println(it)
						if(it>10) throw RuntimeException("large than 10")
				 }
				.catch{ println(it) }
        .collect()
}
**Flow started
0
3
6
9
12
java.lang.RuntimeException: large than 10**

single

collect 会把所有在 Flow 里面的元素都列出来, single 则是相反,它只能列出一个元素。

如果 Flow 里面没有元素,会抛出 NoSuchElementException 。如果元素超过 1 个,则是会抛出 IllegalStateException

suspend fun <T> Flow<T>.single(): T

所以我们可以搭配 take(1) 来确保 Flow 里面的元素只有一个。

fun main() = runBlocking {
		val flow = flow
    val value = flow
        .map { it * 3 }
        .take(1)
        .single()
    println(value)
}
Flow started
0

reduce

在这边的 reduce 的意思是渐少元素,这是什麽意思呢?在 reduce 中我们会有两个值,一个是前一个运算得来的值,另一个是现在的值,会一个一个元素走访过。随着 index 往後移动,剩余的元素也就越来越少了,直到全部的元素都走过了。

suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S

范例:

fun main() = runBlocking {
		val flow = flow
    val reduce = flow
        .reduce { it, it2 ->
            println("$it + $it2")
            it + it2
        }
    println(reduce)
}

Flow started
0 + 1
1 + 2
3 + 3
6 + 4
10 + 5
15 + 6
21 + 7
28 + 8
36 + 9
45

reduce


fold

与 reduce 类似,不过它有一个初始值。

inline suspend fun <T, R> Flow<T>.fold(initial: R, crossinline operation: suspend (R, T) -> R): R

范例如下:

fun main() = runBlocking{
	val fold = (1..4).asFlow()
	        .fold(1) { it1, it2 -> it1 * it2 }
	    println(fold)
}
24

toList

最後介绍的是 toList ,我们可以在最後将 Flow 转成 Collection

suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

范例如下:

fun main() = runBlocking{
		val reversed = flow().map { it * 3 }
        .catch { println(it) }
        .onCompletion { println("Done") }
        .toList(ArrayList())
				.reversed()
    println(reversed)
}
Flow started
Done
[27, 24, 21, 18, 15, 12, 9, 6, 3, 0]

小结

Flow 在 Terminal operators 之前都不会执行,我们可以在 Terminal operators 之前使用不同的 Intermediate operators ,而 Intermediate operators 可以使用多个,但是最後的 Terminal operators 只能呼叫一次。 Terminal operators 针对不同的用途有提供不同的函式,有直接传回所有 Flow 内容的 collect ,有只能传回一个值的 single ,如果希望在最後可以针对所有的 Flow 元素进行处理则可以使用 reduce 以及 fold ,最後我们当然可以将 Flow 转成 list,只要呼叫 toList 即可。

特别感谢

Kotlin Taiwan User Group

Kotlin 读书会


<<:  [Day23] Esp32 + LINE

>>:  [ Day 09 ] State 是什麽?

30天程序语言研究

今天是30天程序语言研究的第二十一天,由於资料库开发的部分我是负责前端所以想说顺便多练习一下其他开发...

Day32. 使用Decorator Pattern 实作摊提

当我们要做开立发票、发票折让的时候,或者对第三方如 POS 整合系统要同步资料时,可能会遇到需要使用...

Day 20-制作购物车系统之建立Routes&Controller

购物车後端的部分终於要结束啦~ 以下内容有参考教学影片,底下有附网址。 (内容包括我的不专业解说分析...

Day22 [实作] 一对一视讯通话(2): Signaling server

今天我们要实作 Signaling server 的部分: 建立文件 # 进入要放专案的路径 ❯ c...

Dungeon Mizarka 001

第一人称地城冒险游戏介绍 第一人称地城冒险游戏(FP Dungeon Crawler, FPDC)类...