昨天聊到了 TP 告知 main Thread 任务完成的方法。今天说说 TP 本身在运行甚麽。
原始码来自以下网址 :
https://github.com/nodejs/node/blob/c61870c376e2f5b0dbaa939972c46745e21cdbdd/deps/uv/src/threadpool.c
static void init_threads(void) {
unsigned int i;
const char* val;
uv_sem_t sem;
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nthreads = atoi(val);
if (nthreads == 0)
nthreads = 1;
if (nthreads > MAX_THREADPOOL_SIZE)
nthreads = MAX_THREADPOOL_SIZE;
threads = default_threads;
if (nthreads > ARRAY_SIZE(default_threads)) {
threads = uv__malloc(nthreads * sizeof(threads[0]));
if (threads == NULL) {
nthreads = ARRAY_SIZE(default_threads);
threads = default_threads;
}
}
if (uv_cond_init(&cond))
abort();
if (uv_mutex_init(&mutex))
abort();
QUEUE_INIT(&wq);
QUEUE_INIT(&slow_io_pending_wq);
QUEUE_INIT(&run_slow_work_message);
if (uv_sem_init(&sem, 0))
abort();
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, &sem))
abort();
for (i = 0; i < nthreads; i++)
uv_sem_wait(&sem);
uv_sem_destroy(&sem);
}
不用细读, 原则上就是 TP 多大就创建多少 thread , 并且 thread 运行 worker 方法
查看 worker ( 这段不特别讨论, 其牵涉到不同 case IO 处理的机制。)
static void worker(void* arg) {
struct uv__work* w;
QUEUE* q;
int is_slow_work;
uv_sem_post((uv_sem_t*) arg);
arg = NULL;
// 任务队列 1 次只能有一个 thread 访问
uv_mutex_lock(&mutex);
for (;;) {
/* `mutex` should always be locked at this point. */
/* Keep waiting while either no work is present or only slow I/O
and we're at the threshold for that. */
while (QUEUE_EMPTY(&wq) ||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
QUEUE_NEXT(&run_slow_work_message) == &wq &&
slow_io_work_running >= slow_work_thread_threshold())) {
idle_threads += 1;
uv_cond_wait(&cond, &mutex);
idle_threads -= 1;
}
q = QUEUE_HEAD(&wq);
if (q == &exit_message) {
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
break;
}
QUEUE_REMOVE(q);
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
is_slow_work = 0;
if (q == &run_slow_work_message) {
/* If we're at the slow I/O threshold, re-schedule until after all
other work in the queue is done. */
if (slow_io_work_running >= slow_work_thread_threshold()) {
QUEUE_INSERT_TAIL(&wq, q);
continue;
}
/* If we encountered a request to run slow I/O work but there is none
to run, that means it's cancelled => Start over. */
if (QUEUE_EMPTY(&slow_io_pending_wq))
continue;
is_slow_work = 1;
slow_io_work_running++;
q = QUEUE_HEAD(&slow_io_pending_wq);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
/* If there is more slow I/O work, schedule it to be run as well. */
if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
if (idle_threads > 0)
uv_cond_signal(&cond);
}
}
uv_mutex_unlock(&mutex);
w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
/* Lock `mutex` since that is expected at the start of the next
* iteration. */
uv_mutex_lock(&mutex);
if (is_slow_work) {
/* `slow_io_work_running` is protected by `mutex`. */
slow_io_work_running--;
}
}
}
其任务就是遍历 TP 中的 task , 一个一个地完成, 当全部都完成再让 thread 进入休眠。
此外, 该资料结构也有方法给 TP 加入任务。
Node 的 TP 会维护多个 queue , 用来存放不同 case 的任务 , 并且提供方法使 C++ 层可以把非同步 IO 任务加入这些 queue 中, 此外 TP 本身由多个 thread 组成, 会不断地从这些 queue 中取出任务, 并且执行该任务。当任务完成, 用 uv_async_send 设定 async_sent 为 1 , 表示任务完成, 并且加入到 wq 中。所以此处 wq 是一个完成 IO 的任务的放置区。
回到源头, 查看在 pending 阶段中被调用的 async_cb 是什麽 ?
明天见 !
<<: [第十三天]从0开始的UnityAR手机游戏开发-如何在辨识图卡时拨放影片02
>>: 【从实作学习ASP.NET Core】Day15 | 後台 | 自定义使用者栏位
昨天角色的 CRUD 功能都完成了,接着就是要把角色指派给使用者了,先建立一个 ViewModel ...
前言 我们已经将TableView的Header给设置完毕了,那可以看到IPhone内建的画面: 两...
本系列文章从 Web API, 架设云端 VM,资料库操作到前端 app,一步一步做出一个虽然很阳春...
昨天我们提过了 Regression 的流程就是有一个初始目标 -> 检查"糟糕程...
前面介绍了如何用JSX语法建立React元素,接着这篇会记录如何建立React元件,即前面提到的Re...