Flutter体验 Day 23-WebSocket

WebSocket

前端对於WebSocket这项技术应该不陌生,以往会需要使用轮询的方式更新资料,目前大多改采用WebSocket连线来建立双向沟通的需求。

Flutter Client - IOWebSocketChannel

我们使用套件 web_socket_channel 这个 WebSocket Library 处理客端的服务请求,使用方式很简单

  • 建立连线
IOWebSocketChannel.connect(Uri.parse('WebSocket_Server_URL'));
  • 监听讯息
    channel.stream.listen((message) {
        print(message);
    });
  • 发送讯息
    channel.sink.add('received!');
  • 连线关闭
    channel.sink.close();

实现 Websocket 聊天室范例

让我们建立一个即时通讯的聊天室吧

Server 端

我们使用 nodejs 套件 ws 完成 server 端的设定,在建立连线时从 websocket 的 query string 取得 token 参数,可以透过这边方式进行身份的验证以确认是否可以建立连线。

import WebSocket, { WebSocketServer } from "ws";
import { UniqueID } from "nodejs-snowflake";
import { createServer } from "http";
import url from "url";

const uid = new UniqueID();

const server = createServer();

const wss = new WebSocketServer({
  noServer: true,
});

class User {
  constructor(name) {
    this.name = name;
  }
}

function authenticate(request, callback) {
  const { query } = url.parse(request.url, true);

  if (!query.token) {
    // 身分验证
    callback(new Error("token is no defined"));
    return;
  }

  callback(null, new User(query.token));
}

wss.on("connection", function connection(ws, request, user) {
  ws.on("message", function message(msg) {
    console.log(`Received message from user ${user.name}`);

    var data = msg.toString();

    var json

    try {
      json = JSON.parse(data);
    } catch (e) {
      console.error(e.toString());
      json = {};
    }

    // 根据通讯协议的格式设定处理聊天室功能
    if (json.eventName === "chat:send") {
      const obj = { eventName: "chat:msg", mid: uid.getUniqueID(), by: user.name, msg: json.data, time: Date.now() };

      wss.clients.forEach(function each(client) {
        if (client.readyState === WebSocket.OPEN) {
          client.send(JSON.stringify(obj));
        }
      });
    }
  });
});

server.on("upgrade", function upgrade(request, socket, head) {
  authenticate(request, (err, client) => {
    if (err || !client) {
      socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
      socket.destroy();
      return;
    }

    wss.handleUpgrade(request, socket, head, function done(ws) {
      wss.emit("connection", ws, request, client);
    });
  });
});

server.listen(8000);

Client 端

为了一些业务逻辑的处理,我们使用 Connection 包装 IOWebSocketChannel 类别,让它可以处理断线时的重连机制。WebSocket讯息是透过 Dart stream接口,这边可能需要先理解一下 stream 的概念 为何。

IOWebSocketChannelstream 跟以往学习RxJS概念很相近,因此我这边另外使用了 Rxdart 套件处理讯息的转发,这样在断线重线时不会影响到已绑定的事件行为。

class Connection {
  bool _connected = false;

  Uri uri;

  late IOWebSocketChannel _channel;

  late StreamSubscription _subscription;

  PublishSubject<dynamic> stream = PublishSubject();

  BehaviorSubject<bool> connected = BehaviorSubject();

  Connection({required this.uri}) {
    _connect();
  }

  _connect() {
    if (_connected) {
      _subscription.cancel();
    }
    _channel = IOWebSocketChannel.connect(uri);
    _connected = true;

    _subscription =
        _channel.stream.listen(_onMessage, onError: _onError, onDone: _onDone);

    connected.add(_connected);
  }

  void reconnect({Uri? uri}) {
    if (uri != null) {
      this.uri = uri;
    }
    _connect();
  }

  trigger(dynamic message) {
    _channel.sink.add(json.encode(message));
  }

  void _onMessage(dynamic message) {
    stream.add(jsonDecode(message));
  }

  void _onError(e) {
    stream.addError(e);
  }

  void _onDone() {
    _connected = false;
    connected.add(_connected);
  }
}

ChatViewModel

我们使用聊天室的 ViewModel 处理接收到的讯息,这边定义聊天室讯息的格式, 当接收到 eventNamechat:msg,会将该讯息转成 Message 资料类别,存放到 data 内,接着透过 RxdartPublishSubject 发送出去。

class ChatViewModel {
  Connection connection =
      Connection(uri: Uri.parse('ws://test.dev.rde:8000/?token=sm'));

  PublishSubject<List<Message>> stream = PublishSubject();

  final List<Message> data = [];

  ChatViewModel() {
    connection.stream
      .where((data) => data["eventName"] == "chat:msg")
      .map((data) => Message.fromJson(data))
      .listen(_addData);
  }

  void _addData(Message msg) async {
    // 存放讯息资料
    data.add(msg);
    // 发送讯息通知给 StreamBuilder
    stream.add(data);
  }

我们使用 StreamBuilder 接收发送过来的 List<Message> 资料,并使用 ListView.builder 动态创建讯息栏位。

chat

小结

今日学习 WebSocket 技术的应用方式,遇到比较不熟悉的应该是在 stream 的使用上,後续可能要花点时间练习语法要怎麽使用,至於想理解 RxJS 可以看看 黄升煌 大大 第12届的铁人赛


<<:  [Day16]ISO 27001 标准:持续改善

>>:  D21 - 用 Swift 和公开资讯,打造投资理财的 Apps { 台股成交量实作.1 }

Day 16 - 演算大法好逼

简介 这篇是演算大法的下半部。 有 sorting 、 search 各两种方式以及他们的差别。 C...

出生第30天 得到快乐的电动挤奶器与仙女们的聚会

今天下午朋友送馨乃乐的电动挤奶器给我~~因为一直听我说挤奶很惨XD问我需不需要一台电动的,然後我就说...

Day 29 架设 Apache

Day 29 架设 Apache 其实介绍到今天简单架设一下 Apache,让虚拟机当作是一台服务器...

e.stopPropagation()停止事件冒泡

当事件发生的时候,如果想要阻挡事件向上传递,只要利用「事件物件」(Event Object)所提供...

Day 28. 凭证绑定 Certificate Pinning 绑起来!

今天来介绍,数位凭证,讲一点点数位签章,以及最重要的凭证绑定原理及用途 避免有人透过手机使用 Bu...