day22 热流sharedFlow

在flow那篇我们了解到flow的特性,尤其是每次collect都会创建新的实例,但在某些use case却不适用,而kotlin为此推出了shareflow和stateflow

可以先了解到stateflow是继承自shareflow的,所以我会先讲shareflow,再聊到stateflow

相对於flow,shareFlow和stateFlow都是hot flow,意即flow需要有人collect才会执行且每次都会建立新的实例,而shareflow不同,它可以让多追踪者接收同一则讯息
A SharedFlow that represents a read-only state with a single updatable data value that emits updates to the value to its collectors.

定义

A hotFlow that shares emitted values among all its collectors in a broadcast fashion, so that all collectors get all emitted values. A shared flow is called hot because its active instance exists independently of the presence of collectors.

首先,SharedFlow 可以同时有多个 collecter,在 SharedFlow /StateFlow中 collecter 称为 subscriber(订阅者)。而所有的订阅者可以追踪同一个实例,如此一来,他们都可以收到值。

shareflow作为热流,会向所有订阅者发送值,让全部的订阅者能收到全部的值,而他作为热流的特性在於,他可以独立於collector存在

正文

要建立shareflow有两种方式

private val _mShareFlow = MutableSharedFlow<Int>()
val mShareFlow = _mShareFlow.asSharedFlow()

//or
val extensionShareFlow = flow{
    emit(1)
}.shareIn(viewModelScope, SharingStarted.WhileSubscribed(),replay = 0)

可以注意到,在shareIn多几个值要设定,带他其实就是在里面实作MutableSharedFlow

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return ReadonlySharedFlow(shared, job)
}

那我们来讲讲那些东西到底什麽意思

replay

A shared flow keeps a specific number of the most recent values in its replay cache.
shareflow可以设置回拨快取,每个追踪者会先拿到快取资讯,再拿到最新的值
我们可以透过建构子的replay设置replay cahce max size

private val _dsd = MutableSharedFlow<Int>(replay = 100)
val dsd = _dsd.asSharedFlow()

我们同样也可以透过replayCache拿到整个回拨快取资料

_dsd.replayCache // return List<Int>

replayCache:List< T> ,当有一个新的 collector 加入时,就会根据设定的 replay 数量来把最後的项目广播给新的 collector 上。

也能够清空回拨快取

_dsd.resetReplayCache()

scope

这边是带入我们在这个类别中所建立的 CoroutineScope,目的是用来定义共享开始的 Coroutine Scope。

started 开始

started 则是用来决定 SharedFlow 什麽时候会开始启动。而在这边有三个选项可供选择

  • Eagerly:当 SharedFlow 建立起来之後,就会立刻起动,而且永不停止。
  • Lazily:等到第一个追踪者加入的时候,才会启动,同样也是永不停止。
  • WhileSubscribed:预设是当第一个订追踪者加入的时候就会启动,当最後一个追踪者取消的时候就会停止。所以与前面两个的第一个不同点就是它会停止。

假设需求总是监听位置更新并在应用程序来自後台时在屏幕上显示最後 10 个位置,我们使用replay值 10 将最後 10 个发出的项目保留在内存中,并在每次收集器观察流时重新发出这些项目。为了保持底层流始终处於活动状态并发出位置更新,SharingStarted.Eagerly即使没有收集器,也可以使用策略来监听更新。

WhileSubscribed参数

  • stopTimeoutMillis:没有追踪者时,多久後停止

  • replayExpirationMillis:几秒後reset replay

最开始的时候讲了,shareflow会一直独立於订阅者存在,但我们能够透过WhileSubscribed让shareflow在没有订阅者的时候停止,但问题来了,如果我们在fragment追踪,萤幕旋转了怎麽处理?

根据官方博文,他们建议以WhileSubscribed(5000L),让shareflow变成在没有订阅者的5秒才停止

extraBufferCapacity

A replay cache also provides buffer for emissions to the shared flow, allowing slow subscribers to get values from the buffer without suspending emitters.
The buffer space determines how much slow subscribers can lag from the fast ones. When creating a shared flow, additional buffer capacity beyond replay can be reserved using the extraBufferCapacity parameter.

回播快取也发射提供缓冲区,让慢速订阅者可以从buffer换取资料,而不必让发送区挂起

缓冲区会决定慢速订阅者和快速订阅者的差距,建立shareflow时,超过replay的资料可以用buffer缓冲。

