RxJava - Backpressure

不知道大家有没有手冲咖啡的经验?如果没有的话,应该也看过或用过滤挂式咖啡,在冲咖啡时,水不能一次倒太多,因为滤网的消化速度没这麽快,要是你不管它的消化速度一直倒水进去的话,最後就会满出来!

上面这个手冲咖啡的例子也会发生在 Reactive programming 上面,之前的章节中有讲过,Reactive programming 的其中一个大特点是,事件流可以在不同的阶段依据需求来去切换执行绪,所以万一上游的事件发出太多,切换执行绪之後,下游无法即时处理时会发生什麽事呢?依咖啡的例子来说,热水可以到的最大容量就是滤纸的大小,那以电脑来说就当然是记忆体的容量了,所以当下游来不及处理的话,是会发生 OutOfMemoryException 的!

这个现象在 RxJava 中有一个名词来表示它: Backpressure。既然 Backpressure 有这个问题,RxJava 当然就提供了相对应的解法,接下来就来介绍它吧!

Flowable

跟 Observable 其实是几乎一模一样的,有各式各样的 operator: mapflatMapfilterreducecombineLatestzip等等,也可以使用 observeOnsubscribeOn 来切换执行绪。Flowable 提供更多的是,在建立实体时提供了各种不同处理 Backpressure 的策略,像是丢掉最新的一笔、或是丢掉最後的一笔等等,下面举个例子:

val f = Flowable.fromIterable((1..10000).toList())
f.subscribe { number ->
    println("number : $number")
}

运行结果:

number : 1
number : 2
number : 3
....
number : 10000

奇怪!全部都有跑出来啊,怎麽没看到有任何资料被丢掉?但其实用法错了,应该要用下面这种方式来建立 Flowable :

val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    for (i in 1..10000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.DROP)
f.subscribe { number ->
    println("number : $number")
}

使用 Flowable.create 才能成功建立出有 BackpressureStrategy 的 Flowable 喔!现在的 BackpressureStrategy 为 DROP ,也就是丢掉多余的资料,那我们来看看他是怎麽丢掉的吧!(根据下图,数字应该只会跑到 128,因为预设的 buffer size 就是 128)

Screen Shot 2021-09-05 at 9.31.36 PM.png

number : 1
number : 2
number : 3
....
number : 10000

果不其然,结果是.....?不对啊,怎麽还是跑完 10000 了呢?那我们再做一个实验,在 onNext 上面再留个 log:

val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    for (i in 1..10000) {
        **println("onNext : $i") // 多加这一行**
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.DROP)
f.subscribe { number ->
    println("number : $number")
}

运行结果如下:

onNext : 1
number : 1
onNext : 2
number : 2
onNext : 3
number : 3
...
onNext : 10000
number : 10000

原来从头到尾都没有在累积资料!每一笔资料送出之後马上就被处理了,因为他们都是在同一条执行绪上运行啊!所以为了模拟 Backpressure ,就需要去切换执行绪:

val f = Flowable.create(FlowableOnSubscribe<Int> { emitter ->
    for (i in 1..10000) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}, BackpressureStrategy.DROP)

f
    **.observeOn(Schedulers.computation()) // 多了这一行**
    .subscribe { number ->
        println("number : $number")
    }

Thread.sleep(1000L)

再跑最後一次,终於成功了!

number : 1
number : 2
number : 3
...
number : 128

BackpressureStrategy 还有其他四种,想了解的就交给各位读者自己去玩了!

关於上回的 Bug

相信聪明的你已经知道上一篇的最後发生了什麽事了,没错!就是发生了 Backpressure!请看下图说明:

Screen Shot 2021-09-05 at 9.47.17 PM.png

当最右边的 Repository 要更新资料到 Firebase 时,是有一些网路延迟的,网路更新事件这边是以橘色的小圆点来表示,这边假设每一个小圆点都要花差不多 300 毫秒好了,过了 300 毫秒之後才能再处理下一个事件。接着我们再看到上图的左边,有一个手势的事件(红色小圆点)传送到 ViewModel 了,而且下一个手势事件可能会在 30 毫秒内又送出另一个,而这每一个手势事件最後又会转为右边的橘色小圆点,30 毫秒对上 300 毫秒...那这不就造成塞车了吗?这边的 Backpressure 就是这麽发生的!

那麽只要我将中间的事件都丢掉就没问题了吗?让 Firebase 处理完再给他下一个更新事件,但...好像也不能丢掉,丢掉的话手势事件的连贯性就没了,使用者就因此无法再手机上体验到很流畅的便利贴拖曳体验(毕竟至少 300 毫秒之後才会看到拖曳之後的结果)。所以这要怎麽办呢?接着我们就要靠下一个 Reactive Programming 的机制来帮我们解决这问题了 - Multicasting。


<<:  【在 iOS 开发路上的大小事-Day05】透过闭包 (Closure) 来传值

>>:  DAY5-PHP这是什麽老东西

Day 15-制作购物车系统之安装及资料夹结构(四)

今天要说设定script命令句的部分 还记得前几天的文章有说到package.json里script...

Day25 Let's ODOO: System Parameters

顾名思义就是系统参数,里面存放着许多系统用的资料,如资料库创立时间、UUID、预设template等...

Kotlin Android 第21天,从 0 到 ML - Retrofit and Repository

前言: 说到呼叫 API 的方法,那就一定会提到 Retrofit 这个无人不知,无人不晓的第三方的...

好用的线上IDE分享

在开发程序时,有时候想要测试一点小功能,确认说这个功能可不可以使用,如果说每次都要为了测试这点功能就...

【设计+切版30天实作】|Day14 - 简约CTA的用处及设计的注意事项

设计大纲 设计CTA的用意一方面是让结尾不会来得太突然,另一方面是想在网页的最後再来一个「Call ...