Day17:Flow,一个非同步的资料流。 First Look

What is Flow?

Flow 是用来处理非同步的资料流的一种方式,它会按照发射 (emit) 的顺序来执行。

An asynchronous data stream that sequentially emits values and completes normally or with an exception.

将资料透过 Flow 的方式发送,在资料接收之前,这笔资料都不会被执行、运算,我们甚至可以在执行之前透过一些函式来将这些资料转化成我们所希望的样子。

跟 Channel 不太一样的地方是,Channel 一次取出一个值,而 Flow 取出的是一个流(Stream)。

换句话说,使用 Channel 时,我们必须要呼叫多次的 receive() 来接收 send() 传送出来的值,而 Flow 只需要使用 collection{} 就可以处理所有在这个资料流。

Simple Flow

Kotlin 提供多种方式建立 Flow ,在这边我们使用 flow{ ... } 来建立。

fun flow1(): Flow<Int> = flow {
      println("Flow started")
      repeat(10){
        delay(100)
        emit(it)
    }
}

我们可以发现 flow{ ... } 建立出来的函式不是一个 suspend 函式,原因就是 Flow 是等到我们接收的时候才会去执行,所以接收的函式才会是一个 suspend 函式。

另外,不知道你有没有注意到,在 flow{} 里面包含着一个 suspend 函式 (delay()),也就是说这个 lambda 函式也是一个 suspend 函式。flow{} 的签名如下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

可以发现在 flow{} 包含了一个 suspend 函式 FlowCollector<T>.() -> Unit

在这个 flow {} 中,我们最後使用了 emit() 将整数传进 stream 中。

这边的 emit() 其实就是 FlowCollector 的函式

public interface FlowCollector<in T> {

    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)
}

如何取得 Flow 的资料呢?

从上方的范例我们知道,回传的是一个 Flow<T> 的值,那麽我们来看一下这个 Flow 的定义:

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

在 Flow 中,里面只有一个函式 ( collect() ),其中有一个参数为 FlowCollector<T> ,这个型别就是我们在 flow{} 中建立的 lambda 的型别。

所以到这边我们就大致了解了, Flow 是利用 FlowCollector<T> 来传送资料,用 emit() 把资料塞进去,然後用 collect 把 资料取出。

取出资料的范例如下:

fun main() = runBlocking {
    val flow = flow1()
    flow.collect { value -> println(value)}
}
Flow started
0
1
2
3
4
5
6
7
8
9

Flow is cold stream

我们在前面有提到, Flow 的资料是只有在接收的时候才会执行,也就是说,同样的资料如果存放在 stream 中,我们可以多次接收,在每次呼叫接收的时候,都会重新执行一次 flow{} 里面的程序码。

我们上例稍作修改:

fun main() = runBlocking {
    val flow = flow1()
    flow.collect { value -> println(value)}
		flow.collect { value -> println(value)}
}
Flow started
0
1
2
3
4
5
6
7
8
9
Flow started
0
1
2
3
4
5
6
7
8
9

则结果的确是会得到两次一样的资料。

Flow Builder

Flow 有提供多种建立的方法,如下:

  • flow{}

这就是我们在上面范例所使用的 Builder,我们在这个 builder 中,使用 emit() 把资料传进 stream 中。

  • flowOf(...)

flowOf 有两种不同的实作,简单来看就是一个是一个值,另外一个是处理多个值。重点是,flowOf 其实在里面的实作也是使用 flow{} 来建立 Flow。

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
public fun <T> flowOf(value: T): Flow<T> = flow {
    /*
     * Implementation note: this is just an "optimized" overload of flowOf(vararg)
     * which significantly reduces the footprint of widespread single-value flows.
     */
    emit(value)
}

使用方式:

flowOf(1,2,3,4)
  • asFlow()

如果你的资料是一个 Collection,那麽我们就可以使用 asFlow() 来将 Collection 转成 Flow

底下为 asFlow() 的实作,可以看到有各式各样的实作

public fun <T> Array<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun IntArray.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun LongArray.asFlow(): Flow<Long> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun IntRange.asFlow(): Flow<Int> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun LongRange.asFlow(): Flow<Long> = flow {
    forEach { value ->
        emit(value)
    }
}

@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

@FlowPreview
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

小结

Flow 是一种非同步处理资料的方式,使用 Flow 其实就是把资料丢进一个流(Stream)中,而 Flow 里面的程序码只有在呼叫接收的时候才会调用,所以建立 Flow 的函式不需要为 suspend 函式。

本篇文章我们简单介绍了Flow ,下一篇文章我们会多讲一些深入的内容。

参考资料

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html

特别感谢

Kotlin Taiwan User Group
Kotlin 读书会


<<:  Day 22 - Tailwind Plugin 使用 (一) => Aspect Ratio、Line Clamp

>>:  [day7]呼叫永丰API及流程串接整理

DAY19 浅谈深度学习

我们在前面算是完整的介绍了使用机器学习的方法来做资料分析,在剩下最後11天的时间,我想把自己在暑假所...

[13th][Day1] 前言

今次参与战斗是为了挑战自我。 在生活中挤出时间,利用下班的时间好好充实自己。 在加入新团队後,con...

Day 30 - 结语 : 从"预见到坚持"

终於来到今年铁人赛的最後一篇了~虽然都有预先写稿的习惯与准备, 但这次还真的是忙到最後一天才能抽空...

[Day27] Esp32 + IFTTT + Google Sheet

1.前言 今天要讲解如标题一样,Google Sheet是Google所开发的试算表,所以我们要用G...

Day19:SwiftUI—Button

前言 今天来学习SwiftUI 的按钮 — Button。 实作 宣告一个 text 按钮 打开一个...