今天我们来认识一下 RxJS 的 Scheduler,虽然在一般使用 RxJS 开发应用程序时几乎不会用到 Scheduler,但 Scheduler 可以说是控制 RxJS 至关重要的角色,偶尔也有可能会需要使用 Scheduler 来调整事件发生时机!
到底什麽是 Scheduler?就让我们继续看下去吧!
Schedule 这个单字本身有「安排」的意思,因此 Scheduler 可以想像成是「负责安排」的人,具体来说安排什麽呢?就是安排 Observable 内「事件」该如何发生的时机点。
举个例子来说,请思考一下以下程序会以什麽样的顺序印出资料?
console.log('start');
of(1, 2, 3)
.subscribe({
next: result => console.log(result),
complete: () => console.log('complete')
});
console.log('end');
应该不难判断,由於 of(1, 2, 3)
事件是「同步执行」的,因此结果为:
start
1
2
3
complete
end
那麽有没有办法让 of(1, 2, 3)
变成「非同步执行」呢?当然是有的,我们可以在 of
参数的最後放上一个 Scheduler 来安排资料处理的顺序,以下范例透过 asyncScheduler
来帮助我们将 of(1, 2, 3)
的资料变成非同步执行的程序:
import { asyncScheduler, of } from 'rxjs';
console.log('start');
of(1, 2, 3, asyncScheduler)
.subscribe({
next: result => console.log(result),
complete: () => console.log('complete')
});
console.log('end');
程序码:https://stackblitz.com/edit/mastering-rxjs-make-observable-async
此时因为 of(1, 2, 3)
是非同步执行的,结果就变成:
start
end
1
2
3
complete
很简单吧!透过 Scheduler 可以帮助我们快速的将同步程序切换成非同步程序,当然实际上不只这样而已,还有许多 Scheduler 的使用技巧,但在介绍更多内容之前,先让我们简单(?)理解一下非同步处理的核心逻辑。
我们都知道可以使用 Promise
或 setTimeout
让一段程序变成非同步执行,那麽以下程序印出的结果为何?
setTimeout(() => {
console.log('A');
});
Promise.resolve('B')
.then(result => console.log(result));
既然都变成非同步了,理论上应该先变成非同步先处理吧,所以应该是先印出 A
再印出 B
吗?很可惜的不是,答案是先印出 B
再印出 A
,为什麽会这样呢?这跟 JavaScript 的非同步处理方式有关。
首先,我们必须先知道的是当 JavaScript 开始执行一段程序时,会产生一个「工作阶段 (task)」,并「同步」的执行相关的程序码,而当遇到 Promise
或 setTimeout
这类非同步呼叫时,会先将里面的程序码丢到一个「等待区 (task queue)」,然後继续处理其他同步的程序码,直到目前同步的程序码处理完後,再从「等待区」将程序码拿出来以「同步」的方式执行里面的程序码。
由於等待区是一个伫列 (queue) 的资料结构,伫列的特色就是先进先出,因此先进入等待区的程序会先被执行,以下两段非同步程序呼叫执行後会先印出 A
再印出 B
:
Promise.resolve('A')
.then(result => console.log(result));
Promise.resolve('B')
.then(result => console.log(result));
那麽为什麽稍早的程序中 setTimeout
执行顺序跟我们想的不一样呢?这是因为所谓的「等待区」在 JavaScript 处理中其实会有两种:
Promise
或 node.js 中的 process.nextTick
,都会丢到 microtask queue 中。setTimeout
或 requestAnimationFrame
,都会丢到 marcotask queue 中。JavaScript 在同步执行完毕时,会先将所有的 microtask queue 中的程序执行完毕,确认清空 microtask queue 的工作後,再处理下一个 macrotask queue 中的工作,也因此同时有 Promise
和 setTimeout()
呼叫时,promise
会进入 microtask queue 而 setTimeout
则进入 macrotask queue,所以 Promise
的程序会先进行处理,之後才处理 setTimeout
的程序。
为什麽要切成这样呢?中间有不少原因,还会牵扯到另一个大主题「event loop」,但这些不是今天的主题,因此就不花篇幅介绍了,有兴趣可以看看以下影片 (有中文字幕):
大方向是,在每次 macrotask 结束前,会清空 microtask 所有工作,当清空後才会进行画面渲染,接着处理下一次的 macrotask,因此 macrotask 作用在每次画面渲染的前後,而 microtask 则不是。
现在我们只要知道非同步运作有一个粒度小的 microtask 以及一个粒度大的 macrotask ,以及画面渲染时机的不同,就足以帮助我们更加理解 RxJS 的 Scheduler 罗。
接着我们再来仔细认识一下 RxJS 的 Scheduler 到底是什麽,Scheduler 实际上就是用来帮助我们决定程序要「同步」或是「非同步」执行的一个角色;在同步执行时,我们可以用来确保不同的 「同步 Observable」事件会在一致的时间点 (frame) 触发;而在非同步执行时,则可以用来控制使用 microtask 还是 macrotask 处理事件。
从文字来看稍微有点抽象,之後我们会有更多程序码解释。
Scheduler 依照运作逻辑分成以下几类:
null
:也就是不指定 Scheduler,那们就是同步执行的。queueScheduler
:也是同步处理的,但在执行时 RxJS 会将所有同步的 Observable 资料放到 queue 内,再依序执行,稍後我们会说明这和 null
有什麽区别。asapScheduler
:非同步处理,与使用 Promise
一样的非同步处理层级,也就是使用 microtaskasyncScheduler
:非同步处理,处理方式同 setIntervael
,属於 macrotask 层级animationFrameScheduler
:非同步处理,处理方式同 requestAnimationFrame
,也是属於 macrotask 层级,但更适用於动画处理 (效能较优)在建立类型的 operators 如 of
、from
和 timer
等,都可以在最後一个参数指定要使用哪个 Scheduler 如:
of(1, 2, 3, asyncScheduler);
此时 operator 会帮我们将每个事件值用指定的处理方式来安排事件发生,以下举个例子来说明不同 Scheduler 执行的结果。
画面中有一个红色方块,我们将示范使用不同的 Scheduler 来移动这个方块,并看看执行过果,成式码如下:
const initPosition = () => {
const blockElement = document.querySelector("#block") as HTMLElement;
blockElement.style.left = "100px";
blockElement.style.top = "100px";
};
const updatePositionByScheduler = (scheduler: SchedulerLike) => {
initPosition();
setTimeout(() => {
console.log("start");
range(0, 100, scheduler).subscribe({
next: val => {
const blockElement = document.querySelector("#block") as HTMLElement;
blockElement.style.left = 100 + val + "px";
blockElement.style.top = 100 + val + "px";
},
complete: () => console.log("complete")
});
console.log("end");
}, 300);
};
fromEvent(document.querySelector("#goNull"), "click").subscribe(() => {
updatePositionByScheduler(null);
});
fromEvent(document.querySelector("#goQueue"), "click").subscribe(() => {
updatePositionByScheduler(queueScheduler);
});
fromEvent(document.querySelector("#goAsap"), "click").subscribe(() => {
updatePositionByScheduler(asapScheduler);
});
fromEvent(document.querySelector("#goAsync"), "click").subscribe(() => {
updatePositionByScheduler(asyncScheduler);
});
fromEvent(document.querySelector("#goAnimationFrame"), "click").subscribe(
() => {
updatePositionByScheduler(animationFrameScheduler);
}
);
updatePositionByScheduler
负责根据指定的 Scheduler 来移动红色方块,在程序开始时,先呼叫 initPosition
重置红色方块的起始位置,接着依照不同的 Scheduler 依序产生一个 0~99 数值,并订阅此 Observable 来更新画面。
程序码已经放在 StackBlitz 上:
https://stackblitz.com/edit/mastering-rxjs-schedulers
可以尝试按按看每个按钮,并比较一下画面更新的方式以及 console.log
输出的顺序,以下简单说明一下执行结果:
由於 range()
本身是同步执行的,因此会在一个工作阶段 (task) 中全部跑完,可以直接用同步执行的思维去想就好,输出结果为:
start
complete
end
由於执行完才会渲染画面,因此红色框框会从左上角立刻出现在右下角。
使用 queueScheduler
时,资料依然是「同步执行」的,因此结果与使用 null
完全一样,但在一个同步工作阶段中,会再使用 queue 将资料包装起来处理; queueScheduler
做这件事情的目的是什麽?我们在後续说明。
asapScheduler
会将每次 Observable 事件值都用「非同步」的方式处理,因此执行顺序为:
start
end
complete -> 因为是非同步执行
asapScheduler
的非同步行为会进入 microtask,而再 microtask 阶段是不会处理画面渲染的,因此画面中的红色方块虽然会「非同步的」被更新座标,但会在最後「直接出现在右下角」。
asyncScheduler
也会将每次 Observable 事件值使用「非同步」的方式处理,所以执行顺序一样是:
start
end
complete
不过 asyncScheduler
是使用 macrotask 处理非同步呼叫,而画面渲染行为会发生在每次 macrotask 结束之间,因此每次 Observable 的事件跟事件发生之间会产生画面渲染,结果就是可以看到红色方块往右下角移动的动画。
animationFrameScheduler
触发的时机点和画面重绘 (repaint) 定义的时机点一样,就跟我们使用 JavaScript 的 requestAnimationFrame
一样,基本上是 1/60 秒发生一次,使用 requestAnimationFrame
的时机通常是使用 JavaScript 处理动画,可以避免使用 setTimeout((), 1)
运算太频繁,但画面跟新频率不需要这麽高的问题。
由於是非同步执行,因此执行结果与前面相同,但可以看到画面上的红色方块以比较慢的速度移动,原则上会是每 1/60 移动一次。
以下是可以在最後一个参数设定 Scheduler 的建立类型 operators:
bindCallback
bindNodeCallback
combineLatest
concat
empty
from
fromPromise
interval
merge
of
range
throw
timer
建立类型的 operators 有部分会有预设 operator,尤其是跟时间控制有关的,例如 interval
的方法签章如下:
interval(period: number = 0, scheduler: SchedulerLike = async): Observable<number>
可以看到是使用 asyncScheduler
,而有些则没有预设 scheduler,那麽就依照该 operator 预设的逻辑来决定同步或非同步,例如 of
预设的 Scheduler 是 null
,因此是同步处理,而 timer
预设也是使用 asyncScheduler
。
另外还有一个重点 of
和 from
里面的参数是不固定的,因此直接将 Scheduler 放在最後会相对不容易理解,也会有「无法将 Scheduler 物件本身当作事件值发送」的问题,因此 RxJS 还有一个建立类型 Operator - scheduled
来处理这个问题。
scheduled
只能放两个参数,第一个参数是资料,如果有多笔则使用阵列处理;第二个参数则是要使用哪个 Scheduler。因此以下程序不建议使用:
of(1, 2, asyncScheduler);
建议改使用
scheduled([1, 2], asyncScheduler);
除了在建立类型 operators 最後参数加上 Scheduler 外,如果想控制一个来源 Observable 发生的时机点,可以使用 observerOn
这个 operator,依照 Scheduler 来控制收到事件的时机点,如:
console.log('start');
of(1, 2)
.pipe(observeOn(asyncScheduler))
.subscribe({
next: result => console.log(result),
complete: () => console.log('complete')
});
console.log('end');
程序码:https://stackblitz.com/edit/mastering-rxjs-operator-observeon
由於我们将来源事件用非同步的方式接收处理,因此 start
和 end
会先印出,然後才依序印出 1
、2
和 complete
。
需要特别注意的是 of(1, 2)
依然是「同步处理」行为,只是在订阅时透过 observeOn(asyncScheduler)
将收到的资料放到 macrotask 内,因此以下两段程序码处理行为完全不同:
// (1)
of(1, 2, asyncScheduler);
// 注意:此用法已被标示弃用,这里纯粹拿来比较用
// 实际上建议写成 scheduled([1, 2], asyncScheduler)
// (2)
of(1, 2).pipe(observeOn(asyncScheduler));
第一段程序码是把「每一个值分别放入 macrotaask 中」。
第二段程序码则是「产生 1 和 2 後,再将这两个资料放入 macrotask 中」。
这个观念非常重要!如果搞错,很可能整个执行顺序都跟想的不同了。
observeOn
可以用来控制「处理来源 Observable 事件的时机」,而我们也可以用 subscribeOn
来控制「订阅来源 Observable 的时机」,以下程序一样是非同步执行的。
console.log('start');
of(1, 2)
.pipe(subscribeOn(asyncScheduler))
.subscribe({
next: result => console.log(result),
complete: () => console.log('complete')
});
console.log('end');
程序码:https://stackblitz.com/edit/mastering-rxjs-operator-subscribeon
执行结果:
start
end
1
2
complete
一样的,别忘记 of(1, 2)
本身依然是「同步执行」程序喔!
最後,我们回到「同步」程序,看看使用 null
与使用 queueScheduler
到底有什麽不同,先想想看以下程序会如何执行:
const sourceA$ = of(1, 2);
const sourceB$ = of(3, 4);
combineLatest([sourceA$, sourceB$])
.pipe(map(([a, b]) => a + b))
.subscribe(result => {
console.log(result);
});
我们已经知道 combineLatest
会订阅参数内的 Observables,当每个 Observables 发生事件时,将这个时与其他的 Observables 「最後一次事件组合在一起」,所以理论上过程如下:
sourceA$
发生事件 1
,此时 sourceB$
还没有任何事件发生sourceB$
发生事件 3
,此时跟 sourceA$
最後一次事件值 1
组合在一起,得到 3 + 1 = 4
sourceA$
发生事件 2
,此时跟 sourceB$
最後一次事件值 3
组合在一起,得到 2 + 3 = 5
sourceB$
发生事件 4
,此时跟 sourceA$
最後一次事件值 2
组合在一起,得到 4 + 2 = 6
sourceA$
结束sourceB$
结束因此印出的值应该是 4
、5
和 6
,实际上是这样吗?很可惜,实际结果是:只印出 5
和 6
!怎麽会这样呢?
别忘记了 of
是「同步执行」的,因此在使用 combineLatest
分别订阅两个 Observable 时,实际上会变成类似以下程序码的执行顺序:
const sourceA$ = of(1, 2);
const sourceB$ = of(3, 4);
sourceA$.subscribe(...);
sourceB$.subscribe(...);
看出问题了吗?因为「同步执行」的关系,在订阅 sourceA$
时会先同步产生 1
和 2
事件後结束;接着才是订阅 sourceB$
同步产生 3
和 4
事件然後结束,因此正确运作过程为:
sourceA$
发生事件 1
,此时 sourceB$
还没有任何事件发生sourceA$
发生事件 2
,此时 sourceB$
还没有任何事件发生sourceA$
结束sourceB$
发生事件 3
,此时跟 sourceA$
最後一次事件值 2
组合在一起,得到 3 + 2 = 5
sourceB$
发生事件 4
,此时跟 sourceA$
会後一次事件值 2
组合在一起,得到 4 + 2 = 6
sourceB$
结束以上过程都是「同步执行」的,也就是在一个执行阶段 (task) 内依序完成。
因此结果只印出 5
和 6
啦!那麽要怎麽达到我们预期的 4
、5
和 6
的结果呢?这时候就是使用 queueScheduler
的时机了。
queueScheduler
一样是同步处理的,但在产生资料时,会将资料存入一个伫列 (queue) 内,每个 Observable 都会有自己的 queue,而 queue 除了伫列本身概念外,也可以想像成是一个「虚拟的时间窗格 (frame)」,因此当订阅发生时,整个资料流就会依照这个 queue 内的虚拟时间窗格「一格一格的产生事件」。
因此上述的 sourceA$
和 sourceB$
若是使用了 queueScheduler
,则 sourceA$
的事件 1
和 sourceB$
的事件 2
就会同时产生,而要达到这个目的,combineLatest
也必须将资料用「时间窗格」的方式订阅。
最终的程序码为:
const sourceA$ = scheduled([1, 2], queueScheduler);
const sourceB$ = scheduled([3, 4], queueScheduler);
combineLatest([sourceA$, sourceB$])
.pipe(
subscribeOn(queueScheduler),
map(([a, b]) => a + b)
)
.subscribe(result => {
console.log(result);
});
程序码:
https://stackblitz.com/edit/mastering-rxjs-queuescheduler?file=index.ts
有了这些 Scheduler,控制时间就更容易啦!
今天我们学习了 JavaScript 处理非同步程序的基础原理,以及 RxJS 如何使用这些原理来设计出各种的 Scheduler,帮助我们「安排」事件发生的时机点!
queueScheduler
:同步处理事件,但有个虚拟的时间窗格概念asapScheduler
:非同步处理事件,使用 microtask,如 Promise
asyncScheduler
:非同步处理事件,使用 macrotask,如 setTimeout
animationFrameScheduler
:非同步处理事件,使用 macrotask,背後处理雷同 requestAnimationFrame
,通常主要用於动画处理另外我们也可以使用 observeOn
控制事件来源处理时机点,以及使用 subscribeOn
处理订阅事件时机点。
虽然一般开发应用程序时,不太会使用到 Scheduler 控制,但当遇到时,就不怕不知道如何是好罗!
<<: 强型闯入DenoLand[32] - 使用 Deno 串接 MongoDB
>>: Elastic Kibana Infographic: 资讯图像化可以炫技到什麽程度 (31)
对於资料库管理员而言, 另一项重要任务是异质平台之间的资料沟通. 接下来实作从SQL到NSQL的资料...
redisDB是一个快速轻量的key-value资料库,因为可以无状态的执行,我个人认为非常适合运行...
终於进入到介绍 SASS 特点的区块啦~ 第一个要介绍的就是大名鼎鼎的 变数 Variable 的部...
接下来几天会来模拟一下,实际合约的案例,来更深入了解一下 solidity 语法 首先我们先设定一个...
多行输入 or 单行多个数值 如果输入的行数或一行中输入的数值数量固定且非常多,可以用回圈达成 #i...