[Day 11] Reactive Programming - Reactor(Scheduler)

前言

Reactor 是concurrency-agnostic ,花了一点时间研究这个英文单字的意思,concurrency是我们熟悉的并发,agnostic是未知论者(认为神存在但不为人知也无法确认的),推测的意思是对於Reactor来说并不在意concurrency是存在或是不存在,取决於使用者的行为,希望有高手可以解释得更精准,Reactor文件上给出的意思是他不会强迫使用并发(concurrency model), 让开发者自己去决定,如果你需要使用concurrency,Reactor提供了Scheduler来帮助开发者。

Scheduler

在没有特别设定的情况下,Flux & Mono并不是特别一个专门的绪(Thread)去处理,而是根据最後subscribe()的绪来决定的。在官方的范例中,在main里面宣告Mono,另开一条Threadsubscribe,从印出的结果就可以看出实际上执行绪是根据subscribe的。

public static void main(String[] args) throws InterruptedException { 
    System.out.println(Thread.currentThread().getName()); 
    final Mono<String> mono = Mono.just("hello "); 
    Thread t = 
        new Thread( 
            () -> 
                mono.map(msg -> msg + "thread ") 
                    .subscribe(v -> 
                    System.out.println(
                        v + Thread.currentThread().getName())
                        )); 
    t.start(); 
    t.join(); 
    // main 
    // hello thread Thread-0 
  }

工具类别Schedulers提供了几个静态方法:

  • Schedulers.immediate():基本上不会用到这个方法,他不会做任何的操作,可以当成是null,有可能的使用场景是某个api需要传入Schedulers,但你并不想要更换Thread,这时候就可以传入Schedulers.immediate()
  • Schedulers.single():只有一条且重复使用的Thread
  • Schedulers.elastic():会弹性的增加Thread(无上限),适用於需要较长时间处理的任务(task),像是呼叫阻断(blocking)的服务或是I/O,但可能会导致太多的Thread或是一些backpressure的问题,再推出Schedulers.boundedElastic()後就不建议使用(Deprecated)。
  • Schedulers.boundedElastic():就像是elastic(),只是加上了一些限制来避免产生过多Thread的问题,有一个worker pool,预设闲置60秒就会release Thread
  • Schedulers.parallel():适用於快速且non-blocking的任务,根据CPU来产生Thread的数量。
  • fromExecutorService(ExecutorService):如果以前有预先就存在的ExecutorService
    上面这几种方法除了fromExecutorService,都是有一个共用(global),如果希望可以单独使用可以利用Schedulers.newXXXX()的方式来新增。

有一些operator使用指定的Scheduler来执行,通常也会让你透过传入参数的方式来改变,像是Flux.interval(Duration.ofMillis(300)),每三百毫秒推送,从程序码可以看出预设使用Schedulers.parallel(),如果想要自己指定也可以直接传入指定的Scheduler

public static Flux<Long> interval(Duration period) {
 return interval(period, Schedulers.parallel());
}

VirtualTimeScheduler

除了以上的Scheduler之外,其实还隐藏一个VirtualTimeScheduler,属於reactor.test.scheduler,有自己的一个虚拟时钟来控制时间,随需的增加,控制时间可能不够精准,因为只能前进,不能让时光倒流,这在需要时间流逝才可以测试的场景就会十分好用方便,而不需要真的去等待。

下面这个例子是有一个延迟10秒、每5秒会发送的Flux,正常的情况下是看不到任何结果的,因为main-thread一下子就结束了,根本还来不及。如果想要看到资料就要在main thread加上sleep来延迟时间,这边第一个不是刚好十秒是因为会有一点时间差,这样等待了约二十秒後就可以看到预期的结果。

  @Test
  void testDefaultScheduler() throws InterruptedException {
    List<Long> list = new ArrayList<>();
    Flux
        .interval(Duration.ofSeconds(10), Duration.ofSeconds(5))
        .take(3)
        .subscribe(list::add);

    Thread.sleep(10500);
    System.out.println(list);
    Thread.sleep(5000);
    System.out.println(list);
    Thread.sleep(5000);
    System.out.println(list);
    /*
    [0] 
    [0, 1] 
    [0, 1, 2]
    */
  }

这时候如果有VirtualTimeScheduler的帮助,手动加速时间,就可以在正常main跑完的同时,就可以看到结果,节省了不少时间。

  @Test
  void testVirtualTimeScheduler() throws InterruptedException {
    List<Long> list = new ArrayList<>();

    VirtualTimeScheduler scheduler = VirtualTimeScheduler.getOrSet();
    Flux
        .interval(Duration.ofSeconds(10), Duration.ofSeconds(5), scheduler)
        .take(3)
        .subscribe(list::add);

    scheduler.advanceTimeBy(Duration.ofSeconds(10));
    System.out.println(list);
    scheduler.advanceTimeBy(Duration.ofSeconds(5));
    System.out.println(list);
    scheduler.advanceTimeBy(Duration.ofSeconds(5));
    System.out.println(list);
    /* 
    [0] 
    [0, 1] 
    [0, 1, 2] 
    */
  }

结语

Reactor提供各式的方便的工具让开发者可以根据使用情境来选择,下一篇就来介绍到底Reactor是要如何去使用这些Scheduler。随着文章往後开始比较复杂,如果有写得不太好的地方希望可以留言讨论,感谢!

资料来源


<<:  Day17-Goroutine

>>:  DAY10:应用程序元件Activity之简介

可以录制Mac 内部/外部声音的 5 种荧幕录制方法

我想用录制我的Mac电脑的荧幕和声音,我该怎麽办? 其实,如果你使用合适的应用软件,你可以简单地将M...

Fix McAfee Activate Product Key Issue by 1877-956-4555

There are various competent anti-virus application...

[Day08]程序菜鸟自学C++资料结构演算法 – 链结串列实作应用之二

前言:昨天简单实作了链结串列,今天要来介绍进阶一点的应用,第一个是利用之前写的get()和set()...

D21 第十一周 (回忆篇)

这礼拜还是再追第九周的进度,因为额外研究了其他东西的关系。 apache alias js 剪贴簿 ...

Day14 参加职训(机器学习与资料分析工程师培训班),Django实作 & 深度学习

上午: AIoT资料分析应用系统框架设计与实作 今日老师教学运用Django框架,将Bootstra...