好的,我们已先行叙述过Flux及Mano两项角色套件,最後,我们开始进行介绍Reactor间接角色套件,Reactor 3 是在Spring Framework 5.2中才开始进行登场,Reactor再以发展近六年的时间,但相对较少人使用,今天来跟大家介绍核心处理器(Processor)功能,它有两个实现类MonoProcessor及FluxProcessor,由於Mono小编昨日以提过,仅支援一对一架构,故小编今日就只有介绍由FluxProcessor延伸出来的六种处理器罗,我们将运用此新技术进行开发一个仿造交易所的OrderBook功能。
Processor是一个很特别的存在,他是一个推播器(Publisher),同时也是一个订阅者(Subscriber),也就是说可同时行使两种权力,在前天我们可以通过特定累Create方法来创建一个FluxSink推播者,但若您想设置一个初始的生产源,则可以通过此调用已创建好Processor实例的sink方法,通过他创建一个SerializedSink或SerializeOnRequestSink(其内部调用的都是FluxCreate.createSink中相关方法),所以在触发FluxHandle中包装的订阅者HandleSubscriber的onNext方法时,才会调用BiConsumer,此BiConsumer的内部最多只能调用一次SynchronousSink#next(Object),其中会调用0~1项SynchronousSink#error(Throwable生产者异常的情况下)或SynchronousSink#complete(定义某些情况下直接结束)操作。下面有六项延伸的处理器(Processor),小编陆续在下面表格做介绍。
Processor name | Description |
---|---|
TopicProcessor | 算是一个相对复杂的Processor,这是一个异步处理器,基於share方法获得一个TopicProcessor时,可以转发从上游多个元素生产下发元素(但同时只能订阅一个Publisher)。 |
WorkQueueProcessor | 与TopicProcessor相同,也是一个异步处理器,shared设置为true时,会支持多个元素生产执行绪并下发元素 |
ReplayProcessor | 可针对推播者或通过它自己创建的Sink实力向下发送元素及进行缓存,後来新订阅者可以重复接收这些元素,可透过create()方法配置缓存数量,即可达到暂存量效用。 |
EmitterProcessor | 可同时有多个订阅者,并为订阅者提供背压支持功能,也可作为订阅者来订阅上游订阅者,并同步向下发送元素给自己的订阅者们,也就是说只要第一个订阅者配置dispose(),後面所有订阅者都会收不到推播者讯息,开发者要小心配置才行。 |
DirectProcessor | 可接收0到N个Subscriber,DirectProcessor不支持背压,内部没有用於存储元素的数据结构(onNext也没有提供元素的缓存操作) |
UnicastProcessor | 通过一个字定义的queue来实现背压方式,仅允许一个订阅者(Subscriber)及UnicastProcessor(单一推播器)可以在多个执行绪(Thread)中生产元素。 |
# 背压方式(Backpressure) : 输入转为输出过程以某种方式受到阻碍,此种阻力为计算处理速度,又称吞吐量(throughput)
通过以上的处理器介绍,小编依旧将先前的销售产品服务API 继续延续,我们在此套加上一个最新的三项产品推播功能,只要有新的产品会将其加入ReplyProcessor来做新产品的缓存处理器,小编称之Product OrderBook,可参照一下程序码。
配置台湾区及中国区的ReplyProcessor,并建立各自的资料流推播器(FluxSink),再将预设好的产品推播进去。
public class ProductsOrderBookServiceImpl implements ProductsOrderBookService {
ReplayProcessor<SeaFood> taiwanProductProcessor = null;
ReplayProcessor<SeaFood> chinaProductProcessor = null;
FluxSink<SeaFood> taiwanProductSink = null;
FluxSink<SeaFood> chinaProductSink = null;
@PostConstruct
public void init() {
//预设配置三项缓存
this.taiwanProductProcessor = ReplayProcessor.create(3);
//建立台湾区推播器
this.taiwanProductSink = this.taiwanProductProcessor.sink();
//将台湾区预设产品推播出去
SEA_FOOD_CACHE_TAIWAN
.asMap()
.values()
.stream()
.forEach(seaFood -> taiwanProductSink.next(seaFood));
//预设配置三项缓存
this.chinaProductProcessor = ReplayProcessor.create(3);
//建立中国区推播器
this.chinaProductSink = this.chinaProductProcessor.sink();
//将中国区预设产品推播出去
SEA_FOOD_CACHE_CHINESE
.asMap()
.values()
.stream()
.forEach(seaFood -> chinaProductSink.next(seaFood));
}
}
当API触发时,确认是新品推播到OrderBook中
@Override
public SeaFood createSeaFood(SeaFood seaFood) throws SeaFoodRetailerGenericException {
validateNullId(seaFood);
Mono.fromCallable(() ->seaFood).subscribe(
new Consumer<SeaFood>() {
@Override
public void accept(SeaFood seaFood1) {
SeaFood product = SEA_FOOD_CACHE_TAIWAN.asMap().putIfAbsent(seaFood.getId(),seaFood);
if (product == null)
//Map 产品池不存在,将产品加入OrderBook
productsOrderBookService.publishTaiwanProduct(seaFood);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
logger.error("Taiwan product create fail. ex:{}",throwable.getMessage());
throw new SeaFoodRetailerGenericException("Out of EXPECT error.");
} catch (SeaFoodRetailerGenericException e) {
e.printStackTrace();
}
}
},
()->logger.info("Create Taiwan product success ! "));
return seaFood;
}
建立取得OrderBook API ,并将各自的OrderBook放置Map集合中。
@Autowired
ProductsOrderBookService productsOrderBookService;
@GetMapping(
value="/orderbook/${sea.food.api.all}",
produces = MediaType.APPLICATION_JSON_VALUE,
consumes = MediaType.APPLICATION_JSON_VALUE
)
ResponseEntity< Map<String , List<SeaFood>>> orderBookSeaFood() throws SeaFoodRetailerGenericException {
Map<String , List<SeaFood>> orderBookMap = new LinkedHashMap<String,List<SeaFood>>();
orderBookMap.put("TW",productsOrderBookService.getTaiwanOrderBook());
orderBookMap.put("CN",productsOrderBookService.getChinaOrderBook());
return new ResponseEntity<>(
orderBookMap
, HttpStatus.CREATED
);
}
透过以上范例我们可透过ReplyProcessor来实作各种区块链交易所得OrderBook,再透过Redis 进行持续存取效用,会达到最佳效益喔。
图1. Default OrderBook
图2. Create Taiwan Product
图3. Create China Product
图4. After Create Product OrderBook
其缓存分为无限存取与有限存取两种情况,其缓存是可重复向订阅者发送,元素不会随者一次向下发送而消失,缓存的型态非一般阵列,而是采用另外他种数据结构,若要取用最後一个元素,即可呼叫cacheLast方法,此项处理器还有特别之处分别为可结合历史元素的缓存数量和元素缓存时间限制来设计的API,即为createSizeAndTimeout方法。最後则为通过FluxReplay.SizeAndTimeBoundReplayBuffer实现基於时间调度的ReplayProcessor,缓存的历史元素有过期时间限制。从这边逻辑来看,我们可以看出Processor是一个中间者,当进行下发元素时,他会充当为Flux的一个订阅者角色,此时为调用它的onNext方法进行发送元素,同时,Processor作为生产者的作用也可发挥出来,并进他推播给自己在代理,即可将Flux上游发送出来的元素向下发送给订阅自己的二级订阅者。即可完成快取重复发送给订阅者行为,故整合下方结构图(图),我们可看出在没有需求的情况下,并不会主动进行发送元素的行为,所以就看不到任何输出。
图5. Reply Process架构策略图
Java 编成方法论 - 响应式Spring Reactor 3设计与实现
Reactor Processors——响应式Spring的道法术器
<<: Day25-TypeScript(TS)函式(Function)的剩余参数(Rest Parameter)
为什麽要用 k8s ? 使用 k8s 的理由有很多,退一万步来说 .... IT 就是一个由新技术驱...
大家好,欢迎来到我的学习Django暑假之旅,我在这个暑假花了一点时间,摸索了Django相关的架构...
这篇要做的:把订单日期改用 jQuery UI DatePicker + vue component...
好不容易采购了这个贵松松的东西回来,第一次碰 MacOS 肯定会不知道怎麽操作,马上来研究一下基本功...
今天开始我们来介绍一些已经有公开发布成果或是已经有成熟软件提供用户使用的公司产品。 索尼 (Sony...