[Day 16] Reactive Programming -Reactor(ConnectableFlux)

图片

图片来源:unsplash fabio

前言

如果你的需求是想要累积集满几个subscriber再开始推送资料,这时候就是ConnectableFlux派上用场的时候了。某种程度上这跟Hot pubisher高度相关,也可以视作Hot vs Cold part 3

ConnectableFlux

有两种方式面可以将Flux转为ConnectableFlux

  1. publish :会根据consumer的消化速度来发送资料(backpressure),类似之前Sinks.Many.multicast,是Hot publisher,当执行connect之後的subscribe只会收到最新的资料。当所有Subscriber停止要资料的时候,publisher也将停止推送资料。
  2. replay: 会将过去的资料缓存下来,类似之前Sinks.Many.replay也是Hot publisher,可以调整保存资料的数量以及时间,来让之後的Subscriber能取得。
    从下面的范例可以看出,使用publish()则第二的的订阅无资料,replay(2)指定保留两笔资料,则能发现是保存最新的两笔资料。
Flux<Integer> source = 
    Flux.range(1, 5) 
        .doOnSubscribe(s -> System.out.println("subscribed to source")); 
ConnectableFlux<Integer> co = source.publish(); 
co.subscribe(System.out::println, e -> {}, () -> {}); 
System.out.println("done subscribing"); 
Thread.sleep(2000); 
System.out.println("will now connect"); 
co.connect(); 
co.subscribe(System.out::println, e -> {}, () -> {}); 
/*
done subscribing 
will now connect 
subscribed to source
1 
2 
3 
4 
5 
*/
Flux<Integer> source =
    Flux.range(1, 5)
        .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.replay(2);

co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(2000);

System.out.println("will now connect");
co.connect();

co.subscribe(System.out::println, e -> {}, () -> {});
/*
done subscribing 
will now connect 
subscribed to source
1 
2 
3 
4 
5 
4 
5
*/

ConnectableFlux提供了几个方法来控制publisher是否需要推送资料,而不是单纯像是一般的publisher是透过subscribe触发。

  • connect():手动控制,可以自订需求逻辑判断已经有足够的订阅数再透过connect()来让publisher开始推送资料。
  • autoConnect(n):自动控制,传入参数n为订阅数,当订阅数等於n则开始推送资料。
  • refCount(n):另外一个方向的控制,剩余还在订阅的数量若小於参数N,则停止推送资料。
  • refCount(int, Duration):类似於grace period,有使用过k8s的人可能比较好理解,就是当订阅数小於N之後再经过传入的时间都没有再让订阅数大於N才会停止推送资料。

cache

cache并不会将Flux转为ConnectableFlux,但常常会混为一谈,因为cache的作用也是将Flux变成Hot pubisher,并且会缓存资料让之後才订阅的也能拿到资料,预设会无上限的保留,但也提供参数可传入用来限制保存的数量,看到这边是不是觉得似曾相似,某种程度上cache就等於 replay().autoConnect(1),一个订阅者就自动connect,也跟replay一样会保留资料。

https://ithelp.ithome.com.tw/upload/images/20210930/20141418O4MS9JQ8yA.png

总结

cache() vs replay() 的差异就是 replay()可以根据需求选择connect()的时机点或是其他ConnectableFlux所提供的方法来控制逻辑,cache()则是当有第一个订阅者就自动connect()
replay() vs publish() 的差别在於publish()并不会保存资料,也就是connect後的订阅者就只能拿到最新的资料而不会是全部资料,反之replay()则是可以根据传入的参数决定要保存多少资料。

资料来源


<<:  TailwindCSS 从零开始 - 切一个响应式留言按钮画面

>>:  Day-14 传值与传址

Day 27 用户拒绝权定义规划实作

用户Say NO!!的拒绝权力,法规有明确的赋予用户可以放心行使,当用户觉得自己的个资被处理运用有疑...

Day5 State vs Props

前言 State跟Props这两个东西其实不会很难,却很重要,基本上你在写React的日子里都会一直...

Day29. 范例:运输系统 (抽象工厂模式)

本文同步更新於blog 前情提要:铁路运输系统,参考范例:运输系统(工厂方法模式) <?p...

[拯救上班族的 Chrome 扩充套件] Chrome Extention 的讯息传接球

Hi 各位大大~ 今天要来分享在 Chrome extension 讯息传递的部分, 主要算是官方文...

Day 7 Swift语法-基础篇(5/5)-Structures and Classes

类别的基本架构: class ExampleClass{ //你想要放的东西 } 例如这样: cla...