除了方便好用的 operator 之外,RxJava 还有一个非常重要的机制:非同步处理。 RxJava 的非同步处理机制可以让我们很轻松的切换不同的执行绪,但是在使用上常常会有一些困难,像是某项任务没有执行在我预期的执行绪上,或是烦恼说切换执行绪的程序码应该要放在哪,今天会来跟大家分享我的用法以及看法。
执行绪切换的语法只有两个:observeOn
跟 subscribeOn
,以一般的准则来说,想要指定上游的执行绪,就使用 subscribeOn
,相反的,想指定下游的执行绪,就使用 observeOn
,如果都没有指定,就是跑在现在正在运行的这条执行绪上。这些原则看起来很简单,但是有时候我们会想做一些比较复杂的操作,写完了之後才发现,这些任务并没有运行在我预想中的执行绪上面,下面来举一些我平常会遇到的使用案例:
注:以下这些程序码都是在单元测试的环境下执行的。
@Test
fun test() {
printCurrentThread("Start")
Observable.just(1)
.map {
printCurrentThread("first map")
it + 1
}
.subscribeOn(Schedulers.computation())
.map {
printCurrentThread("second map")
it + 1
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
private fun printCurrentThread(message: String) {
println("Thread: ${Thread.currentThread().name}, $message")
}
根据上面的推论,如果我要指定上游的执行绪的话,就使用 subscribeOn,因此,first map 会执行在 Computation,second map 会执行在 IO ,但是结果如下:
Thread: Test worker, Start
Thread: RxComputationThreadPool-1, first map
Thread: RxComputationThreadPool-1, second map
Thread: RxSingleScheduler-1, subscribe
second map 还是执行在 Computation!这表示第二个 subscribe 是没用的,所以如果真的要让 second map 执行在 IO 的话,应该要这样写:
@Test
fun test() {
printCurrentThread("Start")
Observable.just(1)
.map {
printCurrentThread("first map")
it + 1
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io()) // 在前面安插一个 observeOn
.map {
printCurrentThread("second map")
it + 1
}
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
把原来的 subscribeOn 拿掉,并在更前面加上 observeOn 才能达到原来想做的事。
使用 subscribeOn 的话,照理来说 Observable 的源头就是会在我指定的执行绪上执行了对吧?但很遗憾的这个假设是错的:
@Test
fun test() {
printCurrentThread("Start")
Observable.just(createItem())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
private fun createItem(): Int {
printCurrentThread("create Item")
return 1
}
createItem 是执行在哪条执行绪上呢?应该是 Computation 吧?但结果却是 Test worker !:
Thread: Test worker, Start
Thread: Test worker, create Item
Thread: RxSingleScheduler-1, subscribe
原因是因为在 Observable.just 是当下就会马上执行的函示,所以 createItem
会是现在正在运行的执行绪上执行,如果想要在 computation 上执行,请用 Observable.create。
写到这边突然想到还没介绍到 flatMap,但是没关系,不会 flatMap 的话网路上有很多资源可以学习,或是之後没写满30天的话可以硬塞一天给它。
@Test
fun test() {
printCurrentThread("Start")
Observable.just(1)
.flatMap { number ->
createItemObservable()
.map { number2 ->
printCurrentThread("inner map") // 这是在哪一个执行绪上执行呢?
number2 + number
}
}
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe {
printCurrentThread("subscribe")
}
Thread.sleep(1000L)
}
private fun createItemObservable(): Observable<Int> {
printCurrentThread("create Item")
return Observable.just(2)
.subscribeOn(Schedulers.io())
}
你觉得 Inner map 会执行在哪条 Thread 上呢? computation 吗?因为 subscribeOn 是影响所有的上游,所以照这样推论的话 flatMap 会因此跑在 computation 上,flatMap 里面的 map 也是同理,但是执行结果是:
Thread: Test worker, Start
Thread: RxComputationThreadPool-1, flatMap
Thread: RxComputationThreadPool-1, create Item
Thread: RxCachedThreadScheduler-1, inner map
Thread: RxSingleScheduler-1, subscribe
inner map 是跑在 “RxCachedThreadScheduler”上,也就是 io,为什麽呢?因为 createItemObservable()
在建立 Observable 的时候已经抢先执行了 subscribeOn(Scheduler.io())
,所以後来的 subscribeOn(Scheduler.computation())
就无法影响这个 Observable 了。
为什麽要特别示范这一段呢?我们其实很常会在不同的类别之间传递 Observable ,然而,在对该 Observable 做 flatMap 时要小心,在後面接 subscribeOn 的话有时并不会影响该 Observable 的执行绪,执行在哪条执行绪上是要看情况而定的。
建议大家可以试试看拿掉 subscribeOn(Scheduler.io())
,会有不同的结果喔!
“RxJava” 是一个为 Java 这语言设计的 “Reactive extension library”,所以在使用上会比较贴近 Java 这个程序语言。但是我们现在写的语言是 Kotlin!Kotlin 有着很多 Java 没有的语言特性,因此可以突破原本的限制,进而让 RxJava 更好使用。其中我最喜欢的就是 Extension function 搭配上 RxJava 了,他可以让我们的语法更加简洁,增加我们的生产力。
@Test
fun test() {
// These two are the same
Observable.just(1)
.subscribeOn(Schedulers.io())
Observable.just(1)
.fromIO()
}
fun <T> Observable<T>.fromIO(): Observable<T> {
return this.subscribeOn(Schedulers.io())
}
藉由 extension function 的帮助,我们可以让冗长的 subscribeOn(Schedulers.io()),变成只有六个字母的 fromIO()。因此 observeOn 也是同理:
@Test
fun test() {
// These two are the same
Observable.just(1)
.observeOn(Schedulers.io())
Observable.just(1)
.toIO()
}
fun <T> Observable<T>.toIO(): Observable<T> {
return this.observeOn(Schedulers.io())
}
觉得 List 的 filterInstance 很好用吗?怎麽不为 Observable 也做一个呢?
inline fun <reified T> Observable<in T>.filterInstance(): Observable<T> {
return this.filter { it is T }
.map { it as T }
}
注:这边经过了两个 Operator ,所以是有优化空间的,如果非常注重效能的话,需要自己去实作 Custom Observable 或是 Costum ObservableTransformer ,然後再使用 extension function 来串接,但是通常来说,这样的解法是够用的。
有时候会需要跟前一个资料组合起来,两个资料一起送出去做其他运算,但是 RxJava 没有这样的 operator ,所以我们也自己做了一个:
fun <T> Observable<T>.pairwise(): Observable<Pair<T, T>> {
return Observable.zip(this, this.skip(1),
BiFunction { a, b -> Pair(a, b) })
}
这边举个例子来让大家比较好理解,假如有一个 Observable 会送出 1, 2, 3, 4, 5,这五个资料,那经过 pairwise 之後,就会送出 (1, 2), (2, 3), (3, 4), (4, 5)
在 Android 开发中,主要分成两种类型的执行绪:Main thread, background thread
一般来说,Model 层处理的事情主要是关於网路连线以及资料库操作。所以在 Model 层中是不能使用 Main thread 的,再加上在 Android app 中,没有特别指定的话,程序码都会跑在 Main thread 上。所以,我们必须在某个地方做执行绪切换,来确保说 Model 层的所有任务都不会跑在 Main thread 上。
那麽问题来了,如果要在 MVVM 中使用 RxJava 来实现的话,要在哪里做执行绪切换呢(也就是 subscribeOn 跟 observeOn)?有一种可能性是,在 Model 层中使用 subscribeOn(Schedulers.io())
,那 ViewModel 层就可以放心的继续使用从 Model 层来的 Observable,而不用担心这个 Observable 被 subscribe 之後会不会卡到 UI thread 。然後呢,在 View 层要 Subscribe 的时候再使用 observeOn(AndroidSchedulers.mainThread())
切换回 Main thread,就可以确保更新 UI 的动作是在 Main thread 完成的。
在同一个专案或是团队里,遵守同样的程序码风格是一件好事,所以上面的规则我们可以套用到整个专案,对吧? 先来看看这个调查:下面是我在 “Android Developer开发读书会”社团发起的投票,用意是调查大家偏好在哪里使用 subscribeOn ,哪里使用 observeOn ,结果分为两大阵营,第三个选项只有四个人投票:
连结:https://www.facebook.com/groups/523386591081376/permalink/4285325838220747/
这统计结果可能代表了大家都有一些预设的偏好,在他们的专案开发中有一样的程序码风格。但其中很有趣的是,有人一次选了两个选项,这是为什麽呢?我没有实际问本人的意见,但我猜他们想表达的是“It depends”,事实上,第一个留言的人就是说看情况而定了。
为什麽是看情况呢?举例来说,要是我们真的想要在不同的情况下决定 Model 层的执行绪,使用我在这篇文章中的作法做得到吗?好像做不太到对吧?因为第一个 subscribeOn 就已经定好了执行绪,之後不管怎麽做都无法切换了。所以如果要在後来做切换的话,可能就要在 ViewModel 层根据条件来决定 subscribeOn 中的 scheduler 要放哪一个。那更之後的任务呢?都只能执行在该执行绪上吗?好像也不应该这麽做,我们很有可能还要视情况使用 observeOn 再切换到 computation 或是其他的 scheduler。
在这个问题的讨论上又回到了一个结论,就是架构设计是没有绝对的,一个完整的架构,其实是很多小小、不同的设计决策所累积出来的,在 Clean architecture 中有一段话是这麽说的:
I see all the little details that support all the high-level decisions. I also see that those low-level details and high-level decisions are part of the whole design of the house.
low-level detail 跟 high-level decision 是相辅相成的,世界上不存在一个完美的架构能够解决所有的问题。
<<: Day 14 「不残而废」单元测试、Code Smell 与重构 - Data Class 篇
>>: AI ninja project [day 14] 文字处理--分类
大家好~ 我是五岁~ 今天来画牛头怪~ 今天会尝试卡通风格~ 目标是一只跟人一样站立的牛头怪,武器是...
IOT https://wolkesau.medium.com/ddf91d896fd8 Maker...
缘由 很多时候我们会使用docker作为环境的控管,确保服务执行时环境是一致的。 但某些时候我们可能...
踏入区块链一年多,回想起参加过的线下聚会、线上课程,都让我觉得很充实。 虽然觉得区块链是一个当前有点...
前言 本文说明帐户已实现损益资讯。 程序实作 程序 # 特定时间已实现损益 profit_loss ...