Reactor 是concurrency-agnostic
,花了一点时间研究这个英文单字的意思,concurrency是我们熟悉的并发,agnostic是未知论者(认为神存在但不为人知也无法确认的),推测的意思是对於Reactor来说并不在意concurrency是存在或是不存在,取决於使用者的行为,希望有高手可以解释得更精准,Reactor文件上给出的意思是他不会强迫使用并发(concurrency model), 让开发者自己去决定,如果你需要使用concurrency,Reactor提供了Scheduler
来帮助开发者。
在没有特别设定的情况下,Flux & Mono并不是特别一个专门的绪(Thread
)去处理,而是根据最後subscribe()
的绪来决定的。在官方的范例中,在main里面宣告Mono,另开一条Thread
来subscribe
,从印出的结果就可以看出实际上执行绪是根据subscribe
的。
public static void main(String[] args) throws InterruptedException {
System.out.println(Thread.currentThread().getName());
final Mono<String> mono = Mono.just("hello ");
Thread t =
new Thread(
() ->
mono.map(msg -> msg + "thread ")
.subscribe(v ->
System.out.println(
v + Thread.currentThread().getName())
));
t.start();
t.join();
// main
// hello thread Thread-0
}
工具类别Schedulers
提供了几个静态方法:
Schedulers.immediate()
:基本上不会用到这个方法,他不会做任何的操作,可以当成是null,有可能的使用场景是某个api需要传入Schedulers
,但你并不想要更换Thread,这时候就可以传入Schedulers.immediate()
。Schedulers.single()
:只有一条且重复使用的Thread
。Schedulers.elastic()
:会弹性的增加Thread
(无上限),适用於需要较长时间处理的任务(task),像是呼叫阻断(blocking)的服务或是I/O,但可能会导致太多的Thread
或是一些backpressure的问题,再推出Schedulers.boundedElastic()
後就不建议使用(Deprecated
)。Schedulers.boundedElastic()
:就像是elastic()
,只是加上了一些限制来避免产生过多Thread
的问题,有一个worker pool,预设闲置60秒就会release Thread
。Schedulers.parallel()
:适用於快速且non-blocking的任务,根据CPU来产生Thread
的数量。fromExecutorService(ExecutorService)
:如果以前有预先就存在的ExecutorService
。fromExecutorService
,都是有一个共用(global),如果希望可以单独使用可以利用Schedulers.newXXXX()的方式来新增。有一些operator使用指定的Scheduler
来执行,通常也会让你透过传入参数的方式来改变,像是Flux.interval(Duration.ofMillis(300))
,每三百毫秒推送,从程序码可以看出预设使用Schedulers.parallel()
,如果想要自己指定也可以直接传入指定的Scheduler
。
public static Flux<Long> interval(Duration period) {
return interval(period, Schedulers.parallel());
}
除了以上的Scheduler
之外,其实还隐藏一个VirtualTimeScheduler
,属於reactor.test.scheduler
,有自己的一个虚拟时钟来控制时间,随需的增加,控制时间可能不够精准,因为只能前进,不能让时光倒流,这在需要时间流逝才可以测试的场景就会十分好用方便,而不需要真的去等待。
下面这个例子是有一个延迟10秒、每5秒会发送的Flux,正常的情况下是看不到任何结果的,因为main-thread一下子就结束了,根本还来不及。如果想要看到资料就要在main thread加上sleep
来延迟时间,这边第一个不是刚好十秒是因为会有一点时间差,这样等待了约二十秒後就可以看到预期的结果。
@Test
void testDefaultScheduler() throws InterruptedException {
List<Long> list = new ArrayList<>();
Flux
.interval(Duration.ofSeconds(10), Duration.ofSeconds(5))
.take(3)
.subscribe(list::add);
Thread.sleep(10500);
System.out.println(list);
Thread.sleep(5000);
System.out.println(list);
Thread.sleep(5000);
System.out.println(list);
/*
[0]
[0, 1]
[0, 1, 2]
*/
}
这时候如果有VirtualTimeScheduler
的帮助,手动加速时间,就可以在正常main跑完的同时,就可以看到结果,节省了不少时间。
@Test
void testVirtualTimeScheduler() throws InterruptedException {
List<Long> list = new ArrayList<>();
VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
Flux
.interval(Duration.ofSeconds(10), Duration.ofSeconds(5), scheduler)
.take(3)
.subscribe(list::add);
scheduler.advanceTimeBy(Duration.ofSeconds(10));
System.out.println(list);
scheduler.advanceTimeBy(Duration.ofSeconds(5));
System.out.println(list);
scheduler.advanceTimeBy(Duration.ofSeconds(5));
System.out.println(list);
/*
[0]
[0, 1]
[0, 1, 2]
*/
}
Reactor提供各式的方便的工具让开发者可以根据使用情境来选择,下一篇就来介绍到底Reactor是要如何去使用这些Scheduler。随着文章往後开始比较复杂,如果有写得不太好的地方希望可以留言讨论,感谢!
我想用录制我的Mac电脑的荧幕和声音,我该怎麽办? 其实,如果你使用合适的应用软件,你可以简单地将M...
There are various competent anti-virus application...
前言:昨天简单实作了链结串列,今天要来介绍进阶一点的应用,第一个是利用之前写的get()和set()...
这礼拜还是再追第九周的进度,因为额外研究了其他东西的关系。 apache alias js 剪贴簿 ...
上午: AIoT资料分析应用系统框架设计与实作 今日老师教学运用Django框架,将Bootstra...