卡哇勾嘎!!!想必大家都有开发过Publisher-Subscriber架构,有些开发者可能是透过第三放套件进行介接,如:Redis、Kafka或RabbitMQ等套件,或者您系统是在JAVA 8以前开发的,可能是采用RxJAVA或者自己实作Observable Pattern进行管理各项讯息传播,小编今日要介绍的Spring Reactor套件与RxJava 2采用共同一套接口API标准Reactive Streams Commons,故说明他们的最终目的都是一致的,且这些API都有通用性,如果您曾经有RxJava的开发经验的话,在今天这个Flux初阶介绍,想必是探囊取物一般的简单,想不想变忍者龟一样,可以自由地在下水道自由行走,那现在我们就开始来深入分析罗。
Reactor 3是基於JDK中提供的java.util.function来设计实现的,可以很轻松地从Java.util.stream.Stream转为Flux(RxJava中的Flowable类型),亦可轻松转回Stream,也可很快速地实现CompletableFuture与Mono(Mono也支援Publisher接口,可被理解为RxJava 2中对Single的背压(Back Pressure)加强版)之间的互相转换,所以可以很轻松且安全的基於Optional类型的元素创建Mono,所以在操作上可快方便的应用於Spring Framework 5,适用於各种新版的JDK,也是小编为什麽一开始选用Java 15的原因罗。
根据下图我们可以看到三角行为开始执行订阅,圆圈可以视为一个物件、I可视为输入原子(int,long,float等)、X可视为关闭订阅频道(channel),所有推播内容会循序发送给个订阅者,若推播端发生错误或关闭时,会自动将所有订阅者进行关闭,并停止进行接收任何内容。
图一、实现Publisher接口原理
小编延续前几天的架构,新增一个推播产品控制器,此产品可推播给台湾及中国两个地区国家共同销售,故小编先建立一个产品推播服务(CacheSubscribeService),由台湾(SeaFoodRetailerServiceImpl)区服务及中国(ChinaSeaFoodRetailerServiceImpl)区服务当订阅者,透过以下范例可得知,小编建立一个水池道称为seaFoodSink,并建立一个通量道(Flux)配置推播者入口道(sink),在配置订阅者排成类型及行为後,即可开始进行产品的推播任务,控制器仅需把产品放入seaFoodSink中,即可让相关服务开始进行任务,以下程序码范例提供参考。
// Publisher 服务端配置
@Service
public class CacheSubscribeServiceImpl implements CacheSubscribeService {
Logger logger = LoggerFactory.getLogger(CacheSubscribeServiceImpl.class);
private static FluxSink<SeaFood> seaFoodSink;
@Autowired
@Qualifier("seaFoodRetailService")
SeaFoodRetailerService taiwanSeaFoodRetailerService;
@Autowired
@Qualifier("chinaSeaFoodRetailService")
SeaFoodRetailerService chinaSeaFoodRetailerService;
@PostConstruct
public void init() {
Flux.<SeaFood>create(sink -> this.seaFoodSink = sink)
.doOnNext(seaFood -> {
try {
taiwanSeaFoodRetailerService.createSeaFood(seaFood);
chinaSeaFoodRetailerService.createSeaFood(seaFood);
} catch (SeaFoodRetailerGenericException e) {
logger.error("Create Sea Food into all Place Fail. ex:" + e.toString());
}
})
.onErrorReturn(new SeaFood())
.subscribeOn( Schedulers.elastic())
.subscribe(seaFood -> {
System.out.println("Subscribe model : " + new Gson().toJson(seaFood));
});
}
@Override
public SeaFood subscribeAllLocationSeaFoodProducts(SeaFood seaFood) {
seaFoodSink.next(seaFood);
return seaFood;
}
}
// 控制器配置
@RestController
public class PublishProductController extends ControllerBase {
@Autowired
CacheSubscribeService cacheSubscribeService;
@PostMapping(
value="/${sea.food.api.all}/create",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE
)
ResponseEntity<SeaFood> createSeaFood(@RequestBody SeaFood entity) throws SeaFoodRetailerGenericException {
return new ResponseEntity<>(
cacheSubscribeService.subscribeAllLocationSeaFoodProducts(entity)
, HttpStatus.CREATED
);
}
}
测试结果,进行触发推播产品
推播产品
台湾区及中国区产品均已加入
故我们可以看到透过Flux可以快速的传播各项物件,建议最佳的使用环境是在Socket服务上喔,这样双边介面都可快速达到映射物件转到作用。
由下图可看出,LambdaSubscriber实现了CoreSubscriber接口,该接口可衍生出各式各样的订阅者,对於生产者Publisher接口,也是可衍生多个,Flux留了抽象subscribe方法,提供给各类别具体实现类来实现,CoreSubscriber继承了org.reactivestreams.Subscriber接口,并加入一些Reactor3特有的Context功能来实现,所以我们可得知此订阅逻辑概念,如果在LambdaSubscriber中的引数值subscriptionConsumer不为空的,就会触发onSubscribe方法,反之,则自动触发onNext方法,详细各位可参照程序码逻辑,这边就不在多加详述。
图二、讯息发送流程图
透过API监测结果,我们可看到此API创建顺利,并回传成功建立的代码201,无相关异常产生。
Java 编成方法论 - 响应式Spring Reactor 3设计与实现
Reactive Spring实战 -- 理解Reactor的设计与实现
<<: 22 - Commitizen - 产生合法的 Commit 讯息
>>: 延伸(2)-ML到底要不要念统计 | ML#Day30
今天依然手动 redirect 【Day 5】逻辑时间与广播 反正网路上讲 dp 的多的是,dp写得...
Heroku PostgreSQL是一种Heroku提供的PostgreSQL服务,可免费使用,免费...
笔者接触软件的时间不长,先後接触三种架构,分别为 ASP.NET MVC、ASP.NET Core ...
Future介面定义有get()方法以及isDone()方法,其目的就是在呼叫get()时看看能不能...
欢迎大家来看我的文章,这次我挑战的主题是 Android 架构,就如同我简介中说的,关於架构方面的文...