在上一篇介绍了Reactor提供Scheduler来帮助开发者,这篇就是来说明具体是如何使用。
执行的方式与一般的operator一样,会影响从publishOn
以下的operator chain,改变其threading context,也就是改变执行绪,直到如果有下一个publishOn
出现。
官方提供的范例,新增一个parallel-scheduler
,在主流程里面宣告Flux
,最後新开一个Thread
来subscribe()
,这样在publishOn
之前的操作都会是new Thread
里面去执行,之後的则会是一开始宣告的Scheduler
里面。
Scheduler s = Schedulers.newParallel("parallel-scheduler", 2);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.publishOn(s)
.map(i -> "value " + i);
Thread thread = new Thread(() -> flux.subscribe(System.out::println));
thread.start();
Thread.sleep(100);
另一篇Spring blog 提供的范例我觉得是可以更直觉的了解使用情境,假设需要去呼叫外部的一个阻断式(blocking)的服务,如果没有使用publishOn
特别指定Scheduler
,最後执行都会是在subscribe所处在的执行绪(main)里面,就会等ABC处理完才会处理DE。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
*/
这时候如果加上publishOn
,还记得上一篇介绍过最适合用於阻断式服务的就是boundedElastic
,就可以看到结果是穿插的,效能也就相对的更好。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
/*
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
*/
跟publishOn
几乎一模一样,会去改变operator chain的threading context,差别在於publishOn
只会改变之後的,不会朔及既往,而subscribeOn
则是从头到尾都改变,不论在哪一个位置,直到遇到publishOn
为止,也就是如果有publishOn
,那之後的operator 仍然会依照publishOn
所指定的Scheduler
。将上一个范例直接拿来改成使用subscribeOn
,那两个map都会是在一开始宣告的Scheduler
内。
Scheduler s = Schedulers.newParallel("parallel-scheduler", 2);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.map(i -> "value " + i);
Thread thread = new Thread(() -> flux.subscribe(System.out::println));
thread.start();
Thread.sleep(100);
至於为什麽有了publishOn
还需要subscribeOn
,理论上你只需要将publishOn
放在最前面就能够取代subscribeOn
,而且我认为即便是放後面结果一样,为了可读性subscribeOn
应该也要放在最前面。其实原因就是有些情境当你没办法放在最前面的时候,假设有一个api或是function是别人写好的你无法去更改,如果只有publishOn
是无法更改到上一层(upstream),这时候就需要靠subscribeOn
来处理。
最後还有一件事情从main thread
跳到其他的执行绪可以透过以上的方法,但是从其他执行绪想要再跳回main
是不可能的,虽然我也无法理解为何会有这样的需求。
<<: 追求JS小姊姊系列 Day11 -- 流程错了怎办?难道要跟D特终老?
It is impossible to track a phone's exact location...
在Vue里有个很大的特色可能就是资料的双向绑定(Two-wayBinding),而资料绑定的话我们最...
使用Django内建的paginator分页类别 将原本的Quryset物件(doc_warehou...
知道了line bot sdk python上的程序的功能是回复你和你传送相同的讯息。这边会看成是在...
设置CSS样式大小时,会使用到各种不同的单位,尤其现在都制作响应式网站,用错单位,就会针对不同尺寸调...