前端对於WebSocket
这项技术应该不陌生,以往会需要使用轮询的方式更新资料,目前大多改采用WebSocket
连线来建立双向沟通的需求。
我们使用套件 web_socket_channel 这个 WebSocket
Library 处理客端的服务请求,使用方式很简单
IOWebSocketChannel.connect(Uri.parse('WebSocket_Server_URL'));
channel.stream.listen((message) {
print(message);
});
channel.sink.add('received!');
channel.sink.close();
让我们建立一个即时通讯的聊天室吧
我们使用 nodejs 套件 ws
完成 server 端的设定,在建立连线时从 websocket
的 query string 取得 token
参数,可以透过这边方式进行身份的验证以确认是否可以建立连线。
import WebSocket, { WebSocketServer } from "ws";
import { UniqueID } from "nodejs-snowflake";
import { createServer } from "http";
import url from "url";
const uid = new UniqueID();
const server = createServer();
const wss = new WebSocketServer({
noServer: true,
});
class User {
constructor(name) {
this.name = name;
}
}
function authenticate(request, callback) {
const { query } = url.parse(request.url, true);
if (!query.token) {
// 身分验证
callback(new Error("token is no defined"));
return;
}
callback(null, new User(query.token));
}
wss.on("connection", function connection(ws, request, user) {
ws.on("message", function message(msg) {
console.log(`Received message from user ${user.name}`);
var data = msg.toString();
var json
try {
json = JSON.parse(data);
} catch (e) {
console.error(e.toString());
json = {};
}
// 根据通讯协议的格式设定处理聊天室功能
if (json.eventName === "chat:send") {
const obj = { eventName: "chat:msg", mid: uid.getUniqueID(), by: user.name, msg: json.data, time: Date.now() };
wss.clients.forEach(function each(client) {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(obj));
}
});
}
});
});
server.on("upgrade", function upgrade(request, socket, head) {
authenticate(request, (err, client) => {
if (err || !client) {
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
socket.destroy();
return;
}
wss.handleUpgrade(request, socket, head, function done(ws) {
wss.emit("connection", ws, request, client);
});
});
});
server.listen(8000);
为了一些业务逻辑的处理,我们使用 Connection
包装 IOWebSocketChannel
类别,让它可以处理断线时的重连机制。WebSocket
讯息是透过 Dart stream
接口,这边可能需要先理解一下 stream
的概念 为何。
IOWebSocketChannelstream
跟以往学习RxJS
概念很相近,因此我这边另外使用了 Rxdart
套件处理讯息的转发,这样在断线重线时不会影响到已绑定的事件行为。
class Connection {
bool _connected = false;
Uri uri;
late IOWebSocketChannel _channel;
late StreamSubscription _subscription;
PublishSubject<dynamic> stream = PublishSubject();
BehaviorSubject<bool> connected = BehaviorSubject();
Connection({required this.uri}) {
_connect();
}
_connect() {
if (_connected) {
_subscription.cancel();
}
_channel = IOWebSocketChannel.connect(uri);
_connected = true;
_subscription =
_channel.stream.listen(_onMessage, onError: _onError, onDone: _onDone);
connected.add(_connected);
}
void reconnect({Uri? uri}) {
if (uri != null) {
this.uri = uri;
}
_connect();
}
trigger(dynamic message) {
_channel.sink.add(json.encode(message));
}
void _onMessage(dynamic message) {
stream.add(jsonDecode(message));
}
void _onError(e) {
stream.addError(e);
}
void _onDone() {
_connected = false;
connected.add(_connected);
}
}
我们使用聊天室的 ViewModel 处理接收到的讯息,这边定义聊天室讯息的格式, 当接收到 eventName
为 chat:msg
,会将该讯息转成 Message 资料类别,存放到 data
内,接着透过 Rxdart
的 PublishSubject
发送出去。
class ChatViewModel {
Connection connection =
Connection(uri: Uri.parse('ws://test.dev.rde:8000/?token=sm'));
PublishSubject<List<Message>> stream = PublishSubject();
final List<Message> data = [];
ChatViewModel() {
connection.stream
.where((data) => data["eventName"] == "chat:msg")
.map((data) => Message.fromJson(data))
.listen(_addData);
}
void _addData(Message msg) async {
// 存放讯息资料
data.add(msg);
// 发送讯息通知给 StreamBuilder
stream.add(data);
}
我们使用 StreamBuilder
接收发送过来的 List<Message>
资料,并使用 ListView.builder
动态创建讯息栏位。
今日学习 WebSocket
技术的应用方式,遇到比较不熟悉的应该是在 stream
的使用上,後续可能要花点时间练习语法要怎麽使用,至於想理解 RxJS
可以看看 黄升煌
大大 第12届的铁人赛
>>: D21 - 用 Swift 和公开资讯,打造投资理财的 Apps { 台股成交量实作.1 }
简介 这篇是演算大法的下半部。 有 sorting 、 search 各两种方式以及他们的差别。 C...
今天下午朋友送馨乃乐的电动挤奶器给我~~因为一直听我说挤奶很惨XD问我需不需要一台电动的,然後我就说...
Day 29 架设 Apache 其实介绍到今天简单架设一下 Apache,让虚拟机当作是一台服务器...
当事件发生的时候,如果想要阻挡事件向上传递,只要利用「事件物件」(Event Object)所提供...
今天来介绍,数位凭证,讲一点点数位签章,以及最重要的凭证绑定原理及用途 避免有人透过手机使用 Bu...