补充完了必要的知识後,现在再回头看一下之前遇到的问题吧!
整合完 Firebase 之後发现了两个问题:
针对问题一,有了之前补充的知识,马上就能知道这个问题就是 Backpressure 所导致的,所以我们可以使用 RxJava 所提供的 operator 来解决这问题。至於问题二,既然我们无法使用同步的资料,那就完全使用本地资料来更新吧!如此一来在移动便利贴的时候就不会卡了。
所以我们有两边的资料需要做更新,一边是上传到云端,其他一起编辑的使用者就可以看到我正在移动便利贴,另一个是本地端,让本地端的使用者可以做到即时的位置更新。 那如何将一个资料流分流到这二边呢?没错,这边就会用到上一篇所说的 Multicasting!使用 Multicasting 就能将原本一个 Observable 的资料流,分出来给其他两个 Observable 做使用,请看以下图示:
在原本 NoteRepository 的实作中, putNote()
会将所有的资料一个不漏的上传到 Firebase 中,但是因为这样会造成塞车,所以我们就可以使用 RxJava 所提供的 operator: throttle
,来减少事件的数量,而throttle
呢是一个很常用来解决 Backpressure 问题的 operator,上图可以看到原本输入事件的频率较高,一条箭头中有两个事件(橘色圆点),而在经过 throttle
之後,一个箭头中减少到只剩一个事件,请注意这边的数量差距比例并不是重点(比例不会是 2:1),上图的重点是事件数量有减少。事实上,throttle
的运作机制是以时间来当作丢掉事件的基准,而不是事件的数量。
接着来看到下方的箭头,这边的事件数量会跟外面输入的数量是一样的,我们将用它来更新本地端的资料。然後这边上下两个箭头呢,都是使用同一个 Observable 来当作事件的来源,只是後面再用不同的方式将源头转换成两个不一样的 Observable。在这边我们将采取的方式是使用 Subject 来当作源头:
private val updatingNoteSubject = BehaviorSubject.createDefault(Optional.empty<Note>())
override fun putNote(note: Note) {
updatingNoteSubject.onNext(Optional.of(note))
}
一个 Subject 可以同时给很多不同的 Observer 给 subscribe,也不会有重复执行同一个 function 的问题( Multicasting 的问题请看上一篇),这边到目前为止应该没什麽大问题,但这边为什麽还要再包一个 Optional
呢?请继续往下看:
init {
updatingNoteSubject
.throttleLast(300, TimeUnit.MILLISECONDS)
.toIO()
.subscribe { optNote ->
optNote.ifPresent { setNoteDocument(it) }
}
}
private fun setNoteDocument(note: Note) {
val noteData = hashMapOf(
FIELD_TEXT to note.text,
FIELD_COLOR to note.color.color,
FIELD_POSITION_X to note.position.x.toString(),
FIELD_POSITION_Y to note.position.y.toString()
)
firestore.collection(COLLECTION_NOTES)
.document(note.id)
.set(noteData)
}
上传到 Firebase 的部分,在经过了 throttleLast
之後,事件的数量就减少为每 300 毫秒才有一个,并且将 300 毫秒内的最後一个事件,使用 firestore 的 API 来覆写资料。
虽然说我们把资料分流了,但是对於 EditorViewModel 来说,他认识的只有 NoteRepository 的 getAllNotes()
所回传的 Observable ,所以我们还是要将资料整合起来,给 EditorViewModel 当作唯一的资料来源,在 NoteRepository 的实作中所做的任何事情 EditorViewModel 一率都不需要关心。
还记得吗?Firebase 在更新完资料後,会藉由 SnapshotListener
来通知最新的结果,这边的结果包含了其他使用者所做的任何操作,像是改变颜色拉,改变其他便利贴的位置等等,所以从 Firebase 获取资料是一个必要的动作。另外一方面,本地端的更新也是必要的资料,两边的任何事件都不能有遗漏。所以这时候就有一个很适合这种情况的 opeartor 就要登场了,它就是 combineLatest
。
// allNotesSubject 是从 firebase 来的资料
override fun getAllNotes(): Observable<List<Note>> {
return Observables.combineLatest(updatingNoteSubject, allNotesSubject)
.map { (optNote, allNotes) ->
optNote.map { note ->
val noteIndex = allNotes.indexOfFirst { it.id == note.id }
allNotes.subList(0, noteIndex) + note + allNotes.subList(noteIndex + 1, allNotes.size)
}.orElseGet { allNotes }
}
}
汇集这两个地方的资料的时候,会有一种资料不一致的情况:就是正在移动中的便利贴,跟从 Firebase 上面的便利贴,在同一个 id 的情况下他们的位置一定是不一样的,那这时候要选哪一个呢?答案很明显的当然是要选择本地的那一份资料,不然 ViewModel 拿到的就是过去的资料而无法即时拖拉便利贴了,所以上面的程序码大致上就是在做这一件事。
但是还有一个问题,万一我已经完成编辑有一段时间了,其他人在拖曳我上次编辑过的便利贴会因为这个机制而无法即时看到更新,因为 updatingNoteSubject
自从上次更新完位置後就没有改变过资料内容了,现在的这套机永远会以 updatingNoteSubject
中的内容为优先,就算使用者已经没有要编辑他了也一样。
为了解决这个问题,我设计了一个机制,当使用者有一段时间没有编辑了,就将 updatingNoteSubject
中的内容给清空,如此一来,就可以顺利的解决上述的问题了:
init {
updatingNoteSubject
.filter { it.isPresent }
.debounce(300, TimeUnit.MILLISECONDS) // debounce 也是其中一种解决 backpressure 问题的 operator
.subscribe {
updatingNoteSubject.onNext(Optional.empty<Note>())
}
}
上述的说明就是我使用 Optional
的目的,他可以解决我使用 combineLatest
时所遇到的问题。以下是本阶段完整的程序码:
class FirebaseNoteRepository: NoteRepository {
private val firestore = FirebaseFirestore.getInstance()
private val allNotesSubject = BehaviorSubject.create<List<Note>>()
private val updatingNoteSubject = BehaviorSubject.createDefault(Optional.empty<Note>())
private val query = firestore.collection(COLLECTION_NOTES)
.limit(100)
init {
query.addSnapshotListener { result, e ->
result?.let { onSnapshotUpdated(it) }
}
updatingNoteSubject
.throttleLast(300, TimeUnit.MILLISECONDS)
.toIO()
.subscribe { optNote ->
optNote.ifPresent { setNoteDocument(it) }
}
updatingNoteSubject
.filter { it.isPresent }
.debounce(300, TimeUnit.MILLISECONDS)
.subscribe {
updatingNoteSubject.onNext(Optional.empty<Note>())
}
}
override fun getAllNotes(): Observable<List<Note>> {
return Observables.combineLatest(updatingNoteSubject, allNotesSubject)
.map { (optNote, allNotes) ->
optNote.map { note ->
val noteIndex = allNotes.indexOfFirst { it.id == note.id }
allNotes.subList(0, noteIndex) + note + allNotes.subList(noteIndex + 1, allNotes.size)
}.orElseGet { allNotes }
}
}
override fun putNote(note: Note) {
updatingNoteSubject.onNext(Optional.of(note))
}
private fun onSnapshotUpdated(snapshot: QuerySnapshot) {
val allNotes = snapshot
.map { document -> documentToNotes(document) }
allNotesSubject.onNext(allNotes)
}
private fun setNoteDocument(note: Note) {
val noteData = hashMapOf(
FIELD_TEXT to note.text,
FIELD_COLOR to note.color.color,
FIELD_POSITION_X to note.position.x.toString(),
FIELD_POSITION_Y to note.position.y.toString()
)
firestore.collection(COLLECTION_NOTES)
.document(note.id)
.set(noteData)
}
private fun documentToNotes(document: QueryDocumentSnapshot): Note {
val data: Map<String, Any> = document.data
val text = data[FIELD_TEXT] as String
val color = YBColor(data[FIELD_COLOR] as Long)
val positionX = data[FIELD_POSITION_X] as String? ?: "0"
val positionY = data[FIELD_POSITION_Y] as String? ?: "0"
val position = Position(positionX.toFloat(), positionY.toFloat())
return Note(document.id, text, position, color)
}
companion object {
const val COLLECTION_NOTES = "Notes"
const val FIELD_TEXT = "text"
const val FIELD_COLOR = "color"
const val FIELD_POSITION_X = "positionX"
const val FIELD_POSITION_Y = "positionY"
}
}
改完了程序码之後,再重新建置运行在手机上,就会发现之前的问题已经解决了!
不知道你有没有发现到,上述的程序码中没有用到之前介绍的 Flowable,那这边为什麽使用 Observable 还是可以用 throttleLast
解决卡顿的问题呢?还有在 throttleLast
之後为什麽型别没有转成 Flowable 呢?这个问题留给大家去想想,明天再来解答!
<<: 【Day04】Git 版本控制 - Git 安装与设定(Windows、macOS、Ubuntu)
1189. Maximum Number of Balloons https://leetcode....
Fluent bit回顾 Log Agent - Fluent Bit 简介 Log Agent -...
团队目标的建立 提供对应的元素,催化IT团队与事业体相呼应的目标 身为产品经理,需要清楚知道公司目标...
写在前面 test for placeholder test for placeholder tes...
Keychain Apple Keychain 是一个非常流行且功能强大的 Swift 工具,每个 ...