Day22:Hot Flow - SharedFlow (Part II)

昨天我们使用了 shareIn 将 Flow 转成 SharedFlow, 我们来研究一下这个函式。

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

首先,我们可以知道这是一个 extension function,并且带有三个参数, scopestsrtedreplay ,我们看一下昨天的范例:

class Day22 {
    val scope = CoroutineScope(Job() + Dispatchers.Default)
    fun sharedFlow() = flow {
        repeat(10) {
            delay(100)
            emit(it)
        }
    }.shareIn(
        scope,
        replay = 5,
        started = SharingStarted.WhileSubscribed()
    )
}

scope 这边是带入我们在这个类别中所建立的 CoroutineScope,目的是用来定义共享开始的 Coroutine Scope。

replay 是带入一个整数值,目的是用来当新的订阅者(subscriber)加入时,会根据这个数值把最後的几个值发给它们。

started 则是用来决定 SharedFlow 什麽时候会开始启动。而在这边有三个选项可供选择:

  1. Eagerly:当 SharedFlow 建立起来之後,就会立刻起动,而且永不停止。
  2. Lazily:等到第一个监听者加入的时候,才会启动,同样也是永不停止。
  3. WhileSubscribed:预设是当第一个订阅者加入的时候就会启动,当最後一个监听者离开的时候就会停止。所以与前面两个的第一个不同点就是它会停止。
    • 在 WhileSubscribed 中,有另外两个参数可以使用:
      1. stopTimeoutMillis:当最後一个订阅者离开的时候,需要多久才会停止 Shared Flow。
      2. replayExpirationMillis:当有新的订阅者加入时,会从快取中取得最後的内容,预设是会永远保存这些内容,也就是不管多久有新订阅者加入,都可以取得内容。

注意到,这边所使用的 shareIn 是用来建立一个 SharedFlow,里面的内容是不可变的。会在每次加入新的订阅者之後把内容传给它们。

SharedFlow 是什麽?

SharedFlow 是一个介面,它里面只有定义一个函式: val replayCache: List<T> ,可以猜出 SharedFlow 的作用就是重播它的快取给新加入的订阅者。

interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}

如果我们需要在已有订阅者的时候要在发射 (emit) 内容呢?

这时候我们可以使用 MutableSharedFlow ,MutableSharedFlow 顾名思义就是可以变动的 SharedFlow,也就是说 MutableSharedFlow 里面的资料,是可以更新的。

我们先看一下 MutableSharedFlow 的介面是什麽样子?

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    fun tryEmit(value: T): Boolean
    val subscriptionCount: StateFlow<Int>
    fun resetReplayCache()
}

在这个介面中,它实作了两个介面,一个是我们前面所提到的 SharedFlow 另一个则是 FlowCollector

SharedFlow 的功能如刚刚所说,是用来发送快取的内容给新加入的订阅者。那 FlowCollector 呢?

我们直接看他的定义

public interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

嗯,它里面包含了 public suspend fun emit(value: T) ,所以 MutableSharedFlow 就可以任意的发射资料给订阅者。

介绍完这两个介面之後,是时候介绍一下另外三个函式。

  • tryEmit :跟 emit 最大的不同就是,它不是一个 suspend 函式,它会尝试发射资料,如果成功发射不需暂停,就会直接回传 true ,如果需要暂停,则回传 false 并且呼叫原本的 emit。
  • subscriptionCount :回传有多少个订阅者。
  • resetReplayCache :将所有快取内的资料清空。

使用范例

假设我们有一个类别名为 Company,储存资料类型 Employee,如果想要动态的发送 Employee 的更新给所有的订阅者,我们可以这麽设计:(参考)

class Company{
		private val _employees = MutableSharedFlow<Employee>()
		val employees = _employees.asSharedFlow()

		suspend fun addEmployee(employee: Employee){
			_employees.emit(employee)
		}
}

对外暴露的只有 employees 以及 fun addEmployee 这两个项目,在类别 Company 中,只有开放这两个方式来取用/更新 Employee 。对於使用者来说,只需要把 Employee 透过 addEmployee 来更新即可,而外面的使用者不能直接取得 _employees (MutbaleSharedFlow),而是只能取得它不可变动的版本。这样子我们就把资料的变动留在这个类别中。

而这样子的设计,在 Android 的 Best Practice 中到处可见。例如 ViewModel 里面的 LiveData 就是这麽的设计。话说回来,SharedFlow 可以用来取代 LiveData,接下来我先会先介绍 StateFlow,介绍完之後,如果还有时间,我将介绍如何使用 SharedFlow 以及 StateFlow 来替代 LiveData。

小结

SharedFlow 与 Flow 不同,因为它是属於 hot stream,也就是说 SharedFlow 的执行不是像 Flow 一样呼叫 collect 之後就会执行,而在执行完成之後就会立刻结束。 SharedFlow 可以同时有多个 collect,SharedFlow 可以同时将快取内的资料同时传送给所有 collect,在 SharedFlow 中 collect 称为 subscriber(订阅者)。

我们可以根据需求设定快取的大小,当有新的订阅者加入时,就会把最後几笔资料传送给订阅者,而当更新的资料传送完毕之後,就会跟其他的订阅者一起接收新的资料。

使用 shareIn 建立的是一个 SharedFlow,也就是里面的资料是不可更新的,如果要建立可以更新的 SharedFlow,必需要使用 MutableSharedFlow。

参考资料

Reactive Streams on Kotlin: SharedFlow and StateFlow

SharedFlow

特别感谢

Kotlin Taiwan User Group

Kotlin 读书会


<<:  [DAY 14] _最小系统板规划

>>:  [Day12 - React Native] 为你的 APP 加入 icon - Android

[Day 17] IOCP 实作

前言 今天简单实践了最基本的 IOCP http server , 原则上是用我第二天写的 one2...

[Day23] Angular 简介

呼~~终於写完所有躲在浏览器後面的东西了,现在我们已经架好了主机、布好了 API 程序、装好了 My...

用 Python 畅玩 Line bot - 06:Image Message

现在我们可以来尝试能对收到的 Image message 做怎样的操作,我们可以使用line_bot...

[Android Studio] -- Day 3 Activity练习

前言 今天将针对activity的跳转来复习复习 正文 这次采用bundle来传值,并区分start...

Day8 Android - 切换页面(Intent)

intent可以使一个Activity切换至另一个Activity,而一个application可能...