[Day - 23] - Spring Reactor之进入忍者龟的Flux

Abstract

卡哇勾嘎!!!想必大家都有开发过Publisher-Subscriber架构,有些开发者可能是透过第三放套件进行介接,如:Redis、Kafka或RabbitMQ等套件,或者您系统是在JAVA 8以前开发的,可能是采用RxJAVA或者自己实作Observable Pattern进行管理各项讯息传播,小编今日要介绍的Spring Reactor套件与RxJava 2采用共同一套接口API标准Reactive Streams Commons,故说明他们的最终目的都是一致的,且这些API都有通用性,如果您曾经有RxJava的开发经验的话,在今天这个Flux初阶介绍,想必是探囊取物一般的简单,想不想变忍者龟一样,可以自由地在下水道自由行走,那现在我们就开始来深入分析罗。

Principle Introduction

Reactor 3是基於JDK中提供的java.util.function来设计实现的,可以很轻松地从Java.util.stream.Stream转为Flux(RxJava中的Flowable类型),亦可轻松转回Stream,也可很快速地实现CompletableFuture与Mono(Mono也支援Publisher接口,可被理解为RxJava 2中对Single的背压(Back Pressure)加强版)之间的互相转换,所以可以很轻松且安全的基於Optional类型的元素创建Mono,所以在操作上可快方便的应用於Spring Framework 5,适用於各种新版的JDK,也是小编为什麽一开始选用Java 15的原因罗。
根据下图我们可以看到三角行为开始执行订阅,圆圈可以视为一个物件、I可视为输入原子(int,long,float等)、X可视为关闭订阅频道(channel),所有推播内容会循序发送给个订阅者,若推播端发生错误或关闭时,会自动将所有订阅者进行关闭,并停止进行接收任何内容。

图一、实现Publisher接口原理
image

小编延续前几天的架构,新增一个推播产品控制器,此产品可推播给台湾及中国两个地区国家共同销售,故小编先建立一个产品推播服务(CacheSubscribeService),由台湾(SeaFoodRetailerServiceImpl)区服务及中国(ChinaSeaFoodRetailerServiceImpl)区服务当订阅者,透过以下范例可得知,小编建立一个水池道称为seaFoodSink,并建立一个通量道(Flux)配置推播者入口道(sink),在配置订阅者排成类型及行为後,即可开始进行产品的推播任务,控制器仅需把产品放入seaFoodSink中,即可让相关服务开始进行任务,以下程序码范例提供参考。

  // Publisher 服务端配置
@Service
public class CacheSubscribeServiceImpl implements CacheSubscribeService {

    Logger logger =  LoggerFactory.getLogger(CacheSubscribeServiceImpl.class);

    private static FluxSink<SeaFood> seaFoodSink;

    @Autowired
    @Qualifier("seaFoodRetailService")
    SeaFoodRetailerService taiwanSeaFoodRetailerService;


    @Autowired
    @Qualifier("chinaSeaFoodRetailService")
    SeaFoodRetailerService chinaSeaFoodRetailerService;

    @PostConstruct
    public void init() {
        Flux.<SeaFood>create(sink -> this.seaFoodSink = sink)
                .doOnNext(seaFood -> {
                    try {
                        taiwanSeaFoodRetailerService.createSeaFood(seaFood);
                        chinaSeaFoodRetailerService.createSeaFood(seaFood);
                    } catch (SeaFoodRetailerGenericException e) {
                        logger.error("Create Sea Food into all Place Fail. ex:" + e.toString());
                    }
                })
                .onErrorReturn(new SeaFood())
                .subscribeOn( Schedulers.elastic())
                .subscribe(seaFood -> {
                    System.out.println("Subscribe model : " + new Gson().toJson(seaFood));
                });

    }

    @Override
    public SeaFood subscribeAllLocationSeaFoodProducts(SeaFood seaFood) {
        seaFoodSink.next(seaFood);
        return seaFood;
    }

}

  
  // 控制器配置
  @RestController
public class PublishProductController extends ControllerBase {

    @Autowired
    CacheSubscribeService cacheSubscribeService;

    @PostMapping(
            value="/${sea.food.api.all}/create",
            produces = MediaType.APPLICATION_JSON_VALUE,
            consumes = MediaType.APPLICATION_JSON_VALUE
    )
    ResponseEntity<SeaFood> createSeaFood(@RequestBody SeaFood entity) throws SeaFoodRetailerGenericException {
        return new ResponseEntity<>(
                cacheSubscribeService.subscribeAllLocationSeaFoodProducts(entity)
                , HttpStatus.CREATED
        );
    }

}

测试结果,进行触发推播产品
推播产品
image

台湾区及中国区产品均已加入

image

image

故我们可以看到透过Flux可以快速的传播各项物件,建议最佳的使用环境是在Socket服务上喔,这样双边介面都可快速达到映射物件转到作用。

Structure

由下图可看出,LambdaSubscriber实现了CoreSubscriber接口,该接口可衍生出各式各样的订阅者,对於生产者Publisher接口,也是可衍生多个,Flux留了抽象subscribe方法,提供给各类别具体实现类来实现,CoreSubscriber继承了org.reactivestreams.Subscriber接口,并加入一些Reactor3特有的Context功能来实现,所以我们可得知此订阅逻辑概念,如果在LambdaSubscriber中的引数值subscriptionConsumer不为空的,就会触发onSubscribe方法,反之,则自动触发onNext方法,详细各位可参照程序码逻辑,这边就不在多加详述。

图二、讯息发送流程图
image

Monitor Result

透过API监测结果,我们可看到此API创建顺利,并回传成功建立的代码201,无相关异常产生。
image

Sample Source

Spring-Sample-Flux

Reference Url

Java 编成方法论 - 响应式Spring Reactor 3设计与实现

Reactive Spring实战 -- 理解Reactor的设计与实现

响应式程序设计简介之:Reactor


<<:  22 - Commitizen - 产生合法的 Commit 讯息

>>:  延伸(2)-ML到底要不要念统计 | ML#Day30

【LeetCode】Dynamic Programming I

今天依然手动 redirect 【Day 5】逻辑时间与广播 反正网路上讲 dp 的多的是,dp写得...

DAY 13 资料库-建立并操作Heroku PostgreSQL

Heroku PostgreSQL是一种Heroku提供的PostgreSQL服务,可免费使用,免费...

Day 01 初见Blazor

笔者接触软件的时间不长,先後接触三种架构,分别为 ASP.NET MVC、ASP.NET Core ...

Day24:老板我等等来拿

Future介面定义有get()方法以及isDone()方法,其目的就是在呼叫get()时看看能不能...

前言与自我回顾

欢迎大家来看我的文章,这次我挑战的主题是 Android 架构,就如同我简介中说的,关於架构方面的文...