BufferOverflow

A shared flow with a buffer can be configured to avoid suspension of emitters on buffer overflow using the onBufferOverflow parameter, which is equal to one of the entries of the BufferOverflow enum. When a strategy other than SUSPENDED is configured, emissions to the shared flow never suspend.

有缓存的shaerflow可以设置是否要再缓存区溢出时suspend,当设置了SUSPEND以外的列举时,缓存溢出将不会挂起

SUSPEND - the upstream that is sending or is emitting a value is suspended while the buffer is full. 当有个订阅者会执行耗时任务,且再缓存满了的时候还在执行,这时上流的emit会被挂起,直到他准备好接收新值。

DROP_OLDEST - 丢弃最旧的值

DROP_LATEST- 丢弃最新的值
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-buffer-overflow/index.html

缓存溢出仅会在至少一个追踪者还没准备好接收最新值的时候溢出,如果没有追踪者,仅会REPLAY存储最新的值,并且永远不会发生缓存溢出,本质上在没有追踪者的情况下,行为类似於DROP.OLDEST, but the buffer is just of replay size (without any extraBufferCapacity).

缓冲和快取行为


SharedFlow 可以同时将快取内的资料同时传送给所有 collect,
我们可以根据需求设定快取的大小,当有新的订阅者加入时,就会把最後几笔资料传送给订阅者,而当更新的资料传送完毕之後,就会跟其他的订阅者一起接收新的资料。

//VIEWMDOEL
private val _mShareFlow = MutableSharedFlow<Int>(
    replay = 2,
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val mShareFlow = _mShareFlow.asSharedFlow()

init{
    viewModelScope.launch {
        var count = 1
        while(true){
            _mShareFlow.emit(count)
            count++
            delay(1000L)
        }

    }
}
lifecycleScope.launch {
    launch  (Dispatchers.IO){
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                Timber.d("FIRST collect $it")
            }
        }
    }
    delay(5000L)
    launch (Dispatchers.IO){
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                Timber.d("SECOND collect $it")
            }
        }
    }

}

上面用看的应该是看不懂,我写了个范例

FIRST collect 1
FIRST collect 2
FIRST collect 3
FIRST collect 4
FIRST collect 5
SECOND collect 4//只有REPLAY的值
SECOND collect 5
FIRST collect 6
SECOND collect 6
FIRST collect 7
SECOND collect 7
lifecycleScope.launch {
    launch  (Dispatchers.IO){
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                Timber.d("FIRST collect $it")
            }
        }
    }
    launch {
        repeatOnLifecycle(Lifecycle.State.STARTED){
            viewModel.mShareFlow.collect {
                delay(5000L)
                Timber.d("STUCK collect $it")
            }
        }
    }

}
FIRST collect 1
FIRST collect 2
FIRST collect 3
FIRST collect 4
FIRST collect 5
STUCK collect 1//collect收到1才delay
FIRST collect 6
FIRST collect 7
FIRST collect 8
FIRST collect 9
FIRST collect 10
STUCK collect 3//collect印出1之後,收到extrabuffer的3再delay,此时extrabuffer 3 replay 4 5 emit 6
FIRST collect 11
FIRST collect 12
FIRST collect 13
FIRST collect 14
FIRST collect 15
STUCK collect 8//同上emit 11

连结

Kotlin github io shareflow

jast

should-we-choose-kotlins-stateflow-or-sharedflow-to-substitute-for-android-s-livedata


<<:  DAY25-问答页面设计

>>:  Day25-D3 基础图表:折线图+ d3.bisector( )与 d3.defined( )

Day12 Sideproject(作品集) from 0 to 1 -docker後端

昨天介绍完前端使用 今天来简介一下後端 平时公司还是维持 根据建置手顺去建置专案,导致环境问题光解决...

番外篇 - NestJs - Guard

NestJs - Guard 验证分为两种,登入权限验证以及角色验证 举例说明:我们将 API 分为...

[Day24] 求值策略

先 po 文.. Call by Value 传值 Call by Reterence 传参照 Ca...

Day 06 | Dart基本介绍 - private & static

在昨天的文章中,讲到了类别基础用法包含了「宣告」、「建构子」及「实体化」,今天会继续说明Dart c...

Day 7 情报收集 - Information Gathering (Network & Port scanners)

今天子标题是Network & Port scanner,其实跟前面介绍的几个工具功能好像有...