[Day 5] Reactive Programming - Java 9(Publisher、Subscribers)

前言

java 9的时候新增支援Reactive Stream,所以在介绍Spring ReactorWebFlux之前先来认识一下Java 原生的Flow Api。

java 9

java 9更新推出对应Reactive Streams 规格的分别有三个interface,Publisher负责产生item来推送给一个或多个Subscribers去使用(consumed),每一个item都被Subscription所管理,中间有一或多个Processor来处理转换或是特别的逻辑。

https://ithelp.ithome.com.tw/upload/images/20210919/20141418I9FzwzGbi9.png

Subscriber订阅PublisherPublisher负责产生推送资料,透过Subscription 居中管理, Subscriber会提出需要的资料量,这样就有背压(backpressure)机制(之後介绍),来避免Subscriber来不及消化大量资料导致系统异常。下方是java.util.concurrent.Flow 移除掉文件说明的部分,从程序码可以看出

  1. Publisher提供订阅
  2. Subscriber 有四个方法 分别为
    1. onSubscribe(Subscription subscription):  订阅时同步Subscription
    2. onNext(T item): 处理item
    3. onError(Throwable throwable): 错误处理,这部分就是单纯Java 8 Stream所没有的
    4. onComplete(): Publisher结束时
/* 
public final class Flow { 
    private Flow() {} // uninstantiable 
    
    @FunctionalInterface 
    public static interface Publisher<T> { 
       
        public void subscribe(Subscriber<? super T> subscriber); 
    } 
    public static interface Subscriber<T> { 
      
        public void onSubscribe(Subscription subscription); 
       
        public void onNext(T item); 
      
        public void onError(Throwable throwable); 
       
        public void onComplete(); 
    } 
     
    public static interface Subscription { 
    
        public void request(long n); 
   
        public void cancel(); 
    } 
       
}

DEMO

实际动手做一个简单听Podcast要取得节目内容的DEMO
建立Member会员实作Subscriber,假设会员要订阅Podcast频道,订阅时同步subscription并要求一个item,每接受到一个item就存入episodes 并要求下一个,最後当Publisher结束时则会印出onComplete。

@Data 
public class Member <T>  implements Subscriber<T> { 
  private Subscription subscription; 
  private List<T> episodes = new LinkedList<>();
  @Override 
  public void onSubscribe(Subscription subscription) { 
    this.subscription = subscription; 
    subscription.request(1); 
  } 
  @Override 
  public void onNext(T item) { 
    System.out.println("onNext:" + item); 
    episodes.add(item);
    subscription.request(1); 
  } 
  @Override 
  public void onError(Throwable throwable) { 
    System.out.println("onError:" + throwable); 
  } 
  @Override 
  public void onComplete() { 
    System.out.println("onComplete"); 
  } 
}

因为Publisher比较复杂我们直接拿原生的SubmissionPublisher来使用,实际DEMO情况如下
新增一个会员跟一个Podcast频道并订阅,将内容推送到频道当中,在第一时间印出是可以发现尚未收到任何集数,因为整个流程是非同步的在背後进行,所以当我们暂停一秒钟後就可以正常到看到结果。

@Test
void testJava9Reactive() throws InterruptedException {
  Member<String> member = new Member<>();
  SubmissionPublisher<String> podcastChannel = new SubmissionPublisher<>();

  podcastChannel.subscribe(member);
  Assertions.assertEquals(1, podcastChannel.getSubscribers().size());

  List<String> episodes = List.of("1", "2", "3", "4");

  episodes.forEach(podcastChannel::submit);
  System.out.println(member.getEpisodes());
  Thread.sleep(1000);
  podcastChannel.close();
  System.out.println(member.getEpisodes());

  /* output:
      []
  onNext:1
  onNext:2
  onNext:3
  onNext:4
  [1, 2, 3, 4]
  onComplete
       */
}

结语

今天终於开始进入到Reactive Programming并带有一点实作,下一篇会继续介绍今天用到的SubmissionPublisher与一开始提到的Processor。

资料来源


<<:  Day 4:透过 npm、Hexo 指令在本机端安装你的 Hexo 部落格

>>:  03 - Alacritty - 终端机

Day13 Docker compose

当每个Class(Image)都写的简洁有力又好用、且权责都分得乾乾净净的时候就可以加入一个Serv...

day25 矮额是callback,把它变成flow好了 简单的callbackFlow

终於写道flow的最後一篇了 尽管我们在自己的开发上,能够高兴地写coroutine和suspend...

【Side Project】 赛後检讨

经历了一个月的洗礼,又再一次完成了铁人赛。 当然不免俗的,最後来一篇赛後检讨。 这篇分成三个大部分来...

[Day 26] 专案执行(下)

接续上一章 规划阶段 专案工作分解 将工作分成小项目,以便分工、时间规划 目标分解结构 OBS (o...

iOS工程师面试深入浅出- 物件导向的三大特性?

iOS工程师面试深入浅出- 物件导向的三大特性? 这题乍看之下是很本科系的问题,但事实上,当你在开发...