[Day 6] Reactive Programming - Java 9(SubmissionPublisher、Processor)

前言

上一篇的范例中有使用到SubmissionPublisher,才更加地认识到其实Publisher需要做蛮多事情的,这边先简单介绍一下SubmissionPublisher

SubmissionPublisher

根据Java doc内容,SubmissionPublisher使用在建构时传入的Executor来做到持续推送资料给Subscribers,当Publisher推送与Subscriber消费的速度不同时能提供缓冲区(Buffer),若缓冲区到达上限可以有不同的方法来处理,最简单就是卡住(block)直到资源可用。

在范例中有用到两个method,看一下原始码来大概了解一下用途,将subscriber包装成subscription ,对应到之前说明subscriptionpublishersubscriber沟通的桥梁,透过BufferedSubscriptionnext属性(field)来达到像是linked list的效果,可以串联所有的subscriber并检核是否有重复。

public void subscribe(Subscriber<? super T> subscriber) { 
        if (subscriber == null) throw new NullPointerException(); 
        int max = maxBufferCapacity; // allocate initial array 
        Object[] array = new Object[max < INITIAL_CAPACITY ? 
                                    max : INITIAL_CAPACITY]; 
        BufferedSubscription<T> subscription = 
            new BufferedSubscription<T>(subscriber, executor, onNextHandler, 
                                        array, max); 
        synchronized (this) { 
            if (!subscribed) { 
                subscribed = true; 
                owner = Thread.currentThread(); 
            } 
            for (BufferedSubscription<T> b = clients, pred = null;;) { 
                if (b == null) { 
                    Throwable ex; 
                    subscription.onSubscribe(); 
                    if ((ex = closedException) != null) 
                        subscription.onError(ex); 
                    else if (closed) 
                        subscription.onComplete(); 
                    else if (pred == null) 
                        clients = subscription; 
                    else 
                        pred.next = subscription; 
                    break; 
                } 
                BufferedSubscription<T> next = b.next; 
                if (b.isClosed()) {   // remove 
                    b.next = null;    // detach 
                    if (pred == null) 
                        clients = next; 
                    else 
                        pred.next = next; 
                } 
                else if (subscriber.equals(b.subscriber)) { 
                    b.onError(new IllegalStateException("Duplicate subscribe")); 
                    break; 
                } 
                else 
                    pred = b; 
                b = next; 
            } 
        } 
    }

而我们将资料放到publisher的submit实际上就是去呼叫doOffer()

  public int submit(T item) { 
        return doOffer(item, Long.MAX_VALUE, null); 
    }

doOffer会根据目前的大小有重试(retry)的机制,主要推送的逻辑是在(BufferedSubscription) b.offer(item, unowned); ,会透过publisher的Executor执行ConsumerTask,在Task中run  (BufferedSubscription)consumer.consume(),在里面就会根据条件去呼叫我们熟悉的subscriber的onSubscribeonNext,这边就先大致了解一下,先不深入探讨。

 private int doOffer(T item, long nanos, 
                        BiPredicate<Subscriber<? super T>, ? super T> onDrop) { 
        if (item == null) throw new NullPointerException(); 
        int lag = 0; 
        boolean complete, unowned; 
        synchronized (this) { 
            Thread t = Thread.currentThread(), o; 
            BufferedSubscription<T> b = clients; 
            if ((unowned = ((o = owner) != t)) && o != null) 
                owner = null;                     // disable bias 
            if (b == null) 
                complete = closed; 
            else { 
                complete = false; 
                boolean cleanMe = false; 
                BufferedSubscription<T> retries = null, rtail = null, next; 
                do { 
                    next = b.next; 
                    int stat = b.offer(item, unowned); 
                    if (stat == 0) {              // saturated; add to retry list 
                        b.nextRetry = null;       // avoid garbage on exceptions 
                        if (rtail == null) 
                            retries = b; 
                        else 
                            rtail.nextRetry = b; 
                        rtail = b; 
                    } 
                    else if (stat < 0)            // closed 
                        cleanMe = true;           // remove later 
                    else if (stat > lag) 
                        lag = stat; 
                } while ((b = next) != null); 
                if (retries != null || cleanMe) 
                    lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe); 
            } 
        } 
        if (complete) 
            throw new IllegalStateException("Closed"); 
        else 
            return lag; 
    }

Processor

而上一篇还没介绍到的Processor,doc的说明表示Processor同时是Subscriber 也是Publisher,其实蛮好理解的,因为介於中间,相对於Publisher是Subscriber,相对於Subscriber则是Publisher,主要是用来做资料的转换。

    /** 
     * A component that acts as both a Subscriber and Publisher. 
     * 
     * @param <T> the subscribed item type 
     * @param <R> the published item type 
     */ 
    public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { 
    }

https://ithelp.ithome.com.tw/upload/images/20210920/201414181ItXG03RVO.png

DEMO

衔接上一篇的实作,我们在中间加上ApplePodcastProcessor,让每一集输出的时候都会加上apple的字样,在建构时传入转换的function,onNext的时候做转换。

@EqualsAndHashCode(callSuper = true)
@Data
public class ApplePodcastProcessor<T, R> extends SubmissionPublisher<R> implements Processor<T, R> {
  private Subscription subscription;
  private Function<T, R> function;

  public ApplePodcastProcessor(Function<T, R> function) {
    super();
    this.function = function;
  }

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(T item) {
    submit(function.apply(item));
    subscription.request(1);
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("onError:" + throwable);
  }

  @Override
  public void onComplete() {
    System.out.println("onComplete");
  }
}

最终测试成功让每一集都有加上apple字样。

  @Test 
  void testJava9Reactive() throws InterruptedException { 
    Member<String> member = new Member<>(); 
    SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>(); 
    ApplePodcastProcessor<String, String> processor = 
        new ApplePodcastProcessor<>(item -> "apple : " + item); 
    podcastChannel.subscribe(processor); 
    processor.subscribe(member); 
    Assertions.assertEquals(1, podcastChannel.getSubscribers().size()); 
    List<String> episodes = List.of("1", "2", "3", "4"); 
    episodes.forEach(podcastChannel::submit); 
    System.out.println(member.getEpisodes()); 
    Thread.sleep(1000); 
    podcastChannel.close(); 
    System.out.println(member.getEpisodes()); 
    /* output: 
         [] 
    onNext:apple : 1 
    onNext:apple : 2 
    onNext:apple : 3 
    onNext:apple : 4 
    [apple : 1, apple : 2, apple : 3, apple : 4] 
    onComplete 
             */ 
  }

结语

虽然java9推出了符合Reactive Steam的Flow Api,但可以看出如果要在公司的专案中使用还是要自己打造不少轮子,所以接下来就来介绍其他可以帮助我们Reactive Programming in Java的好工具。

资料来源


<<:  [NestJS 带你飞!] DAY05 - Module

>>:  认识网页元素 HTML、CSS

成为工具人应有的工具包-01 FullEventLogView

Windows Event Log & FullEventLogView LOG 是监识调查...

Soundcloud artists can distribute music to other services

New Soundcloud function: artists from the streamin...

Day 6 图片去背 ( 路径 )

图片去背 ( 路径 ) 教学原文参考:图片去背 ( 路径 ) 这篇文章会介绍使用 GIMP 的路径工...

D28. 学习基础C、C++语言

D28. 题目练习 这次一样是练3n+1的题目,之前是用C语言,这次用C++来写 #include&...

Day 24 利用transformer自己实作一个翻译程序(六) Masking

Masking 需要把填充的部分标记为0,其余部分标记为1,才不会导致填充的部分被误认为是输入 de...