[Day 12] Reactive Programming - Reactor(publishOn/subscribeOn)

前言

在上一篇介绍了Reactor提供Scheduler来帮助开发者,这篇就是来说明具体是如何使用。

publishOn

执行的方式与一般的operator一样,会影响从publishOn以下的operator chain,改变其threading context,也就是改变执行绪,直到如果有下一个publishOn出现。
官方提供的范例,新增一个parallel-scheduler,在主流程里面宣告Flux,最後新开一个Threadsubscribe(),这样在publishOn之前的操作都会是new Thread里面去执行,之後的则会是一开始宣告的Scheduler里面。

Scheduler s = Schedulers.newParallel("parallel-scheduler", 2);

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)
    .publishOn(s)
    .map(i -> "value " + i);

Thread thread = new Thread(() -> flux.subscribe(System.out::println));
thread.start();
Thread.sleep(100);

另一篇Spring blog 提供的范例我觉得是可以更直觉的了解使用情境,假设需要去呼叫外部的一个阻断式(blocking)的服务,如果没有使用publishOn特别指定Scheduler,最後执行都会是在subscribe所处在的执行绪(main)里面,就会等ABC处理完才会处理DE。

Flux.fromIterable(firstListOfUrls) //contains A, B and C 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body)); 
Flux.fromIterable(secondListOfUrls) //contains D and E 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
main from first list, got A 
main from first list, got B 
main from first list, got C 
main from second list, got D 
main from second list, got E
*/

这时候如果加上publishOn,还记得上一篇介绍过最适合用於阻断式服务的就是boundedElastic,就可以看到结果是穿插的,效能也就相对的更好。

Flux.fromIterable(firstListOfUrls) //contains A, B and C 
    .publishOn(Schedulers.boundedElastic()) 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body)); 
Flux.fromIterable(secondListOfUrls) //contains D and E 
    .publishOn(Schedulers.boundedElastic()) 
    .map(url -> blockingWebClient.get(url)) 
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
boundedElastic-1 from first list, got A 
boundedElastic-2 from second list, got D 
boundedElastic-1 from first list, got B 
boundedElastic-2 from second list, got E 
boundedElastic-1 from first list, got C
*/

subscribeOn

publishOn几乎一模一样,会去改变operator chain的threading context,差别在於publishOn只会改变之後的,不会朔及既往,而subscribeOn则是从头到尾都改变,不论在哪一个位置,直到遇到publishOn为止,也就是如果有publishOn,那之後的operator 仍然会依照publishOn所指定的Scheduler。将上一个范例直接拿来改成使用subscribeOn,那两个map都会是在一开始宣告的Scheduler内。

Scheduler s = Schedulers.newParallel("parallel-scheduler", 2); 
final Flux<String> flux = Flux 
    .range(1, 2) 
    .map(i -> 10 + i) 
    .subscribeOn(s) 
    .map(i -> "value " + i); 
Thread thread = new Thread(() -> flux.subscribe(System.out::println)); 
thread.start(); 
Thread.sleep(100);

结语

至於为什麽有了publishOn还需要subscribeOn,理论上你只需要将publishOn放在最前面就能够取代subscribeOn,而且我认为即便是放後面结果一样,为了可读性subscribeOn应该也要放在最前面。其实原因就是有些情境当你没办法放在最前面的时候,假设有一个api或是function是别人写好的你无法去更改,如果只有publishOn是无法更改到上一层(upstream),这时候就需要靠subscribeOn来处理。

最後还有一件事情从main thread跳到其他的执行绪可以透过以上的方法,但是从其他执行绪想要再跳回main是不可能的,虽然我也无法理解为何会有这样的需求。

资料来源


<<:  追求JS小姊姊系列 Day11 -- 流程错了怎办?难道要跟D特终老?

>>:  Day19-"字串练习-2"

Mobile Number Tracker Online

It is impossible to track a phone's exact location...

Day16 Vue directives(v-model资料双向绑定)

在Vue里有个很大的特色可能就是资料的双向绑定(Two-wayBinding),而资料绑定的话我们最...

D19 使用分页(Paginator) - 首页跟个人文件页

使用Django内建的paginator分页类别 将原本的Quryset物件(doc_warehou...

Day 07 line bot sdk python范例程序在做什麽

知道了line bot sdk python上的程序的功能是回复你和你传送相同的讯息。这边会看成是在...

网页常用单位-30天学会HTML+CSS,制作精美网站

设置CSS样式大小时,会使用到各种不同的单位,尤其现在都制作响应式网站,用错单位,就会针对不同尺寸调...