RxJava operators && Java.Optional as a type class

本篇是用来补充 RxJava 的基础知识跟 functional programming 的应用,最後将会说明上一篇没有解释到的实作细节。

Observable Creation

建立 Observable 的各种 factory method。

Observable.just

非常直觉简单,看下面的范例就知道怎麽用了:

val a = Observable.just(1, 2, 4)
a.subscribe { number -> println(number) }

// 1 
// 2 
// 4

Observable.create

它的特性是,只有在呼叫 subscribe 的时候,lambda 里面的内容才会被执行,也就是说是 lazy 的行为。而 emitter 呢,有三个主要会被使用的方法: onNext, onComplete, onError,onNext 可以发出新的资料,呼叫 onComplete 代表这个 Observable 已经结束了,不会有新的内容,onError 则是发出一个例外错误,错误发生之後也依样不会有新的内容。

val a = Observable.create<Int> { emitter: ObservableEmitter ->
    println("start")
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onComplete()
    emitter.onNext(4) // This one never be sent
    println("end")
}
println("before subscribe")
a.subscribe { number -> println(number) }

// before subscribe
// start
// 1
// 2
// end

Basic operation

map

用来做资料转换

val a = Observable.just(1, 2, 4)
val b = a.map { it * 2 }
b.subscribe { number -> println(number) }

// 2
// 4
// 8

filter

用来做资料过滤

val a = Observable.just(1, 2, 4)
val b = a.filter { it % 2 == 0 }
val c = a.filter { it % 2 != 0 }
println("subscribe b")
b.subscribe { number -> println(number) }
println("subscribe c")
c.subscribe { number -> println(number) }

// subscribe b
// 2
// 4
// subscribe c
// 1

reduce

用来做资料整合。

val a = Observable.just(1, 2, 4)
val b = a.reduce { t1, t2 ->
    println("t1: $t1, t2: $t2")
    t1 * t2
}

b.subscribe { number -> println("result: $number") }

// t1: 1, t2: 2
// t1: 2, t2: 4
// result: 8

scan

与 reduce 类似,但差别是每一次的结果都会再发送出来,但是 reduce 只会发送最後一个

val a = Observable.just(1, 2, 4)
val b = a.scan { t1, t2 ->
    println("t1: $t1, t2: $t2")
    t1 * t2
}

b.subscribe { number -> println("result: $number") }

// result: 1
// t1: 1, t2: 2
// result: 2
// t1: 2, t2: 4
// result: 8

Subject

Subject 一种比较特别的 Observable,他同时是 Observable 也是 Observer,也就是说他可以透过 subscribe 其他 Observable 来获得资料:

val a = BehaviorSubject.create<Int>()// BehaviorSubject 是其中一种 Subject 的实作
val b = Observable.just(1, 2, 3)

a.subscribe { number -> println("result: $number") }
b.subscribe(a)

// result: 1
// result: 2
// result: 3

在执行 b.subscribe(a) 之後,所有在 b 这个 Observable 中的元素将会传给 a 这个 Subject ,然後因为 a 在前面已经有被别人 subscribe 了,所以会执行里面的 println(),其运作机制如下图所示:

Screen Shot 2021-08-27 at 2.41.33 PM.png

也可以直接透过 Subject 来发送资料,如下面范例:

val a = BehaviorSubject.create<Int>()

a.subscribe { number -> println("result: $number") }

a.onNext(1)
a.onNext(2)
a.onComplete()
a.onNext(4)

// result: 1
// result: 2

三种不同类型的 Subject

  • BehaviorSubject:不管是任何 Observer 在任何时间 subscribe,一开始都会收到最新的一笔资料,跟 LiveData 的行为一样。
  • PublishSubject:永远只会收到当下传送的资料,在 subscribe 之前的任何资料都是不会收到的。
  • ReplaySubject:记得所有传输过的资料,每一个新的 Observer 都会完整的收到历史资料,但是使用上要特别注意,记忆体的量不是无限的!

下面的范例示范了这三个 Subject 的差别,建议大家自己动手做实验,实际下去玩玩看有助於你理解喔!

val behavior = BehaviorSubject.create<Int>()
val publish = PublishSubject.create<Int>()
val replay = ReplaySubject.create<Int>()

behavior.onNext(1)
publish.onNext(1)
replay.onNext(1)
behavior.onNext(2)
publish.onNext(2)
replay.onNext(2)

behavior.subscribe { number -> println("behavior: $number") }
publish.subscribe { number -> println("publish: $number") }
replay.subscribe { number -> println("replay: $number") }

behavior.onNext(4)
publish.onNext(4)
replay.onNext(4)

// behavior: 2
// replay: 1
// replay: 2
// behavior: 4
// publish: 4
// replay: 4

Merge Observable

