还记得之前我们介绍过 Cold Observable v.s. Hot Observable 吗?
Cold Observable 和观察者 (Observer) 是一对一的关系,也就是每次产生订阅时,都会是一个全新的资料流。而 Hot Observable 和观察者则是一对多的关系,也就是每次产生订阅时,都会使用「同一份资料流」,而今天要介绍的 operators 目的就是将 Cold Observable 转成 Hot Observable,让原来的资料流可以共用。
今天的观念会比较复杂一点,打起精神继续看下去吧!
Cold Observable 每次订阅只会对应一个观察者,因此也可以说成将资料播放 (cast) 给唯一的观察者,应此也称为单播 (unicast),而 multicast
就是来源 Observable 变成多播 (multicast) 的情况。
在 multicast
内必须指定一个产生 Hot Observable 的工厂方法,也就是建立 Subject
、BehaviorSubject
等逻辑。
以下程序将一个单播的 Observable 转换成一个多播的 Observable,并建立一个 Subject
作为多播的资料来源。
const source$ = interval(1000).pipe(
take(5),
multicast(() => new Subject())
);
// srouce$ 变成一个 multicast 的 Observable
// 使用 Subject 作为多播的来源
当使用 multicast
时,新的 Observable 型别会是一个 ConnectableObservable,和一般的 Observable 的差别就在於 ConnectableObservable 是多播的,而且必须呼叫它的 connect
方法,才会开始进行多播的动作:
source$.subscribe(data => {
console.log(`multicast 示范 (1) 第一次订阅: ${data}`);
});
setTimeout(() => {
source$.subscribe(data => {
console.log(`multicast 示范 (1) 第二次订阅: ${data}`);
});
}, 5000);
setTimeout(() => {
// pipe 的回传一律是 Observable 型别
// 因此使用 TypeScript 转型成 ConnectableObservable
// 使用 JavaScript 则直接呼叫 connect() 就好
(source$ as ConnectableObservable<any>).connect();
}, 3000);
// multicast 示范 (1) 第一次订阅: 0
// multicast 示范 (1) 第一次订阅: 1
// multicast 示范 (1) 第二次订阅: 1
// multicast 示范 (1) 第一次订阅: 2
// multicast 示范 (1) 第二次订阅: 2
// multicast 示范 (1) 第一次订阅: 3
// multicast 示范 (1) 第二次订阅: 3
// multicast 示范 (1) 第一次订阅: 4
// multicast 示范 (1) 第二次订阅: 4
运作过程如下:
source$
是 ConnectableObservable 且还没呼叫 connect()
,因此持续等待source$
呼叫了 connect()
,因此资料流开始source$
的第一个事件值 0
,第一次订阅收到事件 0
source$
的第二个事件值 1
,第一次订阅收到事件 1
;同时第二次订阅发生,由於 source$
是多播的 Observable,因此第二次订阅也收到事件 1
。source$
多播的事件值弹珠图:
--0--1--2--3--4--5--6...
take(5)
--0--1--2--3--4| -> 此时是 Cold Observable
source$ = multicast(() => new Subject())
--0--1--2--3--4| -> 此时是 Hot Observable
第一次订阅: ----------0--1--2--3--4|
^ 第一次订阅时间点
第二次订阅: 1--2--3--4|
^ 第二次订阅时间点
source$.connect(): --0--1--2--3--4|
^ connect 时间点
有兴趣的话也可以把产生 Subject 的方法换成其他的如 AsyncSubject
看看结果,以 AsyncSubject
来说,就会等到 source$
结束後,同时收到最後一个事件资料。
除了传入建立 Subject 类别的方法外,也可以在第二个参数传入一个 selector
callback function,这个 selector
function 会收到被建立的 Subject 类别,同时回传另一个 Observable,当使用这个参数时,将不再会对来源 Observable 进行多次订阅,变成每次订阅都会重新建立新的 Subject 并加上 selector
function 回传的 Observable 进行订阅;也因此新的 Observable 不再是 ConnectableObservable,也就不用再次呼叫 connect()
(因为也没这方法可呼叫):
const source2$ = interval(1000).pipe(
take(5),
multicast(
() => new Subject(),
(subject) => subject.pipe(map((data: number) => data + 1)))
);
source2$.subscribe(data => {
console.log(`multicast 示范 (2) 第一次订阅: ${data}`);
});
setTimeout(() => {
source2$.subscribe(data => {
console.log(`multicast 示范 (2) 第二次订阅: ${data}`);
});
}, 3000);
// multicast 示范 (2) 第一次订阅: 1
// multicast 示范 (2) 第一次订阅: 2
// multicast 示范 (2) 第一次订阅: 3
// multicast 示范 (2) 第一次订阅: 4
// multicast 示范 (2) 第二次订阅: 1 (第二次订阅,但从头收到所有事件资料)
// multicast 示范 (2) 第一次订阅: 5
// multicast 示范 (2) 第二次订阅: 2
// multicast 示范 (2) 第二次订阅: 3
// multicast 示范 (2) 第二次订阅: 4
// multicast 示范 (2) 第二次订阅: 5
上面程序中,每次订阅发生时,会使用 new Subject()
产出的新 Subject 类别做为多播的来源,以及搭配 selector
function 回传的 Observable 订阅,并多播给每次订阅的观察者,由於是使用 Subject 类别,因此订阅来源依然是多播的 Observable,只是这个 Observable 只会有目前订阅的观察者收到而已。
程序码:https://stackblitz.com/edit/mastering-rxjs-operator-multicast
publish
将 multicast
内封装了 multicast
内建立 Subject 的方法,直接使用 new Subject()
,因此以下两段程序码完全一样:
interval(1000).pipe(
multicast(() => new Subject())
);
interval(1000).pipe(
publish()
);
如果去挖 publish 的程序码,更可以发现它就是呼叫 multicast
而已,只是预设建立 Subject 的工厂方法带入程序码 new Subject()
。
因为前面就示范过使用 new Subject()
时的运作过程,因此就不多作介绍啦,直接看 multicast
的范例即可。
当我们想要自行决定使用哪一种 Subject 类别建立 Hot Observable 时,请使用 multicast
,当直接使用 Subject
时,则可以使用 publish
,封装一些细节。
除此之外,publish
对应不同的 Subject
类别还有其他的 operators:
publishLast
:等於 multicast(() => new AsyncSubject())
publishBehavior
:等於 multicast(() => new BehaviorSubject())
BehaviorSubject
相同publishReplay
:等於 multicast(() => new ReplaySubject())
ReplaySubject
相同当 Observable 是 Connectable Observable 时,我们必须主动呼叫 connect
,才可以让资料开始流动 (当然也要有订阅发生),如果不需要自行控制 connect
时机,可以使用 refCount
来帮我们呼叫 connect
。
const source1$ = interval(1000).pipe(
take(5),
publish()
);
const source2$ = interval(1000).pipe(
take(5),
publish(),
refCount(),
);
source1$.subscribe((data) => {
console.log(`refCount 示范 (source1$ 订阅值): ${data}`);
});
source2$.subscribe((data) => {
console.log(`refCount 示范 (source2$ 订阅值): ${data}`);
});
// refCount 示范 (source2$ 订阅值): 0
// refCount 示范 (source2$ 订阅值): 1
// refCount 示范 (source2$ 订阅值): 2
// refCount 示范 (source2$ 订阅值): 3
// refCount 示范 (source2$ 订阅值): 4
从执行结果可以看到, source1$
因为没有主动去呼叫 connect()
的关系,虽然有订阅,但还没办法开始;而 source2$
则使用 refCount()
帮我们呼叫 connect()
,因此当订阅发生时,整个资料流就会直接开始。
程序码:https://stackblitz.com/edit/mastering-rxjs-operator-refcount
share
基本上就是 multicast(new Subject())
与 refCount()
的组合,当然也可以当作是 publish()
与 refCount()
的组合,在之前介绍 Cold Observable 与 Hot Observable 时,就介绍过了使用 share()
来进行转换,如果对前面的范例都能理解,share()
应该就没什麽问题罗!在实务上,都会直接使用 share()
来取代 multicast(new Subject())
+ refCount()
,毕竟程序码比较短,也更好理解。
shareReplay
可以直接当作 multicast(new ReplaySubject())
与 refCount()
的组合,与 share()
不同的地方在於,shareReplay()
还有重播的概念,也就是每次订阅时,会重播过去 N 次发生的资料:
const source$ = interval(1000).pipe(
shareReplay(2)
);
source$.subscribe(data => {
console.log(`shareReplay 示范 第一次订阅: ${data}`);
});
setTimeout(() => {
source$.subscribe(data => {
console.log(`shareReplay 示范 第二次订阅: ${data}`);
});
}, 5000);
// shareReplay 示范 第一次订阅: 0
// shareReplay 示范 第一次订阅: 1
// shareReplay 示范 第一次订阅: 2
// shareReplay 示范 第一次订阅: 3
// shareReplay 示范 第一次订阅: 4
// (第二次订阅发生时,先重播过去两次的资料)
// shareReplay 示范 第二次订阅: 3
// shareReplay 示范 第二次订阅: 4
// shareReplay 示范 第一次订阅: 5
// shareReplay 示范 第二次订阅: 5
// shareReplay 示范 第一次订阅: 6
// shareReplay 示范 第二次订阅: 6
如果对於 ReplaySubject
还有印象,这部分应该不困难才对罗。
程序码:https://stackblitz.com/edit/mastering-rxjs-operator-sharereplay
multicast
:将单播 (unicast) 的 Observable 转换成多播 (multicast),需要决定使用哪种多播的来源(Subject
、BehaviorSubject
等等),之後会得到一个 ConnectableObservable,需要呼叫它的 connect()
方法後才能开始资料流。可自行决定 connect()
时机。publish
:multicast
的特定版本,直接使用 Subject
类别做为多播的来源。
multicast(() => new Subject())
publishLast
: multicast(() => new AsyncSubject())
publishBehavior
: multicast(() => new BehaviorSubject())
publishReplay
: multicast(() => new ReplaySubject())
refCount
:帮我们直接呼叫来源 ConnectableObservable 的 connect()
方法。share
:意义为来源 Observable 的资料共享给所有观察者。
multicast(() => new Subject())
+ refCount()
shareReplay
:每次订阅时会重播来源 Observable 最近 N 次的资料,也就是最近 N 次事件资料共享给所有观察者。
multicast(() => new ReplaySubject())
+ refCount()
如果能理解单播和多播的不同,对於今天的 operators 应该会相对好理解,如果觉得太抽象,可以多看几次文章,如果还是不容易理解,至少要知道 share
和 shareReplay
,因为实务上几乎都是直接使用这两个 operators。
第 30 天!终於算是挑战成功啦!!不过离我想要介绍 RxJS 相关的知识还有一小段距离,所以接下来大概还会有 5 天继续介绍,包含实战范例以及更进阶的观念,敬请期待罗。
>>: (Hard) 32. Longest Valid Parentheses
ml5 读取模型还是用 tensorflowjs 的函式,那我不如直接用 tensorflowjs ...
昨天借用了 Wired Elements 来说明什麽是 WebComponent 跟它有什麽特点 今...
我的目的 学习图像辨识,顺便拯救专题,再顺便参加铁人赛,一鱼三吃,真香。 图像辨识的原理 简单说就是...
前一天我们谈了一些关於如何处理字串的的基本操作 同时在结尾有稍微提出一点对於文字的看待观点 那我们今...
研究生和大学生不同,跟着指导的教授有着独立的研究室,以滞留时间来看,可说是研究生的第二个家。 「呐,...