上一篇的范例中有使用到SubmissionPublisher
,才更加地认识到其实Publisher需要做蛮多事情的,这边先简单介绍一下SubmissionPublisher
。
根据Java doc内容,SubmissionPublisher
使用在建构时传入的Executor
来做到持续推送资料给Subscribers
,当Publisher
推送与Subscriber
消费的速度不同时能提供缓冲区(Buffer),若缓冲区到达上限可以有不同的方法来处理,最简单就是卡住(block)直到资源可用。
在范例中有用到两个method,看一下原始码来大概了解一下用途,将subscriber
包装成subscription
,对应到之前说明subscription
是publisher
与subscriber
沟通的桥梁,透过BufferedSubscription
的next
属性(field)来达到像是linked list的效果,可以串联所有的subscriber
并检核是否有重复。
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
synchronized (this) {
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
}
for (BufferedSubscription<T> b = clients, pred = null;;) {
if (b == null) {
Throwable ex;
subscription.onSubscribe();
if ((ex = closedException) != null)
subscription.onError(ex);
else if (closed)
subscription.onComplete();
else if (pred == null)
clients = subscription;
else
pred.next = subscription;
break;
}
BufferedSubscription<T> next = b.next;
if (b.isClosed()) { // remove
b.next = null; // detach
if (pred == null)
clients = next;
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber)) {
b.onError(new IllegalStateException("Duplicate subscribe"));
break;
}
else
pred = b;
b = next;
}
}
}
而我们将资料放到publisher的submit实际上就是去呼叫doOffer()
public int submit(T item) {
return doOffer(item, Long.MAX_VALUE, null);
}
doOffer
会根据目前的大小有重试(retry)的机制,主要推送的逻辑是在(BufferedSubscription
) b.offer(item, unowned);
,会透过publisher的Executor
执行ConsumerTask
,在Task中run (BufferedSubscription)consumer.consume()
,在里面就会根据条件去呼叫我们熟悉的subscriber的onSubscribe
、onNext
,这边就先大致了解一下,先不深入探讨。
private int doOffer(T item, long nanos,
BiPredicate<Subscriber<? super T>, ? super T> onDrop) {
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
synchronized (this) {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
owner = null; // disable bias
if (b == null)
complete = closed;
else {
complete = false;
boolean cleanMe = false;
BufferedSubscription<T> retries = null, rtail = null, next;
do {
next = b.next;
int stat = b.offer(item, unowned);
if (stat == 0) { // saturated; add to retry list
b.nextRetry = null; // avoid garbage on exceptions
if (rtail == null)
retries = b;
else
rtail.nextRetry = b;
rtail = b;
}
else if (stat < 0) // closed
cleanMe = true; // remove later
else if (stat > lag)
lag = stat;
} while ((b = next) != null);
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
}
if (complete)
throw new IllegalStateException("Closed");
else
return lag;
}
而上一篇还没介绍到的Processor,doc的说明表示Processor同时是Subscriber 也是Publisher,其实蛮好理解的,因为介於中间,相对於Publisher是Subscriber,相对於Subscriber则是Publisher,主要是用来做资料的转换。
/**
* A component that acts as both a Subscriber and Publisher.
*
* @param <T> the subscribed item type
* @param <R> the published item type
*/
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
衔接上一篇的实作,我们在中间加上ApplePodcastProcessor
,让每一集输出的时候都会加上apple的字样,在建构时传入转换的function,onNext
的时候做转换。
@EqualsAndHashCode(callSuper = true)
@Data
public class ApplePodcastProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
private Subscription subscription;
private Function<T, R> function;
public ApplePodcastProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.out.println("onError:" + throwable);
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}
最终测试成功让每一集都有加上apple字样。
@Test
void testJava9Reactive() throws InterruptedException {
Member<String> member = new Member<>();
SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();
ApplePodcastProcessor<String, String> processor =
new ApplePodcastProcessor<>(item -> "apple : " + item);
podcastChannel.subscribe(processor);
processor.subscribe(member);
Assertions.assertEquals(1, podcastChannel.getSubscribers().size());
List<String> episodes = List.of("1", "2", "3", "4");
episodes.forEach(podcastChannel::submit);
System.out.println(member.getEpisodes());
Thread.sleep(1000);
podcastChannel.close();
System.out.println(member.getEpisodes());
/* output:
[]
onNext:apple : 1
onNext:apple : 2
onNext:apple : 3
onNext:apple : 4
[apple : 1, apple : 2, apple : 3, apple : 4]
onComplete
*/
}
虽然java9推出了符合Reactive Steam的Flow Api,但可以看出如果要在公司的专案中使用还是要自己打造不少轮子,所以接下来就来介绍其他可以帮助我们Reactive Programming in Java的好工具。
<<: [NestJS 带你飞!] DAY05 - Module
Windows Event Log & FullEventLogView LOG 是监识调查...
New Soundcloud function: artists from the streamin...
图片去背 ( 路径 ) 教学原文参考:图片去背 ( 路径 ) 这篇文章会介绍使用 GIMP 的路径工...
D28. 题目练习 这次一样是练3n+1的题目,之前是用C语言,这次用C++来写 #include&...
Masking 需要把填充的部分标记为0,其余部分标记为1,才不会导致填充的部分被误认为是输入 de...