Observable 有很多不一样的组合方式,在这里只会提到几个常用的,跟本专案有用到的其中几个,其他就要靠读者自己看文件了。

CombineLatest

只要任何一个 Observable 有新的值产生出来,就会被触发,并且与其他的 Observable 上一次发送出来的值一起被送出来,如下方范例,a 在一开始连续送出了两个资料,但是由於 b 还没有任何资料产生,所以 a 的第一个资料在这边就没有任何用处。

val a = PublishSubject.create<Int>()
val b = PublishSubject.create<Int>()

Observables.combineLatest(a, b)
    .map { (first, second) ->
        println("first: $first, second: $second")
        first + second
    }
    .subscribe { number -> println("result: $number") }

a.onNext(1)
a.onNext(2)
b.onNext(1)
b.onNext(3)
a.onNext(5)
b.onNext(8)

// first: 2, second: 1
// result: 3
// first: 2, second: 3
// result: 5
// first: 5, second: 3
// result: 8
// first: 5, second: 8
// result: 13

Zip

跟 CombineLatest 用法类似,但是不一样的是,他是组合出所有的 Observable 发出第一个资料,第二个资料,第三个等等,是一个在意 index 一致时用的 operator。

val a = PublishSubject.create<Int>()
val b = PublishSubject.create<Int>()

Observables.zip(a, b)
    .map { (first, second) ->
        println("first: $first, second: $second")
        first + second
    }
    .subscribe { number -> println("result: $number") }

a.onNext(1) // [1] []
a.onNext(2) // [1, 2] []
b.onNext(1) // [2] [] emit (1, 1)
b.onNext(3) // [] [] emit (2, 3)
a.onNext(5) // [5] []
b.onNext(8) // [] [] emit (5, 8)

// first: 1, second: 1
// result: 2
// first: 2, second: 3
// result: 5
// first: 5, second: 8
// result: 13

withLatestFrom

这个其实算是比较难理解的一个 operator ,但是它很有用!因为他的特性是让其中一个 Observable 是处於“被动”状态,所以不会发送太多不想要的资讯,请看下方范例:

val a = PublishSubject.create<Int>()
val b = PublishSubject.create<Int>()

a.withLatestFrom(b) { first, second ->
    println("first: $first, second: $second")
    first + second
}
    .subscribe { number -> println("result: $number") }

a.onNext(1) // b 没有值,所以不会触发
a.onNext(2) // b 没有值,所以不会触发
b.onNext(1) // b 有值了,但是主动权是在 a 身上,所以一样不会触发
a.onNext(3) // b 有值了,由 a 主动触发,得到 3+1 = 4
a.onNext(5) // b 有值了,由 a 主动触发,而且会拿到一样的 b,得到 5+1 = 6
b.onNext(8) // 只是更新 b ,不会有新的资讯产生出来

// first: 3, second: 1
// result: 4
// first: 5, second: 1
// result: 6

Optional

在 Java 中有一个很有用的类别: Optional ,他存在的目的就是为了能够妥善的处理 null 的这个状态,而且跟 List 一样,是一个有泛型行为的类别。在 Functional programming 中,像这种可以使用“泛型”这种形式,而且还可以使用 map , flatmap, filter 这些高阶函式的类别,称之为 Type class 。为什麽 Optional 很有用呢?因为有了它,可以让我们在忽略 Null 的情况下进行逻辑运算,请看下面的范例:

fun main() {
  val a: String? = null
  val b: String? = "Hihi"
  val optA = Optional.ofNullable(a)
  val optB = Optional.ofNullable(b)

  printOptStringLength(optA)
  printOptStringLength(optB)
}

private fun printOptStringLength(optString: Optional<String>) {
  optString.map { it.length }.ifPresent { length ->
      println("length: $length")
  }
}

a 跟 b 的型别一样都是 nullable ,如果我现在有个需求是想知道这字串的长度,要怎麽做呢?使用 Kotlin 的作法可能是用 ?. 这种语法来做安全的呼叫,但是使用 Optional 的话,就可以去除掉使用 ?. 这样的语法,取而代之的是 map

看到这边,应该有人很想改程序码了,这样看起来没有比较好啊!程序码反而更长了不是吗?的确,这个案例不是很需要用到 Optional ,但是为了让大家做初步的理解,请再看看下面这个范例:

fun main() {
    multiplyNumberAndStringLen(4, null)
    multiplyNumberAndStringLen(null, "Hihi")
    multiplyNumberAndStringLen(4, "Hihi")
}

private fun multiplyNumberAndStringLen(a: Int?, b: String?) {
    val optA = Optional.ofNullable(a)
    val optB = Optional.ofNullable(b)

    optA.flatMap { number -> optB.map { str -> number * str.length } }
        .ifPresent { number ->
            println("Multiply result: $number")
        }
}

// Multiply result: 16

遇到多个 nullable 这种案例的时候,使用 Optional 就会相对简洁。如果不喜欢 Java 的这个 Optional (因为 gradle 需要做另外的设定 Android Studio 才不会显示红字),其实你也可以自己做出一个这样的 Type class,实作并不难,还有想知道更多关於 Functional programming 的用法的话,可以参考我去年写的文章:https://ithelp.ithome.com.tw/articles/10240335

BoardViewModel moveNote 的实作

终於补充完所有的知识了,我们再回来看一下 moveNote 的实作是长什麽样吧!

fun moveNote(noteId: String, delta: Position) {
    Observable.just(Pair(noteId, delta))
        .withLatestFrom(allNotes) { (noteId, delta), notes ->
            val currentNote = notes.find { it.id == noteId }
            Optional.ofNullable(currentNote?.copy(position = currentNote.position + delta))
        }
        .mapOptional { it }
        .subscribe { newNote ->
            noteRepository.putNote(newNote)
        }
        .addTo(disposableBag)
}

这边使用到了 withLatestFrom ,根据本篇前面的解说,withLatestFrom 中的 Observable 是被动的,这代表什麽呢?

一个主动的便利贴移动事件 + 一个被动的的全部便利贴的最新状态

这样不就刚好能组合出最新的便利贴应该要放在什麽位置了吗?因为我能从全部便利贴 - notes 中,由 Id 找到相对应的便利贴,然後与 delta 的资料结合起来计算出新的位置,最後再丢给 noteRepository 去更新,问题解决!万一我现在用其他的 operator 会发生什麽事呢?例如 combineLatest 呢?

Observable.combineLatest(Observable.just(Pair(noteId, delta)), allNotes)

如果使用 combineLatest ,因为在 noteRepository 中 putNote 的更新将会触发 allNote 的更新,所以这个 combineLatest 的版本还会再一次的触发,因此 putNote 会再执行一次,把新的值藉由塞入 noteRepository ,产生了一个永无止境的无穷回圈,最後 app 将会因为 ANR 而闪退,所以 withLatestFrom 在这边就发挥了极大的作用,避免掉了这个无穷回圈。

而这边为什麽会用到 Optional 呢?因为 RxJava 不接受任何 Null 的值被传出去,所以这边使用 Optional 来做包装,然後下面的 mapOptional 将会解开这样的包装,如果 Optional 里面的值是空的话,就不会继续往下走。

NoteRepository putNote 的实作

class InMemoryNoteRepository(): NoteRepository {

    private val notesSubject = BehaviorSubject.create<List<Note>>()
    private val noteMap = ConcurrentHashMap<String, Note>()

    init {
        val initNote = Note.createRandomNote()
        noteMap[initNote.id] = initNote
        notesSubject.onNext(noteMap.elements().toList())
    }

    override fun getAll(): Observable<List<Note>> {
        return notesSubject.hide()
    }

    override fun putNote(newNote: Note) {
        noteMap[newNote.id] = newNote
        notesSubject.onNext(noteMap.elements().toList())
    }
}

相信大家在看到这麽多使用 Subject 的范例後,理解上面的程序码就不是困难的事了,这边我就不多加解说了。

小结

今天看到了 merge observable 在这专案中的其中一个应用,如果没有 withLatestFrom,使用 RP 将会困难重重,一不小心就会造成无穷回圈,或是因为不熟悉 RP 而多了很多额外变数,让程序码变得难以阅读,如果额外的变数的同时又会造成 side effect。在写 RP 时,应该尽量避免 side effect,才能好好地确保程序执行的正确性。

补充 side effect(又置入了一篇文章XD): https://ithelp.ithome.com.tw/articles/10236884


<<:  [Day 5] SRE - 发动测试左移之术,预视未来的机制

>>:  [Day 13] SCSS 结合 Bootstrap 网页制作

自动化 End-End 测试 Nightwatch.js 之踩雷笔记:getCssProperty()

getCssProperty() 是 Nighwatch 中用来取得 DOM element 的 c...

Day 15: 人工神经网路初探 激活函数(下)

激活函数 Activation Function Scaled Exponential Linear...

[D24] 物件侦测(5)

前一篇物件侦测(4)停在 YOLOv1 的缺点上,现在就要来说 YOLOv2了! YOLOv2 YO...

Kotlin Android 第15天,从 0 到 ML - Android Jetpack

前言: 前两遍的基础activity 和 fragment 就可以作出不错的app了,但功能愈来愈多...

Day15看鱿鱼游戏就要搭上鱿鱼料理-琉球菜鱿鱼小封

Netflix上的鱿鱼游戏正夯,雪伦也是一集接一集的看完了 上次看机智医生生活搭配辣炒年糕,那这次看...