RxJS 工具类型 Operators (1) - tap / toArray / delay / delayWhen

今天要介绍的是「工具类型」的 Operators,也都不太困难,很好理解,继续轻松学习吧!

tap

在之前文章介绍 functional programming 在 RxJS 应用时,已经稍微介绍过 tap 这个 operator 了,今天来更深入的介绍一下。

tap 主要就是用来处理 side effect 的,在使用各种 operators 时,我们应该尽量让程序内不要发生 side effect,但真的有需要处理 side effect 时,可以使用 tap 把「side effect」和「非 side effect」隔离,未来会更加容易找到问题发生的地方。

interval(1000).pipe(
  map(data => data * 2),
  // 使用 tap 来隔离 side effect
  tap(data => console.log('目前资料', data)),
  map(data => data + 1),
  tap(data => console.log('目前资料', data)),
  take(10)
).subscribe((data) => {
  console.log(`tap 示范 (1): ${data}`);
});

tap 拿掉时,我们完全可以知道整个运作的过程,而实际上加入 tap 後运作过程也不会因此改变,我们只是在 tap 中处理 side effect 的逻辑 (如 console.log、DOM 操作等)。

一般来说,在整个 Obsevable 运作时只建议在 Subscribe 内运行执行 side effct 程序码,但若在 Observable 资料流动中执行 side effect 时,使用 tap 来处理就对了!

上面的程序中,我们都是接受来源 Observable 的 next() 事件资料;除此之外,tap 也可以用来接收来源 Observable 的 errorcomplete 资讯,只要传入一个观察者物件即可:

const observer = {
    next: (data) => console.log(`tap 示范 (2): ${data}`),
    error: (error) => console.log(`tap 示范 (2): 发生错误 - ${error}`),
    complete: () => console.log('tap 示范 (2): 结束'),
  };

interval(1000).pipe(
  take(3),
  map(data => data * 2),
  map(data => data + 1),
  tap(observer)
).subscribe();
// tap 示范 (2): 1
// tap 示范 (2): 3
// tap 示范 (2): 5
// tap 示范 (2): 结束

上面的例子会收到每次事件,以及完成的资讯,接下来看看发生错误的情境:

interval(1000).pipe(
  take(3),
  map(data => data * 2),
  map(data => data + 1),
  // 当资料为 3 时,抛出错误
  switchMap(data => iif(() => data === 3, throwError('error'), of(data))),
  tap({
    next: (data) => console.log(`tap 示范 (3): ${data}`),
    error: (error) => console.log(`tap 示范 (3): 发生错误 - ${error}`),
    complete: () => console.log('tap 示范 (3): 结束'),
  })
).subscribe();
// tap 示范 (3): 1
// tap 示范 (3): 发生错误 - error

还有一种写法,是直接在 tap 内传入三个 callback function 分别代表 next()error()complete()

interval(1000).pipe(
  take(3),
  map(data => data * 2),
  map(data => data + 1),
  tap(
    // 处理 next
    (data) => console.log(`tap 示范 (4): ${data}`),
    // 处理 error
    (error) => console.log(`tap 示范 (4): 发生错误 - ${error}`),
    // 处理 complete
    () => console.log('tap 示范 (4): 结束'),
  )
).subscribe();
// tap 示范 (4): 1
// tap 示范 (4): 3
// tap 示范 (4): 5
// tap 示范 (4): 结束

传入 3 个 callback 的写法在 RxJS 7 将被标示为弃用 (只传入一个处理 next() 没问题),没意外的话 RxJS 8 会移除,届时要处理 error()complete() 需要使用传入 Observer 物件的写法。

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-tap

toArray

toArray 在来源 Observable 发生事件时,不会立即发生在新的 Observable 上,而是将资料暂存起来,当来源 Observable 结束时,将这些资料组合成一个阵列发生在新的 Observable 上。

interval(1000)
  .pipe(
    take(3),
    toArray()
  )
  .subscribe(data => {
    console.log(`toArray 示范: ${data}`);
  });
// toArray 示范: 0,1,2

弹珠图:

---0---1---2|
toArray()
-----------([0, 1, 2]|)

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-toarray

toArray 还有一种妙用,就是拿来处理阵列相关的逻辑,我们可以使用 offromrange 等建立 Observable 的 operator 来产生一个固定的 Observable,透过 Observable 及 pipe 是一笔一笔资料流入所有 operators 的特性,来处理资料:

from([1, 2, 3, 4, 5, 6, 7, 8, 9]).pipe(
  map(value => value * value),
  filter(value => value % 3 === 0),
  toArray()
).subscribe(result => console.log(result));

乍看之下跟直接使用阵列的操作没什麽不同:

[1, 2, 3, 4, 5, 6, 7, 8, 9]
  .map(value => value * value)
  .filter(value => value % 3 === 0);

但实际上效能会好上很多,因为 Observable 不会把整个阵列全部带入 map 再带入 filter 内;同时还可以享有更多 operators 的支援!

delay

delay 会让来源 Observable 延迟一个指定时间(毫秒)再开始。

of(1, 2, 3).pipe(
  delay(1000)
).subscribe(data => {
  console.log(`delay 示范: ${data}`);
});
// (等候 1 秒钟)
// delay 示范: 1
// delay 示范: 2
// delay 示范: 3

弹珠图:

(123|)
delay(1000)
---(123|)

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-delay

delayWhen

delayWhen 可以自行决定来源 Observable 每次事件延迟发生的时机点,在 delayWhen 内需要传入一个 delayDurationSelector callback function,delayWhen 会将事件资讯传入,而 delayDurationSelector 需要回传一个 Observable,当此 Observable 发生新事件时,才会将来源事件值发生在新的 Observable 上:

const delayFn = (value) => {
  return of(value).pipe(delay(value % 2 * 2000));
}

interval(1000).pipe(
  take(3),
  delayWhen(value => delayFn(value))
).subscribe(data => {
  console.log(`delayWhen 示范 (1): ${data}`);
});
// delayWhen 示范 (1): 0
// (原本应该发生事件 1,但被延迟了)
// delayWhen 示范 (1): 2
// delayWhen 示范 (1): 1

上面例子中,我们自定要延迟的时间点,当资料是偶数时,因为 delay(0) 的关系不会有任何延迟,而当资料是奇数时。因为 delay(2000) 的关系,所以会延迟两秒钟,因此事件资料 1 会比较晚发生。

弹珠图:

----0----1----2|
delayWhen(value => of(value).pipe(delay(value % 2 * 2000)))
----0---------2----1|
         ^ 延迟两秒发生
                   ^ 所以到这时才发生事件

delayWhen 还有第二个参数(非必须),是一个 subscriptionDelay Observable,delayWhen 可以透过这个 Observable 来决定来源 Observable 开始的时机点;当整个 Observable 订阅开始时,delayWhen 会订阅这个 subscriptionDelay Observable ,当事件发生时,才真正订阅来源 Observable,然後退订阅 subscriptionDelay Observable。

interval(1000).pipe(
  take(3),
  delayWhen(
    value => delayFn(value),
    fromEvent(document, 'click')
  )
).subscribe(data => {
  console.log(`delayWhen 示范 (2): ${data}`);
});
// ...(当按下滑鼠时,才开始)
// delayWhen 示范 (1): 0
// (原本应该发生事件 1,但被延迟了)
// delayWhen 示范 (1): 2
// delayWhen 示范 (1): 1

弹珠图:

click$   ------c...
source$  ----0----1----2|
delayWhen(
  value => delayFn(value),
  click$
)
         ----------0---------2----1|
               ^ click$ 事件发生
                   ^ 依照 delayFn 的逻辑决定资料延迟时间

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-delaywhen

本日小结

  • tap:可以用来隔离「side effect」以及「非 side effect」,在 Observable 运作过程中,不论是 next()error()complete(),只要有 side effect 逻辑都建议放到 tap 内处理。
  • toArray:将来源 Observable 资料汇整成一个阵列。toArray 可以应用来处理阵列资料。
  • delay:延迟一段时间後,才开始运行来源 Observable。
  • delayWhen:可自行设计 Observable,来决定来源 Observable 每个事件的延迟逻辑。

相关资源


<<:  DAY28 mongodb aggregate(2)

>>:  Day 30 总结

DAY25神经网路(续三)

昨天介绍完浅层神经网路演算法,今天要来研究浅层神经网路程序: 首先要先设定学习率和隐藏神经元个数,在...

Computed vs Methods

昨天已经把JSON档建置好了!今天就可以取用JSON档的资料然後实作出Methods和Compute...

Day07 - Flowchart versus State Diagram 让我们比一比

我们根据昨天的需求画出以下两张图 1. Flowchart 我们先看看 Flowchart 图中的白...

输家的特质

股市只有两种人:输家跟赢家。 其中长期赢家只占极少部分,若我们不想成为绝大部分的输家,以下几种行为请...

[iT铁人赛Day17]JAVA的函数(上篇)

函数要讲其实可以讲很多,但是这边只稍微做一个简单的介绍就好了 今天先来做个简单的介绍以及范例 函数的...