Day 11 | Dart 非同步 - Stream

Stream

简单来说就是一群iterable的非同步事件。

像是每秒输出一个数字,但是你可能会想说就算我不用 Stream 我也有办法办到:

Timer.periodic(Duration(seconds: 1), (timer) {
    print(timer.tick);
  });

没错其实 Timer 就有办法达成这个需求,但如果今天突然想要数到10就好,或者某几个数要跳过,甚至想要旁边按个按钮可以暂停想要继续数时再继续数,那我们使用Stream 就会比较简单且优雅的达成这些需求。

final myStream = NumberCreator().stream; //这是一个会每秒输出递增数字的Stream
final subscription = myStream.listen((event) {
    print('event $event');
  }, onError: (err) {
    print(err);
  }, onDone: () {
    print('subscription done!!');
  },cancelOnError: false);

NumberCreator 是我自己建立的一个每一秒会输出一个数字的Stream 类似於上面的 Timer.periodic 的效果,如果我们想要取得Stream 的值那我们必须要对这个「监听」这个Stream ,在Dart中就是利用 Stream.listen 来达成。

而输出会是:

event 1
event 2
ERROR!!!
//....
event 19
subscription done!!

Stream.listen 会回传一个StreamSubscription 让我们可以管理所订阅的StreamStream.listen 有提供三个可以管理Stream 状态的参数分别是 onData (positional)、 onErroronDone (named)

其实从名字就可以看出功用,onData 就是当Stream 有了结果回传後就执行的callback、onErroronDone就是当有error时及stream完成时。

而且还有提供一个参数cancelOnError让我们决定发生错误时要不要继续监听 。

建立 Stream

那我们该如何建立自己的Stream 呢?

Stream来说有几个比较重要的介面(或者该说是abstract class)要了解:

  1. StreamController
  2. StreamSink
  3. Stream
  4. StreamSubscription

扣掉已经讲过的StreamStreamSubscription

  1. StreamController

    通常是拿来控制及检查这个Stream 的各种状态,像是完成、错误、暂停、有无订阅者等等。

  2. StreamSink

    就是Stream 来接收「事件」的地方,所谓事件就像前几篇文章提到的,可能是同步的可能是非同步的,可能是随着时间来的可能是使用者操作而产生的。

所以当我们想要自己建立一个 Stream 来进行操作的话:

class NumberCreator {
  NumberCreator() {
    Timer.periodic(Duration(seconds: 1), (timer) {
      if (timer.tick == 3) {
        _controller.addError('ERROR!!!');
      } else if (timer.tick == 20) {
        timer.cancel();
        _controller.close();
      } else {
        _controller.sink.add(timer.tick);
      }
    });
  }

  final _controller = StreamController<int>();

  Stream<int> get stream => _controller.stream;
}

我们先建立一个StreamController ,然後利用StreamSink 接收「事件」,这边为了测试各种状态所以自己实作了错误及完成状态,最後宣告一个 get 让外部获取这个 StreamControllerStream

但如果只是单纯要一个随着时间输出的 Stream ,也是有较为简单的方式:

Stream<int> counterStream() {
  return Stream<int>.periodic(const Duration(seconds: 1), (x) => x);
}

Stream 的其他操作

既然 Stream 是 iterable 的,那也就代表我们是可以对各别「事件」进行一些操作的:

final myStream2 =
      counterStream().map((event) => event * 2).take(3).listen(print);
  final myStream3 = counterStream()
      .map((event) => event +3)
      .takeWhile((element) => element <= 15)
      .listen(print);

(是不是有点像RxDart/RxJS的感觉)

像是我们可以利用 mapStream 中每个值都乘2,或者要拿几个值就好之类的操作。

当然还有很多方法可以使用这里就不一一说明了

StreamSubscription 的操作

前面说到StreamSubscription可以拿来控制Stream 相关的状态,当然也包含要不要继续监听或者暂停等等之类的操作:

void pauseStream5sAndResume(StreamSubscription subscription) {
  Future.delayed(Duration(seconds: 5), () {
    subscription.pause();
  }).then((_) {
    subscription.resume();
  });
}

像是可以利用 pause 或者 resume 进行更多细节操作。

Stream 通常用在哪?

坦白说可能是因为做过专案规模的问题,我很少操作Stream 相关的语法,但大多数时候就是需要管理到复杂的非同步操作时,又或者是用於状态管理像是BloC。


今天的程序码:

https://github.com/zxc469469/dart-playground/tree/Day11/stream

讲完Stream 就算把Dart中的非同步的大多数概念都提到了,虽然还有 generator 等语法但因为没有实际使用过的经验就不写相关的内容。

至此Dart语法相关的内容也讲得差不多了,剩下最後最後的null safety及我自己额外想提到的Dart中的常用的一些FP概念就要开始进入到Flutter章节。


参考资料

  1. https://dart.cn/tutorials/language/streams
  2. https://wizardforcel.gitbooks.io/gsyflutterbook/content/Flutter-11.html
  3. https://www.youtube.com/watch?v=nQBpOIHE4eE&t=258s

<<:  Day 11 AWS云端实作起手式第一弹 开始拼拼图吧

>>:  DAY12-JavaScript(二)

【从实作学习ASP.NET Core】Day11 | 後台 | 详细资料与 ViewModel

今天要来实作 Details 详细资料页面 ViewModel ViewModel 是拿来对 MVC...

Day 14:动态 Route 对号入座

上篇完成了巢状路由的设置之後,紧接着新需求又出现了!接续会员後台的收藏纪录页面,我们要进一步让收藏的...

【从实作学习ASP.NET Core】Day28 | 前台 | 管理我的订单

已经做到烂掉的 CRUD 又来了,最後还是要把订单管理的页面建出来 我的订单 沿用前面建立的 Ord...

菜鸟日记Day 30-用JSON-Server自建云端资料库

铁人赛终於来到最後一天了! 为响应JavaScript菜鸟研究室的主题,最近一个月我们尝试串接过各种...

GKE (一)

GKE简介 今天要说的是GKE GKE是什麽?由於近年微服务崛起使用k8s的需求大增,衍生各大小公司...