MultiThreading and Custom extension function.

除了方便好用的 operator 之外,RxJava 还有一个非常重要的机制:非同步处理。 RxJava 的非同步处理机制可以让我们很轻松的切换不同的执行绪,但是在使用上常常会有一些困难,像是某项任务没有执行在我预期的执行绪上,或是烦恼说切换执行绪的程序码应该要放在哪,今天会来跟大家分享我的用法以及看法。

执行绪切换

执行绪切换的语法只有两个:observeOnsubscribeOn,以一般的准则来说,想要指定上游的执行绪,就使用 subscribeOn ,相反的,想指定下游的执行绪,就使用 observeOn ,如果都没有指定,就是跑在现在正在运行的这条执行绪上。这些原则看起来很简单,但是有时候我们会想做一些比较复杂的操作,写完了之後才发现,这些任务并没有运行在我预想中的执行绪上面,下面来举一些我平常会遇到的使用案例:

注:以下这些程序码都是在单元测试的环境下执行的。

1. 切换多次执行绪

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(1)
        .map {
            printCurrentThread("first map")
            it + 1
        }
        .subscribeOn(Schedulers.computation())
        .map {
            printCurrentThread("second map")
            it + 1
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

private fun printCurrentThread(message: String) {
    println("Thread: ${Thread.currentThread().name}, $message")
}

根据上面的推论,如果我要指定上游的执行绪的话,就使用 subscribeOn,因此,first map 会执行在 Computation,second map 会执行在 IO ,但是结果如下:

Thread: Test worker, Start
Thread: RxComputationThreadPool-1, first map
Thread: RxComputationThreadPool-1, second map
Thread: RxSingleScheduler-1, subscribe

second map 还是执行在 Computation!这表示第二个 subscribe 是没用的,所以如果真的要让 second map 执行在 IO 的话,应该要这样写:

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(1)
        .map {
            printCurrentThread("first map")
            it + 1
        }
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.io()) // 在前面安插一个 observeOn
        .map {
            printCurrentThread("second map")
            it + 1
        }
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

把原来的 subscribeOn 拿掉,并在更前面加上 observeOn 才能达到原来想做的事。

2. Observable 源头的执行绪

使用 subscribeOn 的话,照理来说 Observable 的源头就是会在我指定的执行绪上执行了对吧?但很遗憾的这个假设是错的:

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(createItem())
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

private fun createItem(): Int {
    printCurrentThread("create Item")
    return 1
}

createItem 是执行在哪条执行绪上呢?应该是 Computation 吧?但结果却是 Test worker !:

Thread: Test worker, Start
Thread: Test worker, create Item
Thread: RxSingleScheduler-1, subscribe

原因是因为在 Observable.just 是当下就会马上执行的函示,所以 createItem 会是现在正在运行的执行绪上执行,如果想要在 computation 上执行,请用 Observable.create。

3. flatMap 的执行绪切换

写到这边突然想到还没介绍到 flatMap,但是没关系,不会 flatMap 的话网路上有很多资源可以学习,或是之後没写满30天的话可以硬塞一天给它。

@Test
fun test() {
    printCurrentThread("Start")

    Observable.just(1)
        .flatMap { number ->
            createItemObservable()
                .map { number2 ->
                    printCurrentThread("inner map") // 这是在哪一个执行绪上执行呢?
                    number2 + number
                }
        }
        .subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.single())
        .subscribe {
            printCurrentThread("subscribe")
        }

    Thread.sleep(1000L)
}

private fun createItemObservable(): Observable<Int> {
    printCurrentThread("create Item")
    return Observable.just(2)
        .subscribeOn(Schedulers.io())
}

你觉得 Inner map 会执行在哪条 Thread 上呢? computation 吗?因为 subscribeOn 是影响所有的上游,所以照这样推论的话 flatMap 会因此跑在 computation 上,flatMap 里面的 map 也是同理,但是执行结果是:

Thread: Test worker, Start
Thread: RxComputationThreadPool-1, flatMap
Thread: RxComputationThreadPool-1, create Item
Thread: RxCachedThreadScheduler-1, inner map
Thread: RxSingleScheduler-1, subscribe

inner map 是跑在 “RxCachedThreadScheduler”上,也就是 io,为什麽呢?因为 createItemObservable() 在建立 Observable 的时候已经抢先执行了 subscribeOn(Scheduler.io()) ,所以後来的 subscribeOn(Scheduler.computation()) 就无法影响这个 Observable 了。

为什麽要特别示范这一段呢?我们其实很常会在不同的类别之间传递 Observable ,然而,在对该 Observable 做 flatMap 时要小心,在後面接 subscribeOn 的话有时并不会影响该 Observable 的执行绪,执行在哪条执行绪上是要看情况而定的。

建议大家可以试试看拿掉 subscribeOn(Scheduler.io()) ,会有不同的结果喔!

Custom Extension functions for RxJava

“RxJava” 是一个为 Java 这语言设计的 “Reactive extension library”,所以在使用上会比较贴近 Java 这个程序语言。但是我们现在写的语言是 Kotlin!Kotlin 有着很多 Java 没有的语言特性,因此可以突破原本的限制,进而让 RxJava 更好使用。其中我最喜欢的就是 Extension function 搭配上 RxJava 了,他可以让我们的语法更加简洁,增加我们的生产力。

执行绪切换

@Test
fun test() {
    // These two are the same
    Observable.just(1)
        .subscribeOn(Schedulers.io())

    Observable.just(1)
        .fromIO()
}

fun <T> Observable<T>.fromIO(): Observable<T> {
    return this.subscribeOn(Schedulers.io())
}

藉由 extension function 的帮助,我们可以让冗长的 subscribeOn(Schedulers.io()),变成只有六个字母的 fromIO()。因此 observeOn 也是同理:

@Test
fun test() {
    // These two are the same
    Observable.just(1)
        .observeOn(Schedulers.io())

    Observable.just(1)
        .toIO()
}

fun <T> Observable<T>.toIO(): Observable<T> {
    return this.observeOn(Schedulers.io())
}

filterInstance

觉得 List 的 filterInstance 很好用吗?怎麽不为 Observable 也做一个呢?

inline fun <reified T> Observable<in T>.filterInstance(): Observable<T> {
    return this.filter { it is T }
        .map { it as T }
}

注:这边经过了两个 Operator ,所以是有优化空间的,如果非常注重效能的话,需要自己去实作 Custom Observable 或是 Costum ObservableTransformer ,然後再使用 extension function 来串接,但是通常来说,这样的解法是够用的。

pairwise

有时候会需要跟前一个资料组合起来,两个资料一起送出去做其他运算,但是 RxJava 没有这样的 operator ,所以我们也自己做了一个:

fun <T> Observable<T>.pairwise(): Observable<Pair<T, T>> {
    return Observable.zip(this, this.skip(1),
                          BiFunction { a, b -> Pair(a, b) })
}

这边举个例子来让大家比较好理解,假如有一个 Observable 会送出 1, 2, 3, 4, 5,这五个资料,那经过 pairwise 之後,就会送出 (1, 2), (2, 3), (3, 4), (4, 5)

在 MVVM pattern 中的执行绪切换

在 Android 开发中,主要分成两种类型的执行绪:Main thread, background thread

  • Main thread: 主要用来做 UI 显示,接受按钮事件,这条执行绪非常重要!要是处理太多任务的话,App 会开始卡顿,情况更严重的话还会有 ANR。
  • Background thread: 网路连线、资料库操作、档案读写都是属於这个类型,请注意这边的 background thread 并不是只能有一个执行绪,而是可以依不同目的而去建立并使用各自的执行绪。

一般来说,Model 层处理的事情主要是关於网路连线以及资料库操作。所以在 Model 层中是不能使用 Main thread 的,再加上在 Android app 中,没有特别指定的话,程序码都会跑在 Main thread 上。所以,我们必须在某个地方做执行绪切换,来确保说 Model 层的所有任务都不会跑在 Main thread 上。

那麽问题来了,如果要在 MVVM 中使用 RxJava 来实现的话,要在哪里做执行绪切换呢(也就是 subscribeOn 跟 observeOn)?有一种可能性是,在 Model 层中使用 subscribeOn(Schedulers.io()) ,那 ViewModel 层就可以放心的继续使用从 Model 层来的 Observable,而不用担心这个 Observable 被 subscribe 之後会不会卡到 UI thread 。然後呢,在 View 层要 Subscribe 的时候再使用 observeOn(AndroidSchedulers.mainThread()) 切换回 Main thread,就可以确保更新 UI 的动作是在 Main thread 完成的。

在同一个专案或是团队里,遵守同样的程序码风格是一件好事,所以上面的规则我们可以套用到整个专案,对吧? 先来看看这个调查:下面是我在 “Android Developer开发读书会”社团发起的投票,用意是调查大家偏好在哪里使用 subscribeOn ,哪里使用 observeOn ,结果分为两大阵营,第三个选项只有四个人投票:

Screen Shot 2021-09-01 at 10.49.00 PM.png

连结:https://www.facebook.com/groups/523386591081376/permalink/4285325838220747/

这统计结果可能代表了大家都有一些预设的偏好,在他们的专案开发中有一样的程序码风格。但其中很有趣的是,有人一次选了两个选项,这是为什麽呢?我没有实际问本人的意见,但我猜他们想表达的是“It depends”,事实上,第一个留言的人就是说看情况而定了。

为什麽是看情况呢?举例来说,要是我们真的想要在不同的情况下决定 Model 层的执行绪,使用我在这篇文章中的作法做得到吗?好像做不太到对吧?因为第一个 subscribeOn 就已经定好了执行绪,之後不管怎麽做都无法切换了。所以如果要在後来做切换的话,可能就要在 ViewModel 层根据条件来决定 subscribeOn 中的 scheduler 要放哪一个。那更之後的任务呢?都只能执行在该执行绪上吗?好像也不应该这麽做,我们很有可能还要视情况使用 observeOn 再切换到 computation 或是其他的 scheduler。

在这个问题的讨论上又回到了一个结论,就是架构设计是没有绝对的,一个完整的架构,其实是很多小小、不同的设计决策所累积出来的,在 Clean architecture 中有一段话是这麽说的:

I see all the little details that support all the high-level decisions. I also see that those low-level details and high-level decisions are part of the whole design of the house.

low-level detail 跟 high-level decision 是相辅相成的,世界上不存在一个完美的架构能够解决所有的问题。


<<:  Day 14 「不残而废」单元测试、Code Smell 与重构 - Data Class 篇

>>:  AI ninja project [day 14] 文字处理--分类

DAY 4 - 牛头怪

大家好~ 我是五岁~ 今天来画牛头怪~ 今天会尝试卡通风格~ 目标是一只跟人一样站立的牛头怪,武器是...

IOT 组别

IOT https://wolkesau.medium.com/ddf91d896fd8 Maker...

Docker in docker .解决技术环境问题

缘由 很多时候我们会使用docker作为环境的控管,确保服务执行时环境是一致的。 但某些时候我们可能...

Day30 - 回顾

踏入区块链一年多,回想起参加过的线下聚会、线上课程,都让我觉得很充实。 虽然觉得区块链是一个当前有点...

[第16天]理财达人Mx. Ada-已实现损益

前言 本文说明帐户已实现损益资讯。 程序实作 程序 # 特定时间已实现损益 profit_loss ...