[Day 13] Reactive Programming - Reactor(Processors & Sinks)

前言

这个主题花了我好多的时间查资料,之前提到动态的产生publisher其实就有sink的概念,但文件上对於sink的描述不是很清楚,当然有可能是我资质驽钝,找了一些其他资料後终於了解其实Sinks就是经过Reactor优化过後的Processors

Processor

在介绍Reactive Programming的时候有提到在Publisher & Subscriber之间有个Processor来扮演中间人转换之类的角色,然而在Reactor中,这些功能大部分其实透过Publisher 的operator就能做到,而其中剩下比较特别的部分,就还是需要使用Processor来特别处理,Processor同样身为Subscriber所以可以直接使用onNext, onComplete and onError (透过Subscriber interface),而这样的行为是比较危险需要谨慎使用,所以Reactor在3.4.0的版本後完全弃用了Processor,取而代之的是Sinks

both abstract and concrete FluxProcessor and MonoProcessor are deprecated and slated for removal in 3.5.0

Sinks

Sinks就是Reactor优化过後的Processors,他是thread-safe也可以避免一些不符合Reactive 规范的设计,相较於Processor.onNext必须是同步的(synchronized),Sinks有提供两种Api,tryEmit* API 会回传EmitResultemit* API提供EmissionFailureHandler,让你可以更容易的根据发送结果来设计你的api或是更容易的处理错误。
Sinks主要有三种:

  • Sinks.One:仅可以传送一个资料,类似Mono
  • Sinks.Many:可以传送多笔资料,类似Flux
  • Sinks.Empty:没有资料,仅能传送终结讯号(terminal signal),类似Mono.empty()
    除了这三种以外其实还有Sinks.unsafe(),其之下也有相对的Sinks.unsafe().one()、Sinks.unsafe().empty()、Sinks.unsafe().Many(),其实就是不保证thread-safe的版本,所以如果你能够确保使用情境会是thread-safe的,使用Sinks.unsafe()可以相对增加效能。
    补充一下EmitResult的种类文件
    https://ithelp.ithome.com.tw/upload/images/20210927/20141418XWKy0RvNOX.png

Sinks.Many

Sinks.Many又分为三种

  • multicast:可以有多个订阅者,每个订阅者并不会都拿到全部且一样的资料,而是只会取得订阅後开始最新的。
  • unicast:透过buffer来处理backpressure,但代价就是只能有一个订阅者。
  • replay:会快取(cache)所有资料来让每一个订阅者都能拿到全部且一样的资料。

下面分别使用三种来转为Flux,产生1~5的资料并且有两个订阅者。
multicast的部分可以看到subscribe2 只会有订阅後的最新一笔资料而已,不会有之前的。

Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
multicastSink.emitNext(1, FAIL_FAST);
multicastSink.emitNext(2, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
multicastSink.emitNext(3, FAIL_FAST);
multicastSink.emitNext(4, FAIL_FAST);
multicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
multicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe1 :5
subscribe2 :5
     */

unicast可以看到第一个订阅者正常显示,但当subscribe2 出现後随即会出现错误。

Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
unicastSink.emitNext(1, FAIL_FAST);
unicastSink.emitNext(2, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
unicastSink.emitNext(3, FAIL_FAST);
unicastSink.emitNext(4, FAIL_FAST);
unicastSink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
unicastSink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
reactor.core.Exceptions$ErrorCallbackNotImplemented:
java.lang.IllegalStateException: UnicastProcessor allows only a single Subscriber

     */

replay的subscribe2 一样能取得前面的资料,所以最终两个订阅者拿到的资料都是全部且一样的。

Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
replaySink.emitNext(1, FAIL_FAST);
replaySink.emitNext(2, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe1 :" + t));
replaySink.emitNext(3, FAIL_FAST);
replaySink.emitNext(4, FAIL_FAST);
replaySink.asFlux().subscribe(t -> System.out.println("subscribe2 :" + t));
replaySink.emitNext(5, FAIL_FAST);
/*
subscribe1 :1
subscribe1 :2
subscribe1 :3
subscribe1 :4
subscribe2 :1
subscribe2 :2
subscribe2 :3
subscribe2 :4
subscribe1 :5
subscribe2 :5
     */

再补充说明三个种类建构的方法,

  • multicast()
    • onBackpressureBuffer()是第一个订阅者订阅之前的暂存,之後的订阅者就只会收到最新的资料。
    • onBackpressureBuffer(int bufferSize, boolean autoCancel) 可以传入buffer的大小,并且当所有订阅者都取消订阅後是否自动清除buffer。
    • directAllOrNothing() 只要有一个订阅者变慢(无法消耗(consume更多的资料),则所有订阅者都会停止,直到恢复正常。
    • directBestEffort() 相较於上者,只会停止推送给无法接受资料的订阅者,其他则正常。
  • unicast()
    • onBackpressureBuffer() 这个buffer是用来存唯一订阅者订阅资料已经推送出去的资料,这样才能确保订阅者可以拿到全部,预设是没有上限所以可能会有OOM的风险,Reactor也提供传入自订Queue来限制上限onBackpressureBuffer(Queue),超过的部分就会舍弃掉。
  • replay()
    • limit(int))因为replay会保存资料让所有订阅者都能接受到一样的资料,limit会限制保存的数量。
    • all():所有资料都保存
    • limit(Duration):保存某个时间
    • limit(int, Duration):结合时间跟数量的限制
    • latest():只保存最後一笔
    • latestOrDefault(T):保存最後一笔或是预设值

Sinks.One

类似於Mono,只能有一个值,内含以下三个method,
  • emitValue(T value) 等於 onNext(value) + onComplete()
  • emitEmpty()  等於onComplete(),基本上就是Mono.empty()
  • emitError``(Throwable t) 等於onError(t)

Sinks.Empty

基本上就是Sinks.One,只是没有emitValue(T value)。

结语

以上就是关於Reactor的 Sinks介绍,希望看完有基础的了解,感觉Sinks是比较进阶的使用方式,如果後续有找到使用情境会再补充。

资料来源


<<:  【Day 27】JavaScript 回呼函式(callback function)

>>:  12. Error x Error Handling x Exception

混合模式-30天学会HTML+CSS,制作精美网站

「混合模式」是什麽呢?有用过photoshop的设计师对图片混合模式肯定不陌生,是元素重叠部分的颜色...

【Day 24】Go:http server / request / gRPC 实际操作

今天是一些实作小练习, 稍微看了一些基础函式库, 跟着别人的文章用原生 net/http 做了会吐...

Angular 深入浅出三十天:表单与测试 Day07 - 整合测试实作 - 登入系统 by Template Driven Forms

昨天帮我们用 Template Driven Forms 所撰写的登入系统写完单元测试之後,今天则...

【Day.29】React进阶 - 以Redux Thunk处理非同步资料流

很多时候,我们的state必须要透过HTTP Request从後端取得。然而发送Request常用的...

Unity与Photon的新手相遇旅途 | Day3-介面设定、汇入角色、物件操作

今天的内容为Unity介面设定,影片中忘记跟大家提到可以自己储存介面设定,大家先设定好自己想要的介面...