Multicasting for RxJava

在进入正题之前先让大家看看在 Reactive Programming 中的一种使用案例:

val studentObservable: Observable<Student> = getStudents()

studentObservable
    .filter { it.gender == "male" }
    .subscribe {
        // handle male logic
    }

studentObservable
    .filter { it.gender == "female" }
    .subscribe {
        // handle female logic
    }

// Don't do this
studentObservable
    .subscribe { student ->
        when(student.gender) {
            "male" -> { TODO() }
            "female" -> { TODO() }
        }
    }

在使用 reactive stream 的时候,有时候并不需要“一条通到底”,在必要的时候,可以采取这种“分流”的方式,分别处理各自的逻辑,也比较符合 Single Responsibility Principle。但是这样使用却会导致其他问题的发生,请看看下面这段范例:

val a = Observable.create<Int> { emitter ->
    for (i in (1..3)) {
        println("emit $i")
        emitter.onNext(i)
    }
    emitter.onComplete()
}

a.subscribe { number ->
    println("received 1 $number")
}
a.subscribe { number ->
    println("received 2 $number")
}

猜猜上面这段程序码的输出是什麽?

emit 1
received 1 1
emit 2
received 1 2
emit 3
received 1 3
emit 1
received 2 1
emit 2
received 2 2
emit 3
received 2 3

这个 Observable 由於被 subscribe 了两次,所以 emit 1emit 2emit 3 也都各执行了两次,这乍看之下没有什麽大问题,但万一这个 Observable 其实是一个很耗时的任务呢?同样的任务,同样的输出结果我却需要执行两次以上,不是很浪费机器效能吗?那有没有办法只执行一次呢?答案是有的:

val a = Observable.create<Int> { emitter ->
    for (i in (1..3)) {
        println("emit $i")
        emitter.onNext(i)
    }
    emitter.onComplete()
}.cache()

a.subscribe { number ->
    println("received 1 $number")
}
a.subscribe { number ->
    println("received 2 $number")
}

使用 cache() 这个 operator 可以让我们把所有跑过的结果都记起来,当有第二个或第三个 Observer subscribe 的时候,就可以直接拿到这结果,请看输出:

emit 1
received 1 1
emit 2
received 1 2
emit 3
received 1 3
received 2 1
received 2 2
received 2 3
// emit 1, emit 2, emit 3 都只跑一次了 

以上这种机制叫做 Multicasting :一个源头事件流,同时分发到不同的接收者而不需要重复执行已经运算过的内容。

强大而危险的工具

在写 Reactive Application 的时候,效能一直都会是一个很大的考验,因为我们偏好使用 Immutable object,在每一个 operator 运行的时候可能会产生一个新的物件出来,而要产生一个新物件就要将记忆体中的一个位置让出来给他,当事件越来越多时,记忆体消耗的速度就会越来越快,量大到一定程度的话,系统就会开启 Garbage Collection 的机制自动回收记忆体,而回收的次数太多就会有可能使得使用者感觉到使用者介面卡顿,这是一个很需要被意识到的问题。

因此,在使用 Reactive Programming 做“分流”时,要随时注意到是不是有不小心在什麽地方浪费了太多系统资源,如果发现了,就要适时的使用 multicasting 的方式来做优化。然而 multicasting 却也是一个双面刃,以下是官方在 cache() 这个 operator 中写的文件:

Note: You sacrifice the ability to dispose the origin when you use the cache operator, so be careful not to use this operator on Observables that emit an infinite or very large number of items that will use up memory. A possible workaround is to apply takeUntil with a predicate or another source before (and perhaps after) the application of cache().

大意是说,一但使用了 cache() ,就失去了对上游资料流的控制权,你将无法靠下游的 Disposable 来断掉整个资料流,就算下游的 Observable 已经被某人给结束,他的上游还是会永远存在,而且还会继续、默默的产生资料,你的记忆体将会被消耗殆尽!为了验证这样的说法,我们来试着做出永远不会死掉的事件源:

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}.publish() // [1]

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number") // [3]
    }

Completable.complete()
    .observeOn(Schedulers.single())
    .subscribe {
        observable.connect()  // [2]
    }

Thread.sleep(5000)  // [4]

disposable.dispose()  // [5]

Thread.sleep(5000) 

我们建立了一个永远都不会结束的 Observable ,每发出一个 item ,就会 sleep 一秒钟的时间,一直到这个 observable 被 dispose 为止。然後在 [1] 这个地方有一个 publish() ,这个 operator 的目的是为了要将他变成一个 Hot Observable,接着,为了让这个 hot Observable 能够开始发出第一个资料,我们必须要在 [2] 这个地方呼叫 connect() 这个 function 。接着在 [3] 这个地方 subscribe 了这个 observable ,而且使用不同的执行绪来接收结果来确保这些执行的顺序都是独立的。[4] 强制目前这条执行绪停止五秒钟,观察一下结果,并且在 [5] 这个地方取消了在 [3] 的订阅,照理说,一但取消了订阅,observable 也要立即停止送出新的资料,不然在这边会有记忆体浪费的问题存在,我们来看看执行的结果:

observable: 1
emit number: 1
emit number: 2
observable: 2
emit number: 3
observable: 3
emit number: 4
observable: 4
emit number: 5
observable: 5
emit number: 6
emit number: 7
emit number: 8
emit number: 9
emit number: 10

Hot Observable: 跟我们之前熟悉的 Observable 不一样,即使没有 Observer,也会持续不断的送出新资料出来,当碰到地理位置更新或是蓝芽讯号时,会需要用到这种类型的 Observable

我们可以观察到,即使 dispose 已经触发了,源头的 observable 却还是没有停下来,持续的在丢出新的资料。事实上, cache() 在实作上就是 publishconnect 的结合。

以上是一个比较复杂的范例,主要是用来示范 Observable 在使用 Multicasting 的情况下是有可能无法操控源头的 Observable 的,为了这个示范我就用了比较多进阶的概念。如果没有完全懂的也没关系,下面我再举一个比较浅显易懂的例子:

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .doOnDispose { 
        println("This observable is disposed") //[1]
    }

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number")
    }

Thread.sleep(5000)

disposable.dispose()

Thread.sleep(5000)

我把刚刚的 publish()connect() 拿掉了,并且在 [1] 这个地方加了一个 side effect operator - doOnDispose ,在这个 observable 被 dispose 掉的时候就会去呼叫它,来看看执行的结果:

observable: 1
emit number: 1
emit number: 2
observable: 2
emit number: 3
observable: 3
emit number: 4
observable: 4
emit number: 5
observable: 5
This observable is disposed

没有了 publish()connect() 之後,这个 observable 的确没有再继续跑了,看到 “This observable is disposed” 就是最直接的证据,那如果我再加上 cache() 会发生什麽事呢?

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .doOnDispose {
        println("This observable is disposed")
    }.cache() // [1]

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number")
    }

Thread.sleep(5000)

disposable.dispose()

Thread.sleep(5000)

如上方程序码所示,我在 [1] 的这个地方加了 cache() ,执行的结果会是什麽呢?

emit number: 1
observable: 1
emit number: 2
observable: 2
emit number: 3
observable: 3
emit number: 4
observable: 4
emit number: 5
observable: 5

结果果然跟文件上说的一样,原来的 observable 并没有结束,而是一直静静的待在那边,永远没有人可以结束的了他的生命...那如果我们要结束这个使用过 cache 的 observable 要怎麽做呢?可以参考官方建议的作法:

val endSignal = PublishSubject.create<Unit>()
val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .doOnDispose {
        println("This observable is disposed")
    }
    .takeUntil(endSignal) // [1]
    .onTerminateDetach()
    .cache()

val disposable = observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observable: $number")
    }

Thread.sleep(5000)

endSignal.onNext(Unit) // [2]
disposable.dispose()

Thread.sleep(5000)

在 [1] 这个地方插入一个 takeUntil ,并且在预计要结束的地方触发结束的条件,也就是在 [2] 这边。

Share

除了 cache() 之外,还有一个功能类似,但是没有那麽危险的 multicasting operator - share,跟 cache 最主要的差别是,cache 在有多个 observer 的情况下,每一个 observer 都能收到最完整的资讯,但是 share() 新的 observer 却只能收到最新的结果,无法收到之前的所有资料,请看下面的范例:

val observable = Observable.create<Int> { emitter ->
    var count = 0
    while (!emitter.isDisposed) {
        count +=1
        emitter.onNext(count)
        println("emit number: $count")
        Thread.sleep(1000)
    }
}
    .cache()

observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observer1 : $number")
    }

Thread.sleep(3000)

observable
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .subscribe { number ->
        println("observer2 : $number")
    }

Thread.sleep(1000)

以上这段程序码使用了 cache() ,而且第一个 observer 跟第二个 observer subscribe 的时机点中间差了 3 秒钟,以下是执行的结果:

emit number: 1
observer1 : 1
emit number: 2
observer1 : 2
emit number: 3
observer1 : 3
observer2 : 1
observer2 : 2
observer2 : 3
observer1 : 4
emit number: 4
observer2 : 4

observer2 一执行 subscribe 就马上收到 1, 2, 3 三个资料,那如果将 cache() 换成 share() 呢?

observer1 : 1
emit number: 1
emit number: 2
observer1 : 2
emit number: 3
observer1 : 3
observer1 : 4
emit number: 4
observer2 : 4

observer2 没收到 1, 2, 3 这三个资料,但是在之後发送的资料却还是收得到,如果是不需要拿到所有资料的情况下,使用这个 operator 是比较好的选择。

Subject

其实 Subject 也可以用来做 multicasting 喔!只要把计算产生的结果再送给 Subject ,就可以避免掉重复的计算以及浪费的记忆体:

val subject = BehaviorSubject.create<Int>()
  
val observable: Observable<Int> = getExpensiveObservable()
observable.subscribe { number -> 
    subject.onNext(number)
}

// multicast 1
subject.subscribe {  }

// multicast 2
subject.subscribe {  }

小结

今天看到了 multicasting 的一些 operator ,以及使用上要注意的地方,还有使用 subject 来当作 multicasting 的一种实作方式,实际上在专案上使用时,可能会看情况来使用其中一种。

在大型专案开发上,如果没有特别注意是会产生很严重的後果的,接下来的篇幅中,将会使用实际的案例来让大家更深刻体会到 multicasting 使用的必要性,大家明天见!


<<:  [Day-18] R语言 - 分群应用(一) k - prototype类别补值 - 上 ( Fill.NA with k - prototype in R.Studio )

>>:  pure function

Day 24 Compose UI Test

今年的疫情蛮严重的,希望大家都过得安好, 希望疫情快点过去,能回到一些线下技术聚会的时光~ 今天目标...

LeetCode - 8 String to Integer (atoi)

本篇同步发布於Blog:[解题] LeetCode - 8 String to Integer (a...

命令提示字元--CMD那麽好用你不学一下吗?

这是算是一个被我忽略的工具,赛程中时不时提起,直到铁人赛快结束我才想起来,这项工具也是该介绍的,他就...

Day24-介接 API(二)Google Calendar(II)Events——Read、Update、Delete

大家好~ 昨天成功在日历上新增 Event 了, 今天来对日历的 Event 做其他操作吧~ Rea...

第27天~CRUD

C-新增 R-查询 U-更改 D-删除 流程: 建置 设定 show 4.一定会有key:value...