java 9的时候新增支援Reactive Stream,所以在介绍Spring Reactor
、WebFlux
之前先来认识一下Java 原生的Flow Api。
java 9更新推出对应Reactive Streams 规格的分别有三个interface,Publisher
负责产生item来推送给一个或多个Subscribers
去使用(consumed),每一个item都被Subscription
所管理,中间有一或多个Processor
来处理转换或是特别的逻辑。
Subscriber
订阅Publisher
,Publisher
负责产生推送资料,透过Subscription
居中管理, Subscriber
会提出需要的资料量,这样就有背压(backpressure)机制(之後介绍),来避免Subscriber来不及消化大量资料导致系统异常。下方是java.util.concurrent.Flow 移除掉文件说明的部分,从程序码可以看出
Publisher
提供订阅Subscriber
有四个方法 分别为
Subscription
Publisher
结束时/*
public final class Flow {
private Flow() {} // uninstantiable
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
}
实际动手做一个简单听Podcast要取得节目内容的DEMO
建立Member会员实作Subscriber
,假设会员要订阅Podcast频道,订阅时同步subscription
并要求一个item,每接受到一个item就存入episodes 并要求下一个,最後当Publisher
结束时则会印出onComplete。
@Data
public class Member <T> implements Subscriber<T> {
private Subscription subscription;
private List<T> episodes = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
System.out.println("onNext:" + item);
episodes.add(item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
因为Publisher
比较复杂我们直接拿原生的SubmissionPublisher
来使用,实际DEMO情况如下
新增一个会员跟一个Podcast频道并订阅,将内容推送到频道当中,在第一时间印出是可以发现尚未收到任何集数,因为整个流程是非同步的在背後进行,所以当我们暂停一秒钟後就可以正常到看到结果。
@Test
void testJava9Reactive() throws InterruptedException {
Member<String> member = new Member<>();
SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();
podcastChannel.subscribe(member);
Assertions.assertEquals(1, podcastChannel.getSubscribers().size());
List<String> episodes = List.of("1", "2", "3", "4");
episodes.forEach(podcastChannel::submit);
System.out.println(member.getEpisodes());
Thread.sleep(1000);
podcastChannel.close();
System.out.println(member.getEpisodes());
/* output:
[]
onNext:1
onNext:2
onNext:3
onNext:4
[1, 2, 3, 4]
onComplete
*/
}
今天终於开始进入到Reactive Programming并带有一点实作,下一篇会继续介绍今天用到的SubmissionPublisher
与一开始提到的Processor。
<<: Day 4:透过 npm、Hexo 指令在本机端安装你的 Hexo 部落格
当每个Class(Image)都写的简洁有力又好用、且权责都分得乾乾净净的时候就可以加入一个Serv...
终於写道flow的最後一篇了 尽管我们在自己的开发上,能够高兴地写coroutine和suspend...
经历了一个月的洗礼,又再一次完成了铁人赛。 当然不免俗的,最後来一篇赛後检讨。 这篇分成三个大部分来...
接续上一章 规划阶段 专案工作分解 将工作分成小项目,以便分工、时间规划 目标分解结构 OBS (o...
iOS工程师面试深入浅出- 物件导向的三大特性? 这题乍看之下是很本科系的问题,但事实上,当你在开发...