Day21:Hot Flow - SharedFlow

Flow 是 cold stream,只有在呼叫 Terminal operator 的时候才会执行。也就是说每一个 Flow 都只有一次性的工作,只要呼叫一次 Terminal operator 就会完成这一次的呼叫。例如我们的 Terminal operator 选择 collector ,那麽当某个 Flow 执行 collector 之後,就会把 Flow 里面所有的资料根据我们所设定的动作来执行,最後经过 collector 把结果储存下来,然後就结束这一次的任务。

Flow 也有可以支援 hot stream 的方式,它的名称为 SharedFlow,它可以支援多个 Collector 共享 Flow 的发射的内容 (emitted value),所以对於所有的 Collector,只有一次的执行。

那麽,SharedFlow 是如何共享所有发射的内容呢?它是使用广播(broadcast)的方式。因为 SharedFlow 永远都不会结束等着广播发送内容给所有的 Collector,所以称为 hot stream。

SharedFlow 的介面 interface,如下:


interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}

由上我们得知 SharedFlow 是继承 Flow 的,其中里面包含一个函式 replayCache:List<T> ,当有一个新的 collector 加入时,就会根据设定的 replay 数量来把最後的项目广播给新的 collector 上。

shareIn

我们可以使用 shareIn 让原本的 Flow 转变成 SharedFlow

class Day21 {
    val scope = CoroutineScope(Job())
    fun sharedFlow(): Flow<Int> = flow {
        println("Flow started")
        repeat(10) {
            delay(100)
            emit(it)
        }
    }.shareIn(
        scope,
        replay = 10,
        started = SharingStarted.WhileSubscribed()
    )
}

在上方中,我们在原本的 flow 底下使用 shareIn 来让这个 flow 转变成 sharedFlow,其中需要带三个参数,第一个是带入一个 CoroutineScope,也就是这个 Flow 所在的 Scope,第二个则是当有新的 collector 加入时,需要重播几项,最後一个参数 started 则是什麽时候开始启动。

  • SharingStarted.WhileSubscribed():表示当第一个 collector 出现的时候就启动,最後一个 collector 消失的时候就立即停止,保留重播的快取。

执行:

@OptIn(InternalCoroutinesApi::class)

fun main() = runBlocking {
    val day21 = Day21()
    val sharedFlow = day21.sharedFlow()

    launch {
        sharedFlow.collect {
            println("(1): $it")
        }
    }
    delay(500)
    launch {
        sharedFlow.collect {
            println("(2): $it")
        }
    }

    println("done")
}

外层的 coroutine 会先执行 500 毫秒,第一个 launch 执行 500 毫秒时外层 coroutine 会结束 delay,接着第二个 launch 也会跟着执行,但是因为第一个 launch 已经在 500 毫秒内接收了一堆内容,所以这时候 sharedFlow 就要把那些内容发给第二个 launch,等到发完之後,接下来的每一笔资料都会同时传给两个 launch。

结果如下:

Flow started
(1): 0
(1): 1
(1): 2
(1): 3
done
(2): 0
(2): 1
(2): 2
(2): 3
(1): 4
(2): 4
(1): 5
(2): 5
(1): 6
(2): 6
(1): 7
(2): 7
(1): 8
(2): 8
(1): 9
(2): 9

虽然我们的 flow 只有发送 10 个值,但是 sharedFlow 不会因为我们发完之後就停了,它会一直处於执行的状态,除非所有的 collector 都消失。(在这边的消失可能是取消或是发生 exception)

另外,这边的 collector 因为都是在执行之後就开始接收内容,所以 collector 在 sharedFlow 就称为 subscriber (订阅者)。

--- 接下篇 ---


<<:  Day 24 : Jenkins 在Build完通知与好用套件

>>:  let / const 细节版

Vue.js 从零开始:Vue CLI 环境说明

Webpack如何产生档案 经过上一篇对於Webpack的介绍後,相信大家都有一定的认识,这边我们用...

JavaScript 函数 | 一级函数

一级函数 (First Class Functions) Everything you can do...

DAY 16 『 改用 xib 进行界面创作 』

storyboard 有三个概念是最容易混淆: xib:实际是个xml文件,xib = XML n...

成为工具人应有的工具包-28 LastActivityView

LastActivityView 今天来认识这个看就知道是看这台电脑上一步做了啥动作的工具! 调查的...

JavaScript学习日记 : Day21 - 数组方法(一)

在前端的日常开发中,数组使用的频率非常高,所以充分熟悉各种常见的方法後,能提升工作的效率。 1. 基...