[Day 27] Node thread pool 2

前言

昨天聊到了 TP 告知 main Thread 任务完成的方法。今天说说 TP 本身在运行甚麽。

原始码来自以下网址 :

https://github.com/nodejs/node/blob/c61870c376e2f5b0dbaa939972c46745e21cdbdd/deps/uv/src/threadpool.c

TP 的创建

static void init_threads(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);
  QUEUE_INIT(&slow_io_pending_wq);
  QUEUE_INIT(&run_slow_work_message);

  if (uv_sem_init(&sem, 0))
    abort();

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

不用细读, 原则上就是 TP 多大就创建多少 thread , 并且 thread 运行 worker 方法

TP 运行的方法

查看 worker ( 这段不特别讨论, 其牵涉到不同 case IO 处理的机制。)

static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;
  int is_slow_work;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;
	// 任务队列 1 次只能有一个 thread 访问
  uv_mutex_lock(&mutex);
  for (;;) {
    /* `mutex` should always be locked at this point. */

    /* Keep waiting while either no work is present or only slow I/O
       and we're at the threshold for that. */
    while (QUEUE_EMPTY(&wq) ||
           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
            QUEUE_NEXT(&run_slow_work_message) == &wq &&
            slow_io_work_running >= slow_work_thread_threshold())) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);
    if (q == &exit_message) {
      uv_cond_signal(&cond);
      uv_mutex_unlock(&mutex);
      break;
    }

    QUEUE_REMOVE(q);
    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */

    is_slow_work = 0;
    if (q == &run_slow_work_message) {
      /* If we're at the slow I/O threshold, re-schedule until after all
         other work in the queue is done. */
      if (slow_io_work_running >= slow_work_thread_threshold()) {
        QUEUE_INSERT_TAIL(&wq, q);
        continue;
      }

      /* If we encountered a request to run slow I/O work but there is none
         to run, that means it's cancelled => Start over. */
      if (QUEUE_EMPTY(&slow_io_pending_wq))
        continue;

      is_slow_work = 1;
      slow_io_work_running++;

      q = QUEUE_HEAD(&slow_io_pending_wq);
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);

      /* If there is more slow I/O work, schedule it to be run as well. */
      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
        if (idle_threads > 0)
          uv_cond_signal(&cond);
      }
    }

    uv_mutex_unlock(&mutex);

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);

    /* Lock `mutex` since that is expected at the start of the next
     * iteration. */
    uv_mutex_lock(&mutex);
    if (is_slow_work) {
      /* `slow_io_work_running` is protected by `mutex`. */
      slow_io_work_running--;
    }
  }
}

其任务就是遍历 TP 中的 task , 一个一个地完成, 当全部都完成再让 thread 进入休眠。

此外, 该资料结构也有方法给 TP 加入任务。

小结

Node 的 TP 会维护多个 queue , 用来存放不同 case 的任务 , 并且提供方法使 C++ 层可以把非同步 IO 任务加入这些 queue 中, 此外 TP 本身由多个 thread 组成, 会不断地从这些 queue 中取出任务, 并且执行该任务。当任务完成, 用 uv_async_send 设定 async_sent 为 1 , 表示任务完成, 并且加入到 wq 中。所以此处 wq 是一个完成 IO 的任务的放置区。

明天进度

回到源头, 查看在 pending 阶段中被调用的 async_cb 是什麽 ?

明天见 !


<<:  [第十三天]从0开始的UnityAR手机游戏开发-如何在辨识图卡时拨放影片02

>>:  【从实作学习ASP.NET Core】Day15 | 後台 | 自定义使用者栏位

Day26 指派角色给使用者

昨天角色的 CRUD 功能都完成了,接着就是要把角色指派给使用者了,先建立一个 ViewModel ...

Swift纯Code之旅 Day24. 「各个TableViewHeader下的Cell显示(1)」

前言 我们已经将TableView的Header给设置完毕了,那可以看到IPhone内建的画面: 两...

[Day33] 一条龙总结

本系列文章从 Web API, 架设云端 VM,资料库操作到前端 app,一步一步做出一个虽然很阳春...

Day-05 Easy Regression Example

昨天我们提过了 Regression 的流程就是有一个初始目标 -> 检查"糟糕程...

[Day28] React - 建立React 元件 (Component )

前面介绍了如何用JSX语法建立React元素,接着这篇会记录如何建立React元件,即前面提到的Re...