如果你的需求是想要累积集满几个subscriber
再开始推送资料,这时候就是ConnectableFlux
派上用场的时候了。某种程度上这跟Hot pubisher高度相关,也可以视作Hot vs Cold part 3
有两种方式面可以将Flux
转为ConnectableFlux
publish
:会根据consumer的消化速度来发送资料(backpressure),类似之前Sinks.Many.multicast
,是Hot publisher,当执行connect
之後的subscribe
只会收到最新的资料。当所有Subscriber停止要资料的时候,publisher也将停止推送资料。replay
: 会将过去的资料缓存下来,类似之前Sinks.Many.replay
也是Hot publisher,可以调整保存资料的数量以及时间,来让之後的Subscriber能取得。publish()
则第二的的订阅无资料,replay(2)
指定保留两笔资料,则能发现是保存最新的两笔资料。Flux<Integer> source =
Flux.range(1, 5)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
ConnectableFlux<Integer> co = source.publish();
co.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("done subscribing");
Thread.sleep(2000);
System.out.println("will now connect");
co.connect();
co.subscribe(System.out::println, e -> {}, () -> {});
/*
done subscribing
will now connect
subscribed to source
1
2
3
4
5
*/
Flux<Integer> source =
Flux.range(1, 5)
.doOnSubscribe(s -> System.out.println("subscribed to source"));
ConnectableFlux<Integer> co = source.replay(2);
co.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("done subscribing");
Thread.sleep(2000);
System.out.println("will now connect");
co.connect();
co.subscribe(System.out::println, e -> {}, () -> {});
/*
done subscribing
will now connect
subscribed to source
1
2
3
4
5
4
5
*/
ConnectableFlux
提供了几个方法来控制publisher是否需要推送资料,而不是单纯像是一般的publisher是透过subscribe触发。
connect()
:手动控制,可以自订需求逻辑判断已经有足够的订阅数再透过connect()来让publisher开始推送资料。autoConnect(n)
:自动控制,传入参数n为订阅数,当订阅数等於n则开始推送资料。refCount(n)
:另外一个方向的控制,剩余还在订阅的数量若小於参数N,则停止推送资料。refCount(int, Duration)
:类似於grace period
,有使用过k8s的人可能比较好理解,就是当订阅数小於N之後再经过传入的时间都没有再让订阅数大於N才会停止推送资料。cache
并不会将Flux
转为ConnectableFlux
,但常常会混为一谈,因为cache
的作用也是将Flux变成Hot pubisher,并且会缓存资料让之後才订阅的也能拿到资料,预设会无上限的保留,但也提供参数可传入用来限制保存的数量,看到这边是不是觉得似曾相似,某种程度上cache
就等於 replay().autoConnect(1)
,一个订阅者就自动connect
,也跟replay
一样会保留资料。
cache() vs replay()
的差异就是 replay()
可以根据需求选择connect()
的时机点或是其他ConnectableFlux
所提供的方法来控制逻辑,cache()
则是当有第一个订阅者就自动connect()
。
replay() vs publish()
的差别在於publish()
并不会保存资料,也就是connect
後的订阅者就只能拿到最新的资料而不会是全部资料,反之replay()
则是可以根据传入的参数决定要保存多少资料。
<<: TailwindCSS 从零开始 - 切一个响应式留言按钮画面
用户Say NO!!的拒绝权力,法规有明确的赋予用户可以放心行使,当用户觉得自己的个资被处理运用有疑...
前言 State跟Props这两个东西其实不会很难,却很重要,基本上你在写React的日子里都会一直...
本文同步更新於blog 前情提要:铁路运输系统,参考范例:运输系统(工厂方法模式) <?p...
Hi 各位大大~ 今天要来分享在 Chrome extension 讯息传递的部分, 主要算是官方文...
类别的基本架构: class ExampleClass{ //你想要放的东西 } 例如这样: cla...