认识 RxJS 的 Scheduler

今天我们来认识一下 RxJS 的 Scheduler,虽然在一般使用 RxJS 开发应用程序时几乎不会用到 Scheduler,但 Scheduler 可以说是控制 RxJS 至关重要的角色,偶尔也有可能会需要使用 Scheduler 来调整事件发生时机!

到底什麽是 Scheduler?就让我们继续看下去吧!

快速认识 Scheduler

Schedule 这个单字本身有「安排」的意思,因此 Scheduler 可以想像成是「负责安排」的人,具体来说安排什麽呢?就是安排 Observable 内「事件」该如何发生的时机点。

举个例子来说,请思考一下以下程序会以什麽样的顺序印出资料?

console.log('start');
of(1, 2, 3)
  .subscribe({
    next: result => console.log(result),
    complete: () => console.log('complete')
  });
console.log('end');

应该不难判断,由於 of(1, 2, 3) 事件是「同步执行」的,因此结果为:

start
1
2
3
complete
end

那麽有没有办法让 of(1, 2, 3) 变成「非同步执行」呢?当然是有的,我们可以在 of 参数的最後放上一个 Scheduler 来安排资料处理的顺序,以下范例透过 asyncScheduler 来帮助我们将 of(1, 2, 3) 的资料变成非同步执行的程序:

import { asyncScheduler, of } from 'rxjs';

console.log('start');
of(1, 2, 3, asyncScheduler)
  .subscribe({
    next: result => console.log(result),
    complete: () => console.log('complete')
  });
console.log('end');

程序码:https://stackblitz.com/edit/mastering-rxjs-make-observable-async

此时因为 of(1, 2, 3) 是非同步执行的,结果就变成:

start
end
1
2
3
complete

很简单吧!透过 Scheduler 可以帮助我们快速的将同步程序切换成非同步程序,当然实际上不只这样而已,还有许多 Scheduler 的使用技巧,但在介绍更多内容之前,先让我们简单(?)理解一下非同步处理的核心逻辑。

认识 JavaScript 处理非同步的原理

我们都知道可以使用 PromisesetTimeout 让一段程序变成非同步执行,那麽以下程序印出的结果为何?

setTimeout(() => {
  console.log('A');
});
Promise.resolve('B')
  .then(result => console.log(result));

既然都变成非同步了,理论上应该先变成非同步先处理吧,所以应该是先印出 A 再印出 B 吗?很可惜的不是,答案是先印出 B 再印出 A,为什麽会这样呢?这跟 JavaScript 的非同步处理方式有关。

认识 Microtask 与 Macrotask

首先,我们必须先知道的是当 JavaScript 开始执行一段程序时,会产生一个「工作阶段 (task)」,并「同步」的执行相关的程序码,而当遇到 PromisesetTimeout 这类非同步呼叫时,会先将里面的程序码丢到一个「等待区 (task queue)」,然後继续处理其他同步的程序码,直到目前同步的程序码处理完後,再从「等待区」将程序码拿出来以「同步」的方式执行里面的程序码。

由於等待区是一个伫列 (queue) 的资料结构,伫列的特色就是先进先出,因此先进入等待区的程序会先被执行,以下两段非同步程序呼叫执行後会先印出 A 再印出 B

Promise.resolve('A')
  .then(result => console.log(result));

Promise.resolve('B')
  .then(result => console.log(result));

那麽为什麽稍早的程序中 setTimeout 执行顺序跟我们想的不一样呢?这是因为所谓的「等待区」在 JavaScript 处理中其实会有两种:

  • Microtask queue:如 Promise 或 node.js 中的 process.nextTick,都会丢到 microtask queue 中。
  • Macrotask queue:如 setTimeoutrequestAnimationFrame,都会丢到 marcotask queue 中。

JavaScript 在同步执行完毕时,会先将所有的 microtask queue 中的程序执行完毕,确认清空 microtask queue 的工作後,再处理下一个 macrotask queue 中的工作,也因此同时有 PromisesetTimeout() 呼叫时,promise 会进入 microtask queue 而 setTimeout 则进入 macrotask queue,所以 Promise 的程序会先进行处理,之後才处理 setTimeout 的程序。

影片支援

为什麽要切成这样呢?中间有不少原因,还会牵扯到另一个大主题「event loop」,但这些不是今天的主题,因此就不花篇幅介绍了,有兴趣可以看看以下影片 (有中文字幕):

大方向是,在每次 macrotask 结束前,会清空 microtask 所有工作,当清空後才会进行画面渲染,接着处理下一次的 macrotask,因此 macrotask 作用在每次画面渲染的前後,而 microtask 则不是。

现在我们只要知道非同步运作有一个粒度小的 microtask 以及一个粒度大的 macrotask ,以及画面渲染时机的不同,就足以帮助我们更加理解 RxJS 的 Scheduler 罗。

再次认识 Scheduler

接着我们再来仔细认识一下 RxJS 的 Scheduler 到底是什麽,Scheduler 实际上就是用来帮助我们决定程序要「同步」或是「非同步」执行的一个角色;在同步执行时,我们可以用来确保不同的 「同步 Observable」事件会在一致的时间点 (frame) 触发;而在非同步执行时,则可以用来控制使用 microtask 还是 macrotask 处理事件。

从文字来看稍微有点抽象,之後我们会有更多程序码解释。

Scheduler 的种类

Scheduler 依照运作逻辑分成以下几类:

  • null:也就是不指定 Scheduler,那们就是同步执行的。
  • queueScheduler:也是同步处理的,但在执行时 RxJS 会将所有同步的 Observable 资料放到 queue 内,再依序执行,稍後我们会说明这和 null 有什麽区别。
  • asapScheduler:非同步处理,与使用 Promise 一样的非同步处理层级,也就是使用 microtask
  • asyncScheduler:非同步处理,处理方式同 setIntervael ,属於 macrotask 层级
  • animationFrameScheduler:非同步处理,处理方式同 requestAnimationFrame,也是属於 macrotask 层级,但更适用於动画处理 (效能较优)

建立类型 Operators 使用 Scheduler

在建立类型的 operators 如 offromtimer 等,都可以在最後一个参数指定要使用哪个 Scheduler 如:

of(1, 2, 3, asyncScheduler);

此时 operator 会帮我们将每个事件值用指定的处理方式来安排事件发生,以下举个例子来说明不同 Scheduler 执行的结果。

https://ithelp.ithome.com.tw/upload/images/20201019/200206171QUYTGXDBW.jpg

画面中有一个红色方块,我们将示范使用不同的 Scheduler 来移动这个方块,并看看执行过果,成式码如下:

const initPosition = () => {
  const blockElement = document.querySelector("#block") as HTMLElement;
  blockElement.style.left = "100px";
  blockElement.style.top = "100px";
};

const updatePositionByScheduler = (scheduler: SchedulerLike) => {
  initPosition();

  setTimeout(() => {
    console.log("start");

    range(0, 100, scheduler).subscribe({
      next: val => {
        const blockElement = document.querySelector("#block") as HTMLElement;
        blockElement.style.left = 100 + val + "px";
        blockElement.style.top = 100 + val + "px";
      },
      complete: () => console.log("complete")
    });
    console.log("end");
  }, 300);
};

fromEvent(document.querySelector("#goNull"), "click").subscribe(() => {
  updatePositionByScheduler(null);
});

fromEvent(document.querySelector("#goQueue"), "click").subscribe(() => {
  updatePositionByScheduler(queueScheduler);
});

fromEvent(document.querySelector("#goAsap"), "click").subscribe(() => {
  updatePositionByScheduler(asapScheduler);
});

fromEvent(document.querySelector("#goAsync"), "click").subscribe(() => {
  updatePositionByScheduler(asyncScheduler);
});

fromEvent(document.querySelector("#goAnimationFrame"), "click").subscribe(
  () => {
    updatePositionByScheduler(animationFrameScheduler);
  }
);

updatePositionByScheduler 负责根据指定的 Scheduler 来移动红色方块,在程序开始时,先呼叫 initPosition 重置红色方块的起始位置,接着依照不同的 Scheduler 依序产生一个 0~99 数值,并订阅此 Observable 来更新画面。

程序码已经放在 StackBlitz 上:

https://stackblitz.com/edit/mastering-rxjs-schedulers

可以尝试按按看每个按钮,并比较一下画面更新的方式以及 console.log 输出的顺序,以下简单说明一下执行结果:

使用 null

由於 range() 本身是同步执行的,因此会在一个工作阶段 (task) 中全部跑完,可以直接用同步执行的思维去想就好,输出结果为:

start
complete
end

由於执行完才会渲染画面,因此红色框框会从左上角立刻出现在右下角。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617aTLUeJ237n.jpg

使用 queueScheduler

使用 queueScheduler 时,资料依然是「同步执行」的,因此结果与使用 null 完全一样,但在一个同步工作阶段中,会再使用 queue 将资料包装起来处理; queueScheduler 做这件事情的目的是什麽?我们在後续说明。

https://ithelp.ithome.com.tw/upload/images/20201019/200206175PlrRLonrG.jpg

使用 asapScheduler

asapScheduler 会将每次 Observable 事件值都用「非同步」的方式处理,因此执行顺序为:

start
end
complete -> 因为是非同步执行 

asapScheduler 的非同步行为会进入 microtask,而再 microtask 阶段是不会处理画面渲染的,因此画面中的红色方块虽然会「非同步的」被更新座标,但会在最後「直接出现在右下角」。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617MpTX85AGee.jpg

使用 asyncScheduler

asyncScheduler 也会将每次 Observable 事件值使用「非同步」的方式处理,所以执行顺序一样是:

start
end
complete

不过 asyncScheduler 是使用 macrotask 处理非同步呼叫,而画面渲染行为会发生在每次 macrotask 结束之间,因此每次 Observable 的事件跟事件发生之间会产生画面渲染,结果就是可以看到红色方块往右下角移动的动画。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617oHwHz53ohc.jpg

使用 animationFrameScheduler

animationFrameScheduler 触发的时机点和画面重绘 (repaint) 定义的时机点一样,就跟我们使用 JavaScript 的 requestAnimationFrame 一样,基本上是 1/60 秒发生一次,使用 requestAnimationFrame 的时机通常是使用 JavaScript 处理动画,可以避免使用 setTimeout((), 1) 运算太频繁,但画面跟新频率不需要这麽高的问题。

由於是非同步执行,因此执行结果与前面相同,但可以看到画面上的红色方块以比较慢的速度移动,原则上会是每 1/60 移动一次。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617gSOlhGilSB.jpg

不同建立类型 Operators 预设的 Scheduler

以下是可以在最後一个参数设定 Scheduler 的建立类型 operators:

  • bindCallback
  • bindNodeCallback
  • combineLatest
  • concat
  • empty
  • from
  • fromPromise
  • interval
  • merge
  • of
  • range
  • throw
  • timer

建立类型的 operators 有部分会有预设 operator,尤其是跟时间控制有关的,例如 interval 的方法签章如下:

interval(period: number = 0, scheduler: SchedulerLike = async): Observable<number>

可以看到是使用 asyncScheduler,而有些则没有预设 scheduler,那麽就依照该 operator 预设的逻辑来决定同步或非同步,例如 of 预设的 Scheduler 是 null,因此是同步处理,而 timer 预设也是使用 asyncScheduler

使用 scheduled operator 避免 of 和 from 模糊不清

另外还有一个重点 offrom 里面的参数是不固定的,因此直接将 Scheduler 放在最後会相对不容易理解,也会有「无法将 Scheduler 物件本身当作事件值发送」的问题,因此 RxJS 还有一个建立类型 Operator - scheduled 来处理这个问题。

scheduled 只能放两个参数,第一个参数是资料,如果有多笔则使用阵列处理;第二个参数则是要使用哪个 Scheduler。因此以下程序不建议使用:

of(1, 2, asyncScheduler);

建议改使用

scheduled([1, 2], asyncScheduler);

使用 Scheduler 控制来源 Observable

除了在建立类型 operators 最後参数加上 Scheduler 外,如果想控制一个来源 Observable 发生的时机点,可以使用 observerOn 这个 operator,依照 Scheduler 来控制收到事件的时机点,如:

console.log('start');
of(1, 2)
  .pipe(observeOn(asyncScheduler))
  .subscribe({
    next: result => console.log(result),
    complete: () => console.log('complete')
  });
console.log('end');

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-observeon

由於我们将来源事件用非同步的方式接收处理,因此 startend 会先印出,然後才依序印出 12complete

需要特别注意的是 of(1, 2) 依然是「同步处理」行为,只是在订阅时透过 observeOn(asyncScheduler) 将收到的资料放到 macrotask 内,因此以下两段程序码处理行为完全不同:

// (1)
of(1, 2, asyncScheduler);
// 注意:此用法已被标示弃用,这里纯粹拿来比较用
// 实际上建议写成 scheduled([1, 2], asyncScheduler)
// (2)
of(1, 2).pipe(observeOn(asyncScheduler));

第一段程序码是把「每一个值分别放入 macrotaask 中」。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617ob8Lxku3Fu.jpg

第二段程序码则是「产生 1 和 2 後,再将这两个资料放入 macrotask 中」。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617c8ocN04sfC.jpg

这个观念非常重要!如果搞错,很可能整个执行顺序都跟想的不同了。

使用 Scheduler 控制订阅时机

observeOn 可以用来控制「处理来源 Observable 事件的时机」,而我们也可以用 subscribeOn 来控制「订阅来源 Observable 的时机」,以下程序一样是非同步执行的。

console.log('start');
of(1, 2)
  .pipe(subscribeOn(asyncScheduler))
  .subscribe({
    next: result => console.log(result),
    complete: () => console.log('complete')
  });
console.log('end');

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-subscribeon

执行结果:

start
end
1
2
complete

一样的,别忘记 of(1, 2) 本身依然是「同步执行」程序喔!

null 与 queueScheduler 的差别

最後,我们回到「同步」程序,看看使用 null 与使用 queueScheduler 到底有什麽不同,先想想看以下程序会如何执行:

const sourceA$ = of(1, 2);
const sourceB$ = of(3, 4);

combineLatest([sourceA$, sourceB$])
  .pipe(map(([a, b]) => a + b))
  .subscribe(result => {
    console.log(result);
  });

我们已经知道 combineLatest 会订阅参数内的 Observables,当每个 Observables 发生事件时,将这个时与其他的 Observables 「最後一次事件组合在一起」,所以理论上过程如下:

  • sourceA$ 发生事件 1,此时 sourceB$ 还没有任何事件发生
  • sourceB$ 发生事件 3,此时跟 sourceA$ 最後一次事件值 1 组合在一起,得到 3 + 1 = 4
  • sourceA$ 发生事件 2,此时跟 sourceB$ 最後一次事件值 3 组合在一起,得到 2 + 3 = 5
  • sourceB$ 发生事件 4,此时跟 sourceA$ 最後一次事件值 2 组合在一起,得到 4 + 2 = 6
  • sourceA$ 结束
  • sourceB$ 结束

因此印出的值应该是 456,实际上是这样吗?很可惜,实际结果是:只印出 56!怎麽会这样呢?

不使用 scheduler 的同步执行顺序

别忘记了 of 是「同步执行」的,因此在使用 combineLatest 分别订阅两个 Observable 时,实际上会变成类似以下程序码的执行顺序:

const sourceA$ = of(1, 2);
const sourceB$ = of(3, 4);
sourceA$.subscribe(...);
sourceB$.subscribe(...);

看出问题了吗?因为「同步执行」的关系,在订阅 sourceA$ 时会先同步产生 12 事件後结束;接着才是订阅 sourceB$ 同步产生 34 事件然後结束,因此正确运作过程为:

  • sourceA$ 发生事件 1,此时 sourceB$ 还没有任何事件发生
  • sourceA$ 发生事件 2,此时 sourceB$ 还没有任何事件发生
  • sourceA$ 结束
  • sourceB$ 发生事件 3,此时跟 sourceA$ 最後一次事件值 2 组合在一起,得到 3 + 2 = 5
  • sourceB$ 发生事件 4,此时跟 sourceA$ 会後一次事件值 2 组合在一起,得到 4 + 2 = 6
  • sourceB$ 结束

https://ithelp.ithome.com.tw/upload/images/20201019/20020617QxGWlbX8S3.jpg

以上过程都是「同步执行」的,也就是在一个执行阶段 (task) 内依序完成。

因此结果只印出 56 啦!那麽要怎麽达到我们预期的 456 的结果呢?这时候就是使用 queueScheduler 的时机了。

使用 queueScheduler 的执行顺序

queueScheduler 一样是同步处理的,但在产生资料时,会将资料存入一个伫列 (queue) 内,每个 Observable 都会有自己的 queue,而 queue 除了伫列本身概念外,也可以想像成是一个「虚拟的时间窗格 (frame)」,因此当订阅发生时,整个资料流就会依照这个 queue 内的虚拟时间窗格「一格一格的产生事件」。

因此上述的 sourceA$sourceB$ 若是使用了 queueScheduler,则 sourceA$ 的事件 1sourceB$ 的事件 2 就会同时产生,而要达到这个目的,combineLatest 也必须将资料用「时间窗格」的方式订阅。

https://ithelp.ithome.com.tw/upload/images/20201019/20020617RB14j3C9I6.jpg

最终的程序码为:

const sourceA$ = scheduled([1, 2], queueScheduler);
const sourceB$ = scheduled([3, 4], queueScheduler);

combineLatest([sourceA$, sourceB$])
  .pipe(
    subscribeOn(queueScheduler),
    map(([a, b]) => a + b)
  )
  .subscribe(result => {
    console.log(result);
  });

程序码:

https://stackblitz.com/edit/mastering-rxjs-queuescheduler?file=index.ts

有了这些 Scheduler,控制时间就更容易啦!

本日小结

今天我们学习了 JavaScript 处理非同步程序的基础原理,以及 RxJS 如何使用这些原理来设计出各种的 Scheduler,帮助我们「安排」事件发生的时机点!

  • queueScheduler:同步处理事件,但有个虚拟的时间窗格概念
  • asapScheduler:非同步处理事件,使用 microtask,如 Promise
  • asyncScheduler:非同步处理事件,使用 macrotask,如 setTimeout
  • animationFrameScheduler:非同步处理事件,使用 macrotask,背後处理雷同 requestAnimationFrame,通常主要用於动画处理

另外我们也可以使用 observeOn 控制事件来源处理时机点,以及使用 subscribeOn 处理订阅事件时机点。

虽然一般开发应用程序时,不太会使用到 Scheduler 控制,但当遇到时,就不怕不知道如何是好罗!

相关资源


<<:  强型闯入DenoLand[32] - 使用 Deno 串接 MongoDB

>>:  Elastic Kibana Infographic: 资讯图像化可以炫技到什麽程度 (31)

SQL与NoSQL的连结(一)

对於资料库管理员而言, 另一项重要任务是异质平台之间的资料沟通. 接下来实作从SQL到NSQL的资料...

day19 : redisDB keyDB on K8S (上)

redisDB是一个快速轻量的key-value资料库,因为可以无状态的执行,我个人认为非常适合运行...

DAY 06 Variable

终於进入到介绍 SASS 特点的区块啦~ 第一个要介绍的就是大名鼎鼎的 变数 Variable 的部...

[区块链&DAPP介绍 Day19] contract 案例1 - 抢红包

接下来几天会来模拟一下,实际合约的案例,来更深入了解一下 solidity 语法 首先我们先设定一个...

30天学会C语言: Day 19-考试常用的输入格式

多行输入 or 单行多个数值 如果输入的行数或一行中输入的数值数量固定且非常多,可以用回圈达成 #i...