[Day 9] Reactive Programming - Backpressure

前言

大部分介绍Reactive Programming都一定会提到Backpressure,可能放在第九天有点稍晚,但我觉得有基本的Reactor观念後再来看也不迟。

Backpressure

Reactive Programming 常常提到的backpressure,这在现实里的管道中也会有,在软件提到的背压(backpressure)是指当生产者(producer )生产超过消费者(consumer),为了避免消费者被过多的资料所淹没的一个保护机制。例如拿宝特瓶来喝水,背压就是正常的喝水,而没有了这个机制,就像是喝到一半有人用力捏宝特瓶,水就会来不及喝下去喷出来。

用程序码来看介绍,interval会随着时间不断的emits data,delay会延迟Subscriber去接受,导致渐渐的DATA溢出报错。

 Flux.interval(Duration.ofMillis(1)) 
        .log()   
        .delayElements(Duration.ofMillis(100))
        .blockLast();

https://ithelp.ithome.com.tw/upload/images/20210923/201414183GhaiLRwes.png
https://ithelp.ithome.com.tw/upload/images/20210923/20141418fQ5SNsX0Cj.png

reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks) 
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233) 
	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:132) 
	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) 
	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) 
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) 
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) 
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
	at java.base/java.![https://ithelp.ithome.com.tw/upload/images/20210923/201414186VL7Y0MpU6.png](https://ithelp.ithome.com.tw/upload/images/20210923/201414186VL7Y0MpU6.png)til.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
	at java.base/java.lang.Thread.run(Thread.java:834)

稍微调整一下,加上一个buffer就能避免太快速的报错,但若是Buffer被塞满了仍然是会出错,因为interval是根据时间不断推送资料,较难去维持backpressure的机制,这边只是为了demo而使用。

    Flux.interval(Duration.ofMillis(1)) 
        .log() 
        .onBackpressureBuffer() 
        .concatMap(x -> Mono.delay(Duration.ofMillis(100))) 
        .blockLast();

https://ithelp.ithome.com.tw/upload/images/20210923/20141418Ee7woewTDF.png

Range

range则会定义出总共emit的量,这时候透过limitRate(3)是subscribers 只会每次要求3个而不会因为太多的资料量而导致错误,也就是如果publisher可以根据下游(downstream)的要求来决定产生资料的速度,就算是有backpressure的机制

Flux.range(1, 20) 
        .log() 
        .limitRate(3) 
        .subscribe();

https://ithelp.ithome.com.tw/upload/images/20210923/20141418jUtmCQSbzL.png
https://ithelp.ithome.com.tw/upload/images/20210923/20141418r4LVAsJNLo.png

结语

Backpressure在Reactive Programming是蛮重要的观念,後面的文章也会陆续提到Reactor的Api对应Backpressure会有一些处理机制。

资料来源


<<:  ui li 列出清单标签-基础语法

>>:  [Day 10] Sass - Values

11.移转 Aras PLM大小事- 汇出 Mutiple Level BOM

汇出 Mutiple Level BOM 如果要制作出下图表格,可以利用原厂的程序码稍微修改 这样就...

@Day9 | C# WixToolset + WPF 帅到不行的安装包 [自订动作介接画面-安装後执行]

好啦! 将三天的工给做最後整理 花了几天在讲自订动作与页面这个东西,也该有个结束了! 我们现在将De...

vok-orm 关联性资料的新增/查询 (下篇) + Vaadin 自订样式 - d09

承上篇 Refactor 上篇实作完成执行结果如上图所示,蓝色区块为该学生成绩,红色区块为新增成绩编...

【day5】二林&员林特色小吃

疫情前期看到千千拍摄《我们回家吧》系列 刚好跟男友回员林 所以就照着千千的推荐名单走一次罗 阳光老店...

.NET 新手 无痛入职 _ Day2 环境与框架

Visual Studio Code Visual Studio Code 是微软开发且跨平台的免费...