RxJS Multicast 类 Operator (1) - multicast / publish / refCount / share / shareReplay

还记得之前我们介绍过 Cold Observable v.s. Hot Observable 吗?

Cold Observable 和观察者 (Observer) 是一对一的关系,也就是每次产生订阅时,都会是一个全新的资料流。而 Hot Observable 和观察者则是一对多的关系,也就是每次产生订阅时,都会使用「同一份资料流」,而今天要介绍的 operators 目的就是将 Cold Observable 转成 Hot Observable,让原来的资料流可以共用。

今天的观念会比较复杂一点,打起精神继续看下去吧!

multicast

Cold Observable 每次订阅只会对应一个观察者,因此也可以说成将资料播放 (cast) 给唯一的观察者,应此也称为单播 (unicast),而 multicast 就是来源 Observable 变成多播 (multicast) 的情况。

multicast 内必须指定一个产生 Hot Observable 的工厂方法,也就是建立 SubjectBehaviorSubject 等逻辑。

以下程序将一个单播的 Observable 转换成一个多播的 Observable,并建立一个 Subject 作为多播的资料来源。

const source$ = interval(1000).pipe(
  take(5),
  multicast(() => new Subject())
);
// srouce$ 变成一个 multicast 的 Observable
// 使用 Subject 作为多播的来源

当使用 multicast 时,新的 Observable 型别会是一个 ConnectableObservable,和一般的 Observable 的差别就在於 ConnectableObservable 是多播的,而且必须呼叫它的 connect 方法,才会开始进行多播的动作:

source$.subscribe(data => {
  console.log(`multicast 示范 (1) 第一次订阅: ${data}`);
});

setTimeout(() => {
  source$.subscribe(data => {
    console.log(`multicast 示范 (1) 第二次订阅: ${data}`);
  });
}, 5000);

setTimeout(() => {
  // pipe 的回传一律是 Observable 型别
  // 因此使用 TypeScript 转型成 ConnectableObservable
  // 使用 JavaScript 则直接呼叫 connect() 就好
  (source$ as ConnectableObservable<any>).connect();
}, 3000);
// multicast 示范 (1) 第一次订阅: 0
// multicast 示范 (1) 第一次订阅: 1
// multicast 示范 (1) 第二次订阅: 1
// multicast 示范 (1) 第一次订阅: 2
// multicast 示范 (1) 第二次订阅: 2
// multicast 示范 (1) 第一次订阅: 3
// multicast 示范 (1) 第二次订阅: 3
// multicast 示范 (1) 第一次订阅: 4
// multicast 示范 (1) 第二次订阅: 4

运作过程如下:

  1. 第一次发生订阅,但 source$ 是 ConnectableObservable 且还没呼叫 connect(),因此持续等待
  2. 三秒後 source$ 呼叫了 connect(),因此资料流开始
  3. 第四秒发出 source$ 的第一个事件值 0,第一次订阅收到事件 0
  4. 第五秒发出 source$ 的第二个事件值 1,第一次订阅收到事件 1;同时第二次订阅发生,由於 source$ 是多播的 Observable,因此第二次订阅也收到事件 1
  5. 第六秒後,第一次订阅和第二次订阅都持续收到 source$ 多播的事件值

弹珠图:

--0--1--2--3--4--5--6...
take(5)
--0--1--2--3--4| -> 此时是 Cold Observable
source$ = multicast(() => new Subject())
--0--1--2--3--4| -> 此时是 Hot Observable

第一次订阅:          ----------0--1--2--3--4|
                    ^ 第一次订阅时间点
第二次订阅:                       1--2--3--4|
                                 ^ 第二次订阅时间点
source$.connect():          --0--1--2--3--4|
                            ^ connect 时间点

有兴趣的话也可以把产生 Subject 的方法换成其他的如 AsyncSubject 看看结果,以 AsyncSubject 来说,就会等到 source$ 结束後,同时收到最後一个事件资料。

除了传入建立 Subject 类别的方法外,也可以在第二个参数传入一个 selector callback function,这个 selector function 会收到被建立的 Subject 类别,同时回传另一个 Observable,当使用这个参数时,将不再会对来源 Observable 进行多次订阅,变成每次订阅都会重新建立新的 Subject 并加上 selector function 回传的 Observable 进行订阅;也因此新的 Observable 不再是 ConnectableObservable,也就不用再次呼叫 connect()(因为也没这方法可呼叫):

const source2$ = interval(1000).pipe(
  take(5),
  multicast(
    () => new Subject(), 
    (subject) => subject.pipe(map((data: number) => data + 1)))
);

source2$.subscribe(data => {
  console.log(`multicast 示范 (2) 第一次订阅: ${data}`);
});

setTimeout(() => {
  source2$.subscribe(data => {
    console.log(`multicast 示范 (2) 第二次订阅: ${data}`);
  });
}, 3000);
// multicast 示范 (2) 第一次订阅: 1
// multicast 示范 (2) 第一次订阅: 2
// multicast 示范 (2) 第一次订阅: 3
// multicast 示范 (2) 第一次订阅: 4
// multicast 示范 (2) 第二次订阅: 1 (第二次订阅,但从头收到所有事件资料)
// multicast 示范 (2) 第一次订阅: 5
// multicast 示范 (2) 第二次订阅: 2
// multicast 示范 (2) 第二次订阅: 3
// multicast 示范 (2) 第二次订阅: 4
// multicast 示范 (2) 第二次订阅: 5

上面程序中,每次订阅发生时,会使用 new Subject() 产出的新 Subject 类别做为多播的来源,以及搭配 selector function 回传的 Observable 订阅,并多播给每次订阅的观察者,由於是使用 Subject 类别,因此订阅来源依然是多播的 Observable,只是这个 Observable 只会有目前订阅的观察者收到而已。

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-multicast

Publish

publishmulticast 内封装了 multicast 内建立 Subject 的方法,直接使用 new Subject(),因此以下两段程序码完全一样:

interval(1000).pipe(
  multicast(() => new Subject())
);

interval(1000).pipe(
  publish()
);

如果去挖 publish 的程序码,更可以发现它就是呼叫 multicast 而已,只是预设建立 Subject 的工厂方法带入程序码 new Subject()

因为前面就示范过使用 new Subject() 时的运作过程,因此就不多作介绍啦,直接看 multicast 的范例即可。

当我们想要自行决定使用哪一种 Subject 类别建立 Hot Observable 时,请使用 multicast,当直接使用 Subject 时,则可以使用 publish,封装一些细节。

除此之外,publish 对应不同的 Subject 类别还有其他的 operators:

  • publishLast:等於 multicast(() => new AsyncSubject())
  • publishBehavior:等於 multicast(() => new BehaviorSubject())
    • 使用的参数与 BehaviorSubject 相同
  • publishReplay:等於 multicast(() => new ReplaySubject())
    • 使用的参数与 ReplaySubject 相同

refCount

当 Observable 是 Connectable Observable 时,我们必须主动呼叫 connect,才可以让资料开始流动 (当然也要有订阅发生),如果不需要自行控制 connect 时机,可以使用 refCount 来帮我们呼叫 connect

const source1$ = interval(1000).pipe(
  take(5),
  publish()
);

const source2$ = interval(1000).pipe(
  take(5),
  publish(),
  refCount(),
);

source1$.subscribe((data) => {
  console.log(`refCount 示范 (source1$ 订阅值): ${data}`);
});

source2$.subscribe((data) => {
  console.log(`refCount 示范 (source2$ 订阅值): ${data}`);
});
// refCount 示范 (source2$ 订阅值): 0
// refCount 示范 (source2$ 订阅值): 1
// refCount 示范 (source2$ 订阅值): 2
// refCount 示范 (source2$ 订阅值): 3
// refCount 示范 (source2$ 订阅值): 4

从执行结果可以看到, source1$ 因为没有主动去呼叫 connect() 的关系,虽然有订阅,但还没办法开始;而 source2$ 则使用 refCount() 帮我们呼叫 connect(),因此当订阅发生时,整个资料流就会直接开始。

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-refcount

share

share 基本上就是 multicast(new Subject())refCount() 的组合,当然也可以当作是 publish()refCount() 的组合,在之前介绍 Cold Observable 与 Hot Observable 时,就介绍过了使用 share() 来进行转换,如果对前面的范例都能理解,share() 应该就没什麽问题罗!在实务上,都会直接使用 share() 来取代 multicast(new Subject()) + refCount(),毕竟程序码比较短,也更好理解。

shareReplay

shareReplay 可以直接当作 multicast(new ReplaySubject())refCount() 的组合,与 share() 不同的地方在於,shareReplay() 还有重播的概念,也就是每次订阅时,会重播过去 N 次发生的资料:

const source$ = interval(1000).pipe(
  shareReplay(2)
);

source$.subscribe(data => {
  console.log(`shareReplay 示范 第一次订阅: ${data}`);
});

setTimeout(() => {
  source$.subscribe(data => {
    console.log(`shareReplay 示范 第二次订阅: ${data}`);
  });
}, 5000);
// shareReplay 示范 第一次订阅: 0
// shareReplay 示范 第一次订阅: 1
// shareReplay 示范 第一次订阅: 2
// shareReplay 示范 第一次订阅: 3
// shareReplay 示范 第一次订阅: 4
// (第二次订阅发生时,先重播过去两次的资料)
// shareReplay 示范 第二次订阅: 3
// shareReplay 示范 第二次订阅: 4
// shareReplay 示范 第一次订阅: 5
// shareReplay 示范 第二次订阅: 5
// shareReplay 示范 第一次订阅: 6
// shareReplay 示范 第二次订阅: 6

如果对於 ReplaySubject 还有印象,这部分应该不困难才对罗。

程序码:https://stackblitz.com/edit/mastering-rxjs-operator-sharereplay

本日小结

  • multicast:将单播 (unicast) 的 Observable 转换成多播 (multicast),需要决定使用哪种多播的来源(SubjectBehaviorSubject 等等),之後会得到一个 ConnectableObservable,需要呼叫它的 connect() 方法後才能开始资料流。可自行决定 connect() 时机。
  • publishmulticast 的特定版本,直接使用 Subject 类别做为多播的来源。
    • 等同於 multicast(() => new Subject())
    • 另外还有:
      • publishLastmulticast(() => new AsyncSubject())
      • publishBehaviormulticast(() => new BehaviorSubject())
      • publishReplaymulticast(() => new ReplaySubject())
  • refCount:帮我们直接呼叫来源 ConnectableObservable 的 connect() 方法。
  • share:意义为来源 Observable 的资料共享给所有观察者。
    • multicast(() => new Subject()) + refCount()
  • shareReplay:每次订阅时会重播来源 Observable 最近 N 次的资料,也就是最近 N 次事件资料共享给所有观察者。
    • multicast(() => new ReplaySubject()) + refCount()

如果能理解单播和多播的不同,对於今天的 operators 应该会相对好理解,如果觉得太抽象,可以多看几次文章,如果还是不容易理解,至少要知道 shareshareReplay,因为实务上几乎都是直接使用这两个 operators。

相关资源

还没结束

第 30 天!终於算是挑战成功啦!!不过离我想要介绍 RxJS 相关的知识还有一小段距离,所以接下来大概还会有 5 天继续介绍,包含实战范例以及更进阶的观念,敬请期待罗。


<<:  API 开发方法

>>:  (Hard) 32. Longest Valid Parentheses

Day 20 TensorFlowJS

ml5 读取模型还是用 tensorflowjs 的函式,那我不如直接用 tensorflowjs ...

[Day03] - 第一个 WebComponent 元件

昨天借用了 Wired Elements 来说明什麽是 WebComponent 跟它有什麽特点 今...

Day21 TensorFlow&OpenCV简介

我的目的 学习图像辨识,顺便拯救专题,再顺便参加铁人赛,一鱼三吃,真香。 图像辨识的原理 简单说就是...

【Day15-文字】文字资料的基本处理——Token、Stem、Stopword

前一天我们谈了一些关於如何处理字串的的基本操作 同时在结尾有稍微提出一点对於文字的看待观点 那我们今...

危险气息的研究室:尾递回 Tail Calls

研究生和大学生不同,跟着指导的教授有着独立的研究室,以滞留时间来看,可说是研究生的第二个家。 「呐,...