[Day 2] 一个非同步案例 httpServer

前言

或许有些人会有所困惑, 同步非同步的实践难在哪里, 为甚麽要一直巴拉巴拉, 但事实上, 非同步就是创建出多条 thread 无序的占用资源进行运算, 而当资源被多个消费者无序调用时, Race condition 等问题油然而生, 所以几乎要考虑到绝大多数的 concurrency 议题。

今天我会用一个举例来演示, 当不考虑 Race condition 就要同步非同步会产生的问题。以及猜测一个基本的非同步框架的实践需要具有甚麽功能。

举例

我利用 C++ 撰写了一个 http server , 其功能单一, 仅可以提供本地静态资源。但当我想要令其以非同步的方式运行时却遇到了问题, 感觉上每次过来的 request 都是独立的, 为甚麽当我利用 multi-thread 运行多个 server 时却出现了问题呢 ?

(不需要特别阅读原始码, 看下面中文说明即可)

// 利用 thread 直接创建多个 http server
void startServer(int num) {
	httpServer server;
	server.start(num);
}

int main(int argc, char** argv)
{
	srand(time(NULL));
	thread threadList[THREADPOOL_SIZE];
	for (int i = 0; i < THREADPOOL_SIZE; i++)
		threadList[i] = thread(startServer, i);

	for (int i = 0; i < THREADPOOL_SIZE; i++)
		threadList[i].join();
}

报错截图
https://ithelp.ithome.com.tw/upload/images/20210902/20131164hEHxIBvfTg.png

原来一个port 只能 bind 一个 socket , 该 socket 可以用来监听 http request。

至於同一个 port 不能拥有 2 个 socket 的原因, 我猜测出在 listen(listenSocket, SOMAXCONN) 这句, 因为 Win Socket 怕在监听网际网路时, 有 2 个以上的消费者, 导致 Race condition 。 所以乾脆在创建 socket 时就禁止建立两个, 强迫使用者用同一个介面来读取网路资讯。

白话版说明 :

流程

  • 要求 OS 监听网路孔
  • OS 把监听到的资讯放在记忆体
  • 读取记忆体获取来自网路的资讯

其中在第二条 OS 把监听到的资讯放在记忆体 若是有两个以上的人同时把监听到的资料放入记忆体, 因为他们彼此不能沟通导致监听了相同内容, 放入了一样的资料, 就会导致错误的结果(资料变2倍), 此即为 Race condition 为了避免这个状况, win socket 禁止建立两个监听介面, 强迫使用者用同一个介面来读取网路资讯。

最後解决方案就是在处理资讯与回传时才 multi-thread , 建立单一的生产者来监听 request 以及发放任务。概念如图

https://ithelp.ithome.com.tw/upload/images/20210902/20131164l3XQIH4WpD.png
白话版说明:

  • 要求 OS 监听网路孔
  • OS 把监听到的资讯放在记忆体(只有一个人做)
  • 多个人去读取记忆体, 获取各自被分配到的部分资料, 彼此不会重复(冲突)

这样就在不会导致资料错误的情况下, 让多个 thread 以 非同步的方式处理 http request 了。

(原始码可略过, 上方中文已阐述了我想表达的概念。)

class httpServer {
public:
	struct sockaddr_in localSocketSetting;
	SOCKET listenSocket;
	WSADATA windowsSocketData;
	vector<thread> threadPool;
	vector<vector<SOCKET>> tasksList;
	void start(int poolNumber) {
		printf("Start.......\n");
	rebuild:
		// 监听网际网路, 读取 request
		if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR)
			errorHandle("listen");
		forever
		{
			// 取得该自己处理的 request
			SOCKET messageSocket = getSocket(poolNumber);
			if (messageSocket == INVALID_SOCKET || messageSocket == FAIL_CODE)
				goto rebuild;
			// 处理 request
			request req = request(messageSocket);
			cout << endl << "thread " << poolNumber << " : " << req.filePath;
			if (req.messageLength == 0)
				continue;
			// 寄回 response
			int sentResult = responseClient(req, messageSocket);
			if (sentResult == 0)
				break;
			else if (sentResult == FAIL_CODE)
				goto rebuild;
		}
	}
	void accepter() {
		forever{
			struct sockaddr_in clientSocketSetting;
			int clientSocketSettingLength;
			clientSocketSettingLength = sizeof(clientSocketSetting);
			// 获取所有 request
			SOCKET socket = accept(listenSocket, (struct sockaddr*)&clientSocketSetting, &clientSocketSettingLength);
			// 随机分配 request 给各个消费者
			int poolNumber = rand() % THREADPOOL_SIZE;
			tasksList[poolNumber].push_back(socket);
		}
	}
	void go() {
		// 创建一个供给者, 利用单一介面监听 request
		thread worker = thread(&httpServer::accepter, this);
		// 多个消费者, 不断的处理 request , 回复 response
		for (int i = 0; i < THREADPOOL_SIZE; i++)
			threadPool.push_back(thread(&httpServer::start, this, i));
		for (auto& i : threadPool)
			i.join();
		worker.join();
	}
#pragma warning(disable: 26495)
	httpServer() {
		// 初始设定
		for (int i = 0; i < THREADPOOL_SIZE; i++) {
			vector<SOCKET> tasks;
			tasksList.push_back(tasks);
		}
		if (WSAStartup(MAKEWORD(2, 2), &windowsSocketData) == SOCKET_ERROR)
			errorHandle("WSAStartup");
		// Fill in the address structure
		localSocketSetting.sin_family = AF_INET;
		localSocketSetting.sin_addr.s_addr = INADDR_ANY;
		localSocketSetting.sin_port = htons(DEFAULT_PORT);
		listenSocket = socket(AF_INET, SOCK_STREAM, 0);
		if (listenSocket == INVALID_SOCKET)
			errorHandle("socket");
		// 创建 网路孔介面
		if (bind(listenSocket, (struct sockaddr*)&localSocketSetting, sizeof(localSocketSetting)) == SOCKET_ERROR)
			errorHandle("bind");
	}
	~httpServer() {
		WSACleanup();
	}
private:
	void errorHandle(string str) {
		cout << "error : " << str << endl;
		exit(FAIL_CODE);
	}
	int responseClient(request req, SOCKET& messageSocket) {
		// 略, 回传
	}
	SOCKET getSocket(int poolNumber) {
		forever
		{
			if (tasksList[poolNumber].size() == 0)
				continue;
			SOCKET node = tasksList[poolNumber].back();
			tasksList[poolNumber].pop_back();
			return node;
		}
	}
};

int main(int argc, char** argv)
{
	srand(time(NULL));
	httpServer server;
	server.go();
}

观念整理

综合上述范例, 我们可以发现这个非同步 http server 为了完成非同步, 增加了以下几个功能, 他们就是实践基本的非同步框架所需的功能。

  1. 一种执行绪使用模式

    我们需要设计某种资料结构, 可以同时被多个执行绪 access 也不会发生错误, 并且以此做为介面连结只能同时被一条执行绪调用的资源以及可以被非同步运行的部分。

    以上方 httpServer 为例:

    "只能同时被一条执行绪调用的资源"是读取 http request 的 socket , 可以被非同步运行的部分是"处理每一段 http request , 以及 response 给寄出者" 所以我整合了 1-to-Many modal 以及 thread pool 制作了一个资料结构, 使用上只要 socket 不断地放入 http request , 就可以不断的把任务分配进 thread pool 後利用非同步方法处理。

  2. 一种执行绪 schedule 方法

    当多个任务被分配到 thread pool 时, 若假设只有一个 processor , 实际上也只能一次做一件事, 所以需要有一个 scheduler 告知 thread pool 什麽时间要切换到哪一个 thread , 甚麽时间要切换到下一个 thread ,才能完成真正的非同步运行, 在该切换执行绪时切换, 使 CPU 使用率最大化。

    以上方 httpServer 为例:

    我仅是调用了 C++ 的 thread library 创建大量 thread , 却没有加入合理的调度机制, 使他们只是无脑的平均分配 processor 使用时间, 这样明显不能提高速度, 还会因为大量的 switch context 拖慢速度。若要给这支程序加上 schedule , 我应该想出机制能把闲置的 thread 踢出 thread pool , 把有任务没做的 thread 加回 thread pool , 把...... 能做的事很多, 留到未来讨论吧。

明日进度

明天开始会回过头来跟大家聊聊一些理解底层框架需要具备的基本知识, 以此衔接更之後较为复杂的部份。

明天见 !


<<:  Day02 测试写起乃 - Rails 测试推荐资源

>>:  测试表格,请忽略

Day16_HTML语法13

有时候我们希望使用者输入的资料符合我们想要的格式,因此我们可以做一些设定,当使用者输入的资料不符合格...

【网路概论】L6-2~4 路由相关

路由 默认网关:将本地讯息经由路由连接出去 显示路由表指令:route print,netstat ...

iT铁人赛完赛感想 - 30天的结束不是完结

今天原本要发表的内容是「用Keycloak学习JWT权杖格式」,然後应该还会有1-2篇与JWT相关...

【第四天 - Flutter BottomNavigationBar(上)Animation】

前言 一般来说,写 BottomNavigationBar 会使用这个方法,官方文件,这个是官方的范...

Day24 Uptime And Heartbeat

今日我们要来学习的重点是Elastic Uptime,Uptime主要是针对你的应用和服务进行监控,...