[Day 14] Reactive Programming -Reactor(COLD VS HOT) -PART 1

前言

之前文章提到Publisher都是Lazy loading,subscribe触发之前整个streaming是不会运作的,就像是java streaming一样,需要透过terminal动作(terminal operations)来启动整个流程,但有别於java streaming是无法重复使用的,reactive steaming就算是已经subscribe触发过,仍可被其他的Subscribersubscribe一次。但其实Publisher都是Lazy loading并不完全正确。先介绍reactor的两种时间。

图片

图片来源 : unsplash

Assembly Time

我们学习了一些Flux或是Mono的operator,像是map、flatMap、filter......等等,也知道在宣告整个Flux的过程中并不会马上触发这些operators的行为,因为我们知道Reactor是lazy的,而这个在宣告并且串联operators的时刻就被称作是「Assembly Time」。
像是我们先产生一个0~9数字的Flux,并根据奇偶数又拆分了两个不同的Flux,他们其实彼此间不会相互影响,不像是java 8 streams。一旦Flux被宣告出来,他就是immutable,就算被拿去做了其他的操作也一样,像是积木组装一样,可以互相组装,但最原始的积木是不会变的。


Flux<Integer> integerFlux = Flux.range(0, 10);
Flux<Integer> evenInt = integerFlux.filter(integer -> integer % 2 == 0);
Flux<Integer> oddInt = integerFlux.filter(integer -> integer % 2 != 0);

Subscription Time

在Assembly Time时,我们宣告并组合了Flux,但并不会马上的开始执行,而是直到有人订阅(subscription),而官方推荐最简单的订阅方式就是「Flux.subscribe(valueConsumer, errorConsumer)」,当subscribe开始触发,开始的讯号从subscribe向上,经过map、filter,最後到达range,也就是我们的来源(source operator)就会开始产生资料(initial data),从这个时刻开始,就是Subscription Time。

Flux.range(0, 10) 
    .filter(integer -> integer % 2 == 0) 
    .map(integer -> "偶数: " + integer) 
    .subscribe(System.out::println, Throwable::printStackTrace);

讲了这麽多次当subscribe触发後才开始动作,其实触发的方式有三种。

  1. subscribe:之前介绍都是属於这种
  2. block:Flux.blockFirst,阻断也会触发。
  3. hot publisher:接下来就来说明第三种。

Cold Publisher

如同上面介绍的,触发subscribe之前不做任何事情,只有在触发之後才会产生资料(generate data),而且每个订阅者(subscriber)都能拿到全部且一样的资料。

Hot Publisher

不需要subscriber就会产生资料,常常是持续变动的资料,像是滑鼠轨迹、股票价格之类的,在连线(connection)就开始产生资料,通常是一开始。每个订阅者(subscriber)只能拿到当他们开始subscribe之後的最新资料。

DEMO

来做一个每隔一秒推送资料的Flux,分别有两个subscriber来订阅,为了做出区隔中间停止两秒再开始第二个,因为interval是cold operator,所以可以预期到第二个subscribe会是从0s开始,符合我们对Cold Publisher的期待,每一个subscriber都能拿到一样的资料。

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
Thread.sleep(5000);
clock1 0s 
clock1 1s 
clock1 2s 
	clock2 0s 
clock1 3s 
	clock2 1s 
clock1 4s 
	clock2 2s 
	clock2 3s 
clock1 5s 
	clock2 4s 
clock1 6s

而当我们透过share来改变我们的程序,share可以将Cold Publisher转换为Hot Publisher,就可以明显看出第二个订阅者因为间隔了两秒,只会拿到从两秒开始推送的资料,而不会是从0s。

Flux<Long> source = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = source.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
Thread.sleep(5000);
clock1 0s 
clock1 1s 
clock1 2s 
	clock2 2s 
clock1 3s 
	clock2 3s 
clock1 4s 
	clock2 4s 
clock1 5s 
	clock2 5s 
clock1 6s 
	clock2 6s

范例来源

总结

用youtube来比喻Hot /Cold Publisher,直播、首播就像是Hot Publisher,每个进来频道的人都是从最新的地方开始,进入的时间点不同得到的资料就会不同,就算没有人看影片还是持续在跑动。
一般的影片就是像是Cold Publisher,在没有点开之前是不会做事,每个人点开都一样是从最一开始的地方。

下一篇就来介绍有哪些像是share可以转换的operators。
资料来源


<<:  Day 23:专案05 - KKBOX风云榜02 | AJAX

>>:  Data layer testing (4)

D28 第十五周 (回忆篇)

持续跟第十一周作业奋斗 周一先把相对简单的 week11 hw3 简答题完成,中间因为穿插其他大活动...

DAY 24 『 客制化文字输入框 Custom TextField 』

昨天介绍完客制化按钮,今天会分享客制化文字输入框( 加入图示、图示显示在左边或右边 ) 成品: 刻好...

MySQL学习_Day4

学习内容 建立简单公司资料库、聚合函数、万用字元 简单公司资料库 : 由於Icebear先前学习都是...

[Day 30] 人脸表情辨识App成果发表与完赛感想

-1. Google store link 这七天做的app上架罗! 欢迎下载 https://pl...

[Day10] 设定 Actions On Google 专案

接续昨日的对话流设计, 现在我们要进入实作的部分让各位能更了解设计流程。 首先从设定 Action...