[Day 8] Reactive Programming - Reactor(FLUX & MONO) Part 2

前言

上一篇介绍了何谓FLUX & MONO,本篇就来说明具体的使用方式。

Just

最简单建立Flux or Mono的方法Just

Flux<String> seq1 = Flux.just("Robert", "Jean", "Jerry");
//从Iterable建立
Flux<String> seq2 = Flux.fromIterable(Arrays.asList("Robert", "Jean", "Jerry"));
//一个范围
Flux<Integer> numbersFromFiveToSix = Flux.range(5, 2); 
Mono<String> mono = Mono.just("Robert");
//即便是空的也要给泛型
Mono<String> mono1 = Mono.empty(); 
Mono<String> mono2 = Mono.justOrEmpty("Robert"); 

Reactor doc有很多精美的图片

https://ithelp.ithome.com.tw/upload/images/20210922/20141418gmKjF5Ejwx.png
https://ithelp.ithome.com.tw/upload/images/20210922/20141418FHsPRXMwsD.png
https://ithelp.ithome.com.tw/upload/images/20210922/20141418bUiCiwpbqC.png

subscribe

Reactive Steam是lazy的,而Flux & Mono是Publisher,所以这时候就需要subscribe来让Publisher动起来。常见的subscribe有三个参数,也可以只传一个或两个,第一个参数就是需要对资料做甚麽处理,第二则是当发生error需要如何处理、最後则是完成的时候,跟之前Java9里面onNextonErroronComplete的概念一样,要注意errorcomplete都是终结(terminal)讯号,就跟java 8 stream.collect  stream.foreach 只会有一个而不会同时出现。

subscribe();  
subscribe(Consumer<? super T> consumer);  
subscribe(Consumer<? super T> consumer, 
          Consumer<? super Throwable> errorConsumer);  
subscribe(Consumer<? super T> consumer, 
          Consumer<? super Throwable> errorConsumer, 
          Runnable completeConsumer);
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"));
// 1
// 2
// 3
// 4
// Done

https://ithelp.ithome.com.tw/upload/images/20210922/20141418b0FXL1Qu5w.png

操作flux内的资料做法类似於java 8 stream,简单介绍几个常用的方法

map

转换每一个flux内的资料
https://ithelp.ithome.com.tw/upload/images/20210922/20141418aKpVazjmB0.png

flatMap

从文件上来看,简化後map的参数是Function<T,  V>,flatMap参数Function<T,  Publisher>,可以很容易的看出来flatMap会传入另一个Publisher,从文件上的图来看就是将两条Publisher摊开合并成同一条。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418LXR4RTcZuc.png
从下面这个范例更容易去理解,两个字串经过flatpMap後变成十个字元。

Flux<String> strFlux = Flux.just("robert", "wang");
strFlux.flatMap(i -> Flux.just(i.split(""))).subscribe(System.out::println);
//r
//o
//b
//....

flatMap vs map

flatMap map
转换一个来源对应一个Flux(N个) 转换一个来源对应一个输出
subscribe每一个Flux 单纯的处理转换
非同步(async) 同步(synchronous)

mergeWith

类似的还有mergeWith,他是单纯传入一个publisher而且是同一个类别的资料,图形上可以看出来都是圆形的图案,也就是合并两个内含是同类别的Publisher。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418yW0z3kElQq.png
Filter
stream中常常使用到的filter过滤,过滤出符合条件的资料。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418jnK5fArLYb.png
Zip
Zip会合并多个Publisher,通常会搭配map/flatMap使用,根据文件可以看出回传Tuple物件,单纯就是帮你把各个物件组合起来方便取用,要注意就是长度若不一样,会在最短的完成的时候就结束。
https://ithelp.ithome.com.tw/upload/images/20210922/20141418pARb5D4lLm.png
这个范例是可能当你有拿到各种资料(依造顺序),需要把所有资料组合成一个客户的物件时,zip就能很好的处理,下面有三个基本资料的Flux,透过zip整合配对後转为Customer,要注意第三个人因为性别没有资料就不会处理到。

  {
     Flux<String> name = Flux.just("Robert", "Jean", "Jerry");
    Flux<Integer> age = Flux.just(30, 29, 36);
    Flux<String> sex = Flux.just("M", "F");
    Flux<Tuple3<String, Integer, String>> zip = Flux.zip(name, age, sex);
     zip
        .map(data -> new Customer(data.getT1(), data.getT2(), data.getT3()))
        .subscribe(System.out::println);
//    Customer{name='Robert', age=30, sex='M'}
//    Customer{name='Jean', age=29, sex='F'}
  }

  class Customer{
    private String name;
    private Integer age;
    private String sex;

    public Customer(String name, Integer age, String sex) {
      this.name = name;
      this.age = age;
      this.sex = sex;
    }

    @Override
    public String toString() {
      return "Customer{" +
          "name='" + name + '\'' +
          ", age=" + age +
          ", sex='" + sex + '\'' +
          '}';
    }
  }

结语

部分的操作跟java stream类似,其余的等用到再来说明,下一篇来说明之前提到的backpressure

资料来源


<<:  D3JsDay07不懂资料格式,那就会我们曾相识,只是不合适—档案格式介绍

>>:  Day 7. Compare × G2 × Draft

出生第39天 打疫苗日

原本想回诊禾馨的时候顺便带幼兽去小禾馨打满一个月可以打的B肝疫苗,但完全预约不到,打电话去问说有没有...

[Java Day26] 6.3. super

教材网址 https://coding104.blogspot.com/2021/06/java-s...

[Day3] Vite 出小蜜蜂~ Game Loop!

Day3 软件架构 这边卡比要介绍一个名词, Software Architecture 软件架构。...

Day-30 资讯安全宣导

资讯安全宣导 tags: IT铁人 何谓资讯安全 随着资讯科技进步,资讯安全的重要程度日渐提升,以杰...

D3JsDay07不懂资料格式,那就会我们曾相识,只是不合适—档案格式介绍

格式介绍 通常你的资料会是档案、API接口或是一个连结作为D3输入的资料,这边就以下常见的资料格式简...