day18 kotlin - flow基本操作

我会从文档中挑几个重要的出来讲,但不会是全部,我会着重一些基本的中间操作,异常,取消,dispatcher
文档
doc

首先,之前看过的影片分享了RxJava和Flow相同功能操作符的简化,可以发现真的可以少记很多东西

今天就是要来讲讲,flow的操作符

producer

(1..3).asFlow() 

或是

flow{
    emit(1)   
    emit(2)   
    emit(3)   
}

或是

listOf(1,3,6).asFlow()

或是

flowOf(1,4,6)

建构式其实没啥好讲的,快速带过

intermediary

transform

他做为intermediary,我觉得蛮特别的是它可以emit其他值

(1..3).asFlow() // 一个请求流
    .transform { request ->
        emit("Making request $request")
        emit(performRequest(request))
    }
    .collect { response -> Timber.d(response) }
Making request 0
response 0
Making request 1
response 1
Making request 2
response 2

但有个小小混淆的点

flow{ //producer
    emit(0)
    emit(1)
    emit(2)
}
    .transform { request -> //intermediary
        emit("Making request $request")
        emit(performRequest(request))
    }
    .collect { response -> Timber.d(response) } //consumer

这样的话会collect几个results呢?
和上面的范例一样6个,请记住,尽管你看到了很多emit,但producer和intermediary的emit操作并不会重复

buffer

为了方便比较时间,我打delay从100改到1000
before

flow {
    for (i in 1..3) {
        delay(1000) 
        emit(i) 
    }
}
.collect { response ->
    delay(1000)
    Timber.d(response.toString())
}
//15:24:16.819
//15:24:18.825
//15:24:20.831

after

flow {
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
.buffer()
.collect { response ->
    delay(1000)
    Timber.d(response.toString())
}
//15:25:18.267
//15:25:19.275 
//15:25:20.282


加了buffer後,collect和emit会在不同的coroutine运作,而中间的emit会透过channel的send在不同的coroutine交流,但这些都不需要我们实作,我们只需要加buffer就好,棒棒

jast

flowOn

lifecycleScope.launch {
    (0..3).asFlow()
        .collect { response ->
            Timber.d(response.toString())
        }
}

由於 collect 是在main thread调用的,那整个flow也会在main thread执行。 这是快速运行或异步代码的理想默认形式,它不关心执行的上下文并且不会阻塞调用者。

尽管他不会阻塞调用者,但我们也会在某些情况下切换dispatcher,例如需要cpu的大量计算,我们可以用flowOn,而不是withContext
这篇有讲到会报什麽错

(1..10).asFlow()
    .map { Timber.d( "map ${Thread.currentThread().name}" ) }
    .collect { 
        Timber.d( "collect ${Thread.currentThread().name}" )
    }
// map main
// collect main
// map main
// collect main
// map main
// collect main

(1..10).asFlow()
    .map { Timber.d( "map ${Thread.currentThread().name}" ) }
    .flowOn(Dispatchers.Default)//只影响这行以上的代码
    .collect { 
        Timber.d( "collect ${Thread.currentThread().name}" )
    }
//map DefaultDispatcher-worker-2
// collect main
//map DefaultDispatcher-worker-2
// collect main
//map DefaultDispatcher-worker-2
// collect main

catch

非常重要的异常处理来了,尽管try/ catch能处理collect的Exception,但在emit却不应使用try catch

Flows must be transparent to exceptions and it is a violation of the exception transparency to emit values in the flow { ... } builder from inside of a try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the previous example.

简单说在建构式用try/catch违反flow对异常的公开性,使用catch,可以保证Exception的公开性,并允许封装异常处理

开发者可以在catch里面重新throw Exception,用emit传送值出去,或看你想怎麽处理

(1..10).asFlow()
    .catch {  } //只能处理这行以上的代码
    .collect {  }

那如果在collect时跑出异常怎麽办?
不怕,我直接借文档来告诉你,透过将collect的动作搬到onEach,就可以用catch处理了

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()//注意这里不是lambda了

如果不喜欢这种做法,请记得collect可以用try/catch

val ff = (0..3).asFlow()
try{
    ff.collect{}
} catch {

}

cancellable

从前面的范例大约可以看出有不同的flow建构式,而他们在ensureActive的处理上有些许不同

这个会在每次emit都ensureActive一次

flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

但这个不会

(1..10).asFlow()

意思你在某个条件下想要提早结束flow,第二个不仅会全部跑完,还会在跑完後才告诉你这个flow被取消了
但我们也不需要自己动写ensureActive,我们只需要加个cancellable()

//跑到结束
(1..5).asFlow().collect {}

//会检查是否取消
(1..5).asFlow().cancellable().collect {}

consumer

  • collect-最基本的
  • toList-将结果转成list
  • single-只拿到一个,0个或超过一个都会报错
  • first-只拿第一个

reduce

可以透过中间计算,并将值约束到单个

val sum = (1..5).asFlow()
    .map { it * it } // 数字 1 至 5 的平方                        
    .reduce { a, b -> 
        println(a)
        a + b 
    } // 求和(末端操作符)
println(sum)
//a 1
//a 5
//a 14
//a 30

//sum 55

其他的其实也很重要,但我能介绍的有限,而且以基本操作已经足够了,生鱼的我认为有空时或是要佣到时,再去了了解就好

连结

必看

文档
doc

jast


<<:  Day18-Session 管理(二)

>>:  予焦啦!Golang 执行期的锁

[经典回顾]未知病毒入侵金融业事件纪录

防毒软件不断推陈出新,病毒也变化多端... 站在 Blueteam 位置,守住资讯安全防线的难度也飞...

Day_14 : 让 Vite 来开启你的Vue 之 Composition API

Hi Dai Gei Ho~ 我是Winnie~ 在今天的内容中,此篇文章将要来说 Composit...

Computer Typing

The WPM stands for words per minute, and it is a m...

Day 1 : 前言与DevOps

前言 大家好,我是Lufor,第一次参加铁人赛。这是我的主要Blog网址: https://lufo...

Day 9. Compare × Final

Conclusion 呼~到今天为止 9 天过去了,Libraries 之间的比较篇章也到今天告一...