[Day 11] .Net Task 底层(4)

前言

今天要聊到的是 Task 把超过自己承载能力的任务放入 TP 交给别条 thread 执行的过程发生了甚麽 ?

参考函数 : ThreadPool.UnsafeQueueCustomWorkItem

正文

翻看原始码 ThreadPool.UnsafeQueueCustomWorkItem

[SecurityCritical]
internal static void UnsafeQueueCustomWorkItem(IThreadPoolWorkItem workItem, bool forceGlobal)
{
    Contract.Assert(null != workItem);
    EnsureVMInitialized();

    //
    // Enqueue needs to be protected from ThreadAbort
    //
    try { }
    finally
    {
        ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal);
    }
}

先来看参数

ThreadPool.UnsafeQueueCustomWorkItem(new CompletionActionInvoker(singleTaskCompletionAction, this), forceGlobal: false);

CompletionActionInvoker 也就是原始码的 IThreadPoolWorkItem workItem在 Task 的第一集就有提到, 创建时会把待完成任务存入, 并且留下介面供别人触发此任务。

再来看看 ThreadPoolGlobals

internal static class ThreadPoolGlobals
{
    //Per-appDomain quantum (in ms) for which the thread keeps processing
    //requests in the current domain.
    public static uint tpQuantum = 30U;

    public static int processorCount = Environment.ProcessorCount;

    public static bool tpHosted = ThreadPool.IsThreadPoolHosted();

    public static volatile bool vmTpInitialized;
    public static bool enableWorkerTracking;

    [SecurityCritical]
    public static ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue();

    [System.Security.SecuritySafeCritical] // static constructors should be safe to call
    static ThreadPoolGlobals()
    {
    }
}

发现这是一个 static 的变数, 其成员变数有一个 workQueue

而之前所说的, 把待完成任务放入 TP 由别条 thread 执行, 原来指的就是把任务 push 进 workQueue 这个资料结构。

此刻, 相信碰过 linux 的读者会很快了联想起 workQueue 这个资料结构所代表的意义, 所以我心急如焚的察看Enqueue method。

[SecurityCritical]
public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
{
		// forceGlobal=false ,获取当前 thread 的任务列表
    ThreadPoolWorkQueueThreadLocals tl = null;
    if (!forceGlobal)
        tl = ThreadPoolWorkQueueThreadLocals.threadLocals;

    if (loggingEnabled)
        System.Diagnostics.Tracing.FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
		// 任务列表不为空, 加入新任务
    if (null != tl)
    {
        tl.workStealingQueue.LocalPush(callback);
    }
    else
    {
				// 任务列表为空, 加入新任务
        QueueSegment head = queueHead;

        while (!head.TryEnqueue(callback))
        {
            Interlocked.CompareExchange(ref head.Next, new QueueSegment(), null);

            while (head.Next != null)
            {
                Interlocked.CompareExchange(ref queueHead, head.Next, head);
                head = queueHead;
            }
        }
    }
		// 调用 worker thread 进行 schedule (此段程序码利用 dll 引入)
    EnsureThreadRequested();
}

除了利用 CAS 方法 lock-free 的把存放进属於 thread 的私人任务存放区外

果然找到了 , worker Thread 的调用, 以下简单聊聊我对这段的理解。

小结

我会先说说何谓 linux 中的 workQueue , 因为在这段程序码最後, 我们发现 .Net 向外引用了 worker thread 而这是外面的程序码, 我看不到, 所以此刻只能用 workQueue来猜测其功能。接着说说我对 .Net ThreadPool的理解。

workQueue

以 linux 的workQueue为例(注:我程度不足,仅简单说说)

workQueue创建时, 会创建 thread pool 作为 queue pool , 每个 queue 又可以放入多个任务, 所以呈现以下结构。

workQueue

  1. queue 1 ( thread 1 )
    1. task 1
    2. task 2
    3. task 3
  2. queue 2 ( thread 2 )
    1. NULL
  3. queue 3 ( thread 3 )
    1. task 4
  4. worker thread ( thread 4 )

发现里面还多了一条 thread 名叫 worker thread , 其功能是排程 thread 1~3 , 包含 :

  1. 闲置没有任务的 thread (ex thread 2 )
  2. 唤醒有任务的 thread 运行 等等。( ex thread 1,3 )

利用这样的作法, 增进 multi-thread CPU 使用率, 不要有很多 thread 明明没事做, 却还在运作。

ThreadPool

以下从前天 Task 运行完成, 发现连续任务区有任务开始, 按照步骤陈述

  1. Task 进入完成阶段
  2. Task 检查连续任务区
  3. Task 发现连续任务区有任务
  4. Task 想直接接下去用同一条 thread 继续执行, 但被禁止 ( 若没禁止, 进入昨天的情况 )
  5. Task 调用 UnsafeQueueCustomWorkItem 把任务放入全域变数 ThreadPoolGlobals中的 workQueue
  6. 任务被放到 workQueue後被按照 thread 挂载到 thread 的任务区中
  7. 外来 method RequestWorkerThread 此时被呼叫, 会创建 workerThread
  8. workerThread 检查各条 thread 的任务区, 使任务区为空的 thread 闲置, 唤醒任务区有任务的 thread 等等....。

至於被唤醒的 thread 内的任务如何被触发, 会在之後说明。

到此 .Net thread pool 的探底算是告一段落, 可以发现, 若是用 linux 的 workQueue 的角度来理解, .Net 的 thread pool 就是实践了上半部分的 workQueue , 包含维护各个 thread 的 queue, 并且把待执行的任务, 依情况放入各个 thread 的 queue 中。但是 .Net 本身不去排程这些 queue 和他们里面的任务。

.Net 调用外来方法创建一条排程 thread , 这条排程 thread 会完成 workQueue 的排程任务, 包含

  1. 排程 ⇒ 调换各条 thread 的执行顺序, 中断卡住的 thread , 闲置没有任务的 thread 等等......

注 : 关於外来排程 thread 的功能, 我仅能猜测, 因为原始码不好找, 其调用了外部程序。有错可以直接向我说~~

明天进度

透过这几天的努力, 我们了解了 Task 执行完本身任务後处理连续任务区的任务的方法。

明天我们就开始来看看 Task 是怎麽执行本身的任务吧

明天见 !


<<:  Day 11 实作 create_app

>>:  [13th][Day1] 前言

[18] [烧瓶里的部落格] 08. 撰写测试

写单元测试可以检查程序是否按预期执行,Flask 可以模拟发送请求并回传资料 应当尽可能多进行测试,...

php没法连线资料库

在连资料库的时候 跑出这个错误 请问各位大大是哪里有问题? 这是错误的程序 说第八行可是我不知道哪...

[机派X] Day4 - Bash 指令原来如此容易

引言 今天是机派X系列文章的第四天。 昨天介绍了 Bash 的基本概念,大家都熟悉了吗? 本篇将介绍...

[Day14] 测试与迭代

现在,基於我们现有的初始对话流与打造完成的语音应用程序。 来试着让它变得更好! 现在我们进入设计对...

企划实现(13)

GOOGLE登入 第一步:在firebase添加一个新的专案 第二步:选取android专案 第三步...