[Day 19] Reactive Programming - Reactor (operator fusion)

前言

这篇挣扎了很久要不要写,算是进阶一点的主题,内容虽然不多,但已经让我绞尽脑汁,关於这个主题我自己也还是有不了解的部分,但有监於蛮多影片都会提到operator fusion,决定还是来介绍一下大致上的意思,至於内部的细节只能待高手来补了。基本上就是参考整理并简化了这篇原文David Karnok Operator-fusion (Part 1),简中翻译Piasy

upslash

图片来源:upslash/greg

operator fusion

operator就是我们常用到的map、filter、merge等操作Reactive stream的方式,fusion则是融合,简而言之就是透过不同的方式融合operator来达到增加效能节省时间或空间的功效,就称为 operator fusion,融合的方式又分为两大类Macro-fusionMicro-fusion,Macro的意思是宏观,Macro-fusion就是将多个operator合并为单个,反之Micro则是微观,Micro-fusion就是将各operator内部用到的资源共用。

历史

reactive programming 是不断的在演化的,而reactive的历程分为四代

  • 第0代:盘古开天时期,只有java.util.Observable API,就跟之前介绍Observer pattern差不多,没有中间层而且无法组合。
  • 第1代:一旦发布无法取消,Producer 和 Consumer无法沟通,若是速率不一致将导致backpressure问题。
  • 第2代:Subscriber可自行决定是否还需要资料,双方多了沟通管道来支援backpressure
  • 第3代:各公司合作统一定义了Reactive-Streams Spec,彼此兼容。
  • 第4代:operator fusion
  • 第5代:未来目标(2016年),reactive IO.....等新特性。

Macro-fusion

合并或更换operator

之前介绍到publishOn/subscribeOn,如果在source operator(just、range.....)之後是subscribeOn意义就不大,这时候替换publishOn就会更好一些。

修改operator参数

可以考虑将性质差不多的operator合并提升效能。

Flux.range(1, 10) 
   .filter(v -> v % 3 == 0) 
   .filter(v -> v % 2 == 0) 
   .map(v -> v + 1) 
   .map(v -> v * v) 
   .subscribe(System.out::println);
Predicate<Integer> p1 = v -> v % 3 == 0; 
Predicate<Integer> p2 = v -> v % 2 == 0; 
Predicate<Integer> p3 = v -> p1.test(v) && p2.test(v);

Micro-fusion

有些operator内含伫列,有些operator需要传入伫列,这时候如果能内部共用就可以节省记忆体的空间开校,也能节省不断创建伫列的效能影响,

同步(Synchronous-fusion)

如果资料来源是同步的而且可以被视为伫列,像是range,fromIterable,fromArray,fromStreamfromCallable,just也算,但是just大部分是用在Macro-fusion,operator内部用到伫列的包含像是 observeOn(), flatMap()publish(), zip()等等,如果在onSubscribe()的时候,发现之前说明用来沟通上下游的Subscription有实作Queue interface,就直接使用而不需自行创建。来先看一下原始码比较有感觉。
这个是Flux.range实际上的类别,可以看到有implements Fuseable,而他的doc就清楚的写专门为了 Micro-fusion

final class FluxRange extends Flux<Integer>
  implements Fuseable, SourceProducer<Integer> {
}
/**
 * A micro API for stream fusion, in particular marks producers that support a {@link QueueSubscription}.
 */
public interface Fuseable {
}

flatMap实际上的class  的其中一个方法,从以下程序码可以看到他直接使用而不是创建新的。
FluxFlatMap.java trySubscribeScalarMap(...)

if (!fuseableExpected || p instanceof Fuseable) {
 p.subscribe(s);
}
else {
 p.subscribe(new FluxHide.SuppressFuseableSubscriber<>(s));
}

原作者声称因为这样的调整让range().observeOn()的吞吐量从55M Ops/s ->200M Ops/s  提升了接近四倍。

结语

operator fusion可以有效降低reactive dataflows的效能开销,主要的面向可能是library的开发人员,内容上的确也是比较艰深,我只有挑其中比较能理解的地方拿出来分享,希望以上的介绍可以让大家对operator fusion有一定的了解,建议可以去看看原文还有提到很多不同的方式。


<<:  Day19 Vue元件的命名规则 & 单一元件(SFC)

>>:  我想当工程师!要念资讯相关科系吗?

【从零开始的Swift开发心路历程-Day17】简易订单系统Part1

昨天安装完Realm之後,今天我们来实做一个简易的订单系统吧!透过TextField及Button新...

28 - lint-staged - Lint Git Commit 的档案

做 lint 、 format 或是通过测试,对於程序码的品质维护有很大的帮助,因此在提交代码时,我...

为了转生而点技能-JavaScript,day17(原型-prototype、自订原型、新增method

本篇记录有关prototype的定义,自订及新增methods的简单操作。 原型:prototype...

Day 31: 【全系列终】架构考古学

Appendix: 架构考古学 联盟会计系统 简述 1960 年代,很简单的 CRUD 记帐系统,由...

从零开始的8-bit迷宫探险【Level 2】Xcode 开发环境介绍

今日目标 安装 Xcode 使用模拟器执行游戏专案 认识 Xcode 的开发环境 使用 Playgr...