这个主题花了我好多的时间查资料,之前提到动态的产生publisher其实就有sink
的概念,但文件上对於sink
的描述不是很清楚,当然有可能是我资质驽钝,找了一些其他资料後终於了解其实Sinks
就是经过Reactor优化过後的Processors
。
在介绍Reactive Programming的时候有提到在Publisher
& Subscriber
之间有个Processor
来扮演中间人转换之类的角色,然而在Reactor中,这些功能大部分其实透过Publisher
的operator就能做到,而其中剩下比较特别的部分,就还是需要使用Processor
来特别处理,Processor
同样身为Subscriber
所以可以直接使用onNext
, onComplete
and onError
(透过Subscriber interface
),而这样的行为是比较危险需要谨慎使用,所以Reactor在3.4.0的版本後完全弃用了Processor
,取而代之的是Sinks
。
both abstract and concrete
FluxProcessor
andMonoProcessor
are deprecated and slated for removal in 3.5.0
Sinks
就是Reactor优化过後的Processors
,他是thread-safe也可以避免一些不符合Reactive 规范的设计,相较於Processor.onNext
必须是同步的(synchronized),Sinks
有提供两种Api,tryEmit*
API 会回传EmitResult
,emit*
API提供EmissionFailureHandler
,让你可以更容易的根据发送结果来设计你的api或是更容易的处理错误。
Sinks主要有三种:
Sinks.One
:仅可以传送一个资料,类似Mono
。Sinks.Many
:可以传送多笔资料,类似Flux
。Sinks.Empty
:没有资料,仅能传送终结讯号(terminal signal),类似Mono.empty()
。Sinks.unsafe()
,其之下也有相对的Sinks.unsafe().one()、Sinks.unsafe().empty()、Sinks.unsafe().Many()
,其实就是不保证thread-safe的版本,所以如果你能够确保使用情境会是thread-safe的,使用Sinks.unsafe()
可以相对增加效能。EmitResult
的种类文件Sinks.Many
又分为三种
multicast
:可以有多个订阅者,每个订阅者并不会都拿到全部且一样的资料,而是只会取得订阅後开始最新的。unicast
:透过buffer来处理backpressure,但代价就是只能有一个订阅者。replay
:会快取(cache)所有资料来让每一个订阅者都能拿到全部且一样的资料。下面分别使用三种来转为Flux,产生1~5的资料并且有两个订阅者。
multicast
的部分可以看到subscribe2 只会有订阅後的最新一笔资料而已,不会有之前的。
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
multicastSink.emitNext(3, FAIL_FAST);
multicastSink.emitNext(4, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
multicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe1 :5
subscribe2 :5
*/
unicast
可以看到第一个订阅者正常显示,但当subscribe2 出现後随即会出现错误。
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
unicastSink.emitNext(3, FAIL_FAST);
unicastSink.emitNext(4, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
unicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
reactor.core.Exceptions$ErrorCallbackNotImplemented:
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber
*/
replay
的subscribe2 一样能取得前面的资料,所以最终两个订阅者拿到的资料都是全部且一样的。
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
replaySink.emitNext(3, FAIL_FAST);
replaySink.emitNext(4, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
replaySink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe2 :1
subscribe2 :2
subscribe2 :3
subscribe2 :4
subscribe1 :5
subscribe2 :5
*/
再补充说明三个种类建构的方法,
multicast()
:
onBackpressureBuffer()
是第一个订阅者订阅之前的暂存,之後的订阅者就只会收到最新的资料。onBackpressureBuffer(int bufferSize, boolean autoCancel)
可以传入buffer的大小,并且当所有订阅者都取消订阅後是否自动清除buffer。directAllOrNothing()
只要有一个订阅者变慢(无法消耗(consume更多的资料),则所有订阅者都会停止,直到恢复正常。directBestEffort()
相较於上者,只会停止推送给无法接受资料的订阅者,其他则正常。unicast()
:
onBackpressureBuffer()
这个buffer是用来存唯一订阅者订阅资料已经推送出去的资料,这样才能确保订阅者可以拿到全部,预设是没有上限所以可能会有OOM的风险,Reactor也提供传入自订Queue来限制上限onBackpressureBuffer(Queue)
,超过的部分就会舍弃掉。replay()
:
limit(int))
因为replay会保存资料让所有订阅者都能接受到一样的资料,limit会限制保存的数量。all()
:所有资料都保存limit(Duration)
:保存某个时间limit(int, Duration)
:结合时间跟数量的限制latest()
:只保存最後一笔latestOrDefault(T)
:保存最後一笔或是预设值Sinks.One
类似於Mono,只能有一个值,内含以下三个method,
emitValue(T value)
等於 onNext(value) + onComplete()
。emitEmpty()
等於onComplete()
,基本上就是Mono.empty()
。emitError``(Throwable t)
等於onError(t)
基本上就是Sinks.One,只是没有emitValue(T value)。
以上就是关於Reactor的 Sinks
介绍,希望看完有基础的了解,感觉Sinks
是比较进阶的使用方式,如果後续有找到使用情境会再补充。
<<: 【Day 27】JavaScript 回呼函式(callback function)
>>: 12. Error x Error Handling x Exception
「混合模式」是什麽呢?有用过photoshop的设计师对图片混合模式肯定不陌生,是元素重叠部分的颜色...
今天是一些实作小练习, 稍微看了一些基础函式库, 跟着别人的文章用原生 net/http 做了会吐...
昨天帮我们用 Template Driven Forms 所撰写的登入系统写完单元测试之後,今天则...
很多时候,我们的state必须要透过HTTP Request从後端取得。然而发送Request常用的...
今天的内容为Unity介面设定,影片中忘记跟大家提到可以自己储存介面设定,大家先设定好自己想要的介面...