ねののお庭。

かりかりもふもふ。

【C#】非同期 IO の仕組み。

この記事は C# Advent Calendar 2024 24 日目の記事です。

はじめに

C# の async/await の主な用途は主に2つです。

  • 計算待ち
    • 並列/並行問わず非同期に実行される計算待ち
  • IO 待ち

他にも Unity などゲームエンジンや GUI フレームワークでは、フレームの待機なども async/await で表現する事が可能です。 C# の async/await の詳細についてはこちらの記事をご覧ください。

ともあれ async/await を用いた IO についてですが、await で IO を待機した際には IO が発生したスレッドを手放します。 そのため別の計算処理をそのスレッドで行えるため、計算資源を無駄にすることがありません。 非常に効率が良い。 そして IO の待機が終了したら、再度別の ThreadPool 上のスレッドで処理が継続します。

ここで少し気になるのが、IO が完了した時当然 navite と managed のコードやりとりが走っているはずなのですが、その仕組みはどうなっているの?そして await 後の継続処理 (continuation) はどういう風に ThreadPool に dispatch されているの?という事です。 この記事では主にそのあたりに焦点にあてて、深掘りしていきます。

ネイティブでの非同期 IO の仕組み

IO の処理には当然、ネイティブとのやりとりが必要になります。 そのネイティブでの非同期 IO はどういう仕組みで成り立っているか?というと、 Windows においては I/O completion port, Linux においては epoll、Mac OS においては kqueue などの仕組みが用意されており、これらによって成り立っています。

この記事では主に Windows の I/O completion port に焦点を当てて、話を進めていきます。 とはいえ、基本的にはどの OS でも根底にある概念は似たようなものなので(というと怒られが発生する気もしますが)、Windows 向けではないコードを追いかける際にも役立つでしょう。

I/O Completion Port

I/O completion port の公式のドキュメントはこちら。 ざっくりとはこんな具合。

  • CreateIoCompletionPort() を用いて I/O completion port (以下 completion port)を作成。
  • completion port に1つもしくは1つ以上の file handle を紐づけ。
    • file handle などという呼び名ですが、この handle はディスク上のファイルのみを取り扱うわけではなく、TCP socket や named pipe などのあらゆる I/O についても取り扱うもの。
  • I/O が完了したら I/O completion packet (以下 completion packet) が completion port に紐づく queue に FIFO で詰められる。
  • GetQueuedCompletionStatus() で completion port に completion packet が流れてくるのを待機。
    • この時 GetQueuedCompletionStatus() で completion packet が流れてくるまでは、OS が GetQueuedCompletionStatus() を呼び出したスレッドをブロック。
    • 1つの completion port に対して、複数のスレッドから GetQueuedCompletionStatus() を呼び出す事で並列に待機可能。
  • completion port に completion packet が流れてきたら、GetQueuedCompletionStatus() でブロックしていたスレッドを解放。
    • ブロックしていたスレッドの解放は LIFO。
      • ようするに GetQueuedCompletionStatus() したスレッドが複数ある場合、最も直近に待機に入ったスレッドで(LIFO)、最も古い completion packet を受け取る(FIFO)。
      • completion packet が複数同時に流れてきた際、どれだけのスレッドを同時に解放するかなども設定できる。
        • この値は NumberOfConcurrentThreads というパラメータ名 completion port 作成時に設定する。
        • NumberOfConcurrentThreads はコンカレンシー値などと呼ばれる。

C# / .NET 上での非同期 IO の仕組み

さて、I/O completion port がどのように C# / .NET の世界で取り扱われているかを見ていきましょう。 なお、.NET runtime は色々設定いれる事で挙動を変えられるようになっているのですが、ここでは .NET runtime は既定の構成である事を前提として読み進めていきます。

コード的には System/Threading/PortableThreadPool.cs 及び System/Threading/PortableThreadPool.IO.Windows.cs あたりから見ていきます。

まず PortableThreadPoolコンストラクタから眺めていきます。 すると InitializeIOOnWindows() なる関数を呼び出している事が確認できます。 InitializeIOOnWindows() の実装は以下のようになっています。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L78-L86
internal sealed partial class PortableThreadPool
{
    private void InitializeIOOnWindows()
    {
        int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount;
        for (int i = 0; i < IOCompletionPortCount; i++)
        {
            _ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads);
        }
    }
}

CreateIOCompletionPort() が呼び出されていますね...! ここで気になるのは、IOCompletionPortCountIOCompletionPollerCount が幾つに設定されているのかという事。 それぞれの定義は以下のようになっています。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs#L53-L54
internal sealed partial class PortableThreadPool
{
    private static readonly short IOCompletionPortCount = DetermineIOCompletionPortCount();
    private static readonly int IOCompletionPollerCount = DetermineIOCompletionPollerCount();
}

DetermineIOCompletionPortCount()DetermineIOCompletionPollerCount() を読まないと具体的にどのような値が設定されるのか分からないので、どんどん読み進めていきます。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L18-L76
internal sealed partial class PortableThreadPool
{
    private static short DetermineIOCompletionPortCount()
    {
        const short DefaultIOPortCount = 1;
        const short MaxIOPortCount = 1 << 10;

        short ioPortCount =
            AppContextConfigHelper.GetInt16Config(
                "System.Threading.ThreadPool.IOCompletionPortCount",
                "DOTNET_ThreadPool_IOCompletionPortCount",
                DefaultIOPortCount,
                allowNegative: false);
        return ioPortCount == 0 ? DefaultIOPortCount : Math.Min(ioPortCount, MaxIOPortCount);
    }

    private static int DetermineIOCompletionPollerCount()
    {
        // Named for consistency with SocketAsyncEngine.Unix.cs, this environment variable is checked to override the exact number of IO completion poller threads to use.
        // See the comment in SocketAsyncEngine.Unix.cs about its potential uses.
        // For this implementation, the ProcessorsPerIOPollerThread config option below may be preferable as it may be less machine-specific.
        int ioPollerCount;
        if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) &&
            count != 0)
        {
            ioPollerCount = (int)Math.Min(count, (uint)MaxPossibleThreadCount);
        }
        else if (UnsafeInlineIOCompletionCallbacks)
        {
            // In this mode, default to ProcessorCount pollers to ensure that all processors can be utilized if more work
            // happens on the poller threads
            ioPollerCount = Environment.ProcessorCount;
        }
        else
        {
            int processorsPerPoller =
                AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false);
            ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1;
        }

        if (IOCompletionPortCount == 1)
        {
            return ioPollerCount;
        }

        // Use at least one IO poller per port
        if (ioPollerCount <= IOCompletionPortCount)
        {
            return IOCompletionPortCount;
        }

        // Use the same number of IO pollers per port, align up if necessary to make it even
        int rem = ioPollerCount % IOCompletionPortCount;
        if (rem != 0)
        {
            ioPollerCount += IOCompletionPortCount - rem;
        }

        return ioPollerCount;
    }
}

まずは DetermineIOCompletionPortCount() の方ですが、既定では 1 が返ってきそうですね。

DetermineIOCompletionPollerCount() はどうでしょう。 初めに現れる DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT についてはドキュメントが存在します。 しかしながらこの値は既定では設定されていない値なので、スルーしていきます。 その下にある UnsafeInlineIOCompletionCallbacksDOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS という環境変数が 1 に設定されていれば true となるフィールドです。 DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS とはなんのための設定なのかというと、 既定では IO のイベントが終了した際、IO を await した後の continuation は ThreadPool に dispatch されるのですが、 continuation を ThreadPool に dispatch せずに、IO の完了イベントを受け取ったスレッド上で continuation を実行するための設定です。 まぁ Unsafe の prefix から分かる通り、余程の事が無い限り使わない設定です。 というわけで、ここではこれも無視。 ThreadPool に dispatch 万歳。 ということで、以下の else 文の中が実行される事になります。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L52-L54
else
{
    int processorsPerPoller = 
        AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false);
    ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1;
}

既定値が用いられる事を前提に読み進めているので、processorsPerPoller は 12 になるとします。 とすると、ioPollerCount は論理 CPU 数(Environment.ProcessorCount) が 12 個毎に 1 つの poller という感じです。 poller が何を指しているかについては後述しますが、まぁ ProcessorsPerIOPollerThread なんて名前の config が 12 なので ioPollerCount がこのような値になるのは妥当でしょう。

else を抜けて続きを見ていくと以下のようなコードがあるので、DetermineIOCompletionPollerCount() は既定の構成である事に加え、論理 CPU 数を 12 以下と仮定すると、1 を返す事が分かります。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L57-L60
if (IOCompletionPortCount == 1)
{
    return ioPollerCount;
}

なので結果的にこんな具合に。

internal sealed partial class PortableThreadPool
{
    private static readonly short IOCompletionPortCount = 1;
    private static readonly int IOCompletionPollerCount = 1;
}

さて、ここまで読んだ事でようやく InitializeIOOnWindows() でどのような値が用いられているか分かるようになりました。 IOCompletionPortCount が 1 なので当然作成される completion port は 1 つです。

// 再掲
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L78-L86
internal sealed partial class PortableThreadPool
{
    private void InitializeIOOnWindows()
    {
        int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount;
        for (int i = 0; i < IOCompletionPortCount; i++)
        {
            _ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads);
        }
    }
}

今度は InitializeIOOnWindows() 内で用いられている CreateIOCompletionPort() を覗いてみましょう。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L88-L99
internal sealed partial class PortableThreadPool
{
    private static nint CreateIOCompletionPort(int numConcurrentThreads)
    {
        nint port =
            Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, numConcurrentThreads);
        if (port == 0)
        {
            int hr = Marshal.GetHRForLastWin32Error();
            Environment.FailFast($"Failed to create an IO completion port. HR: {hr}");
        }

        return port;
    }
}

Interop.Kernel32.CreateIoCompletionPort() はお察しの通り、win32 の CreateIoCompletionPort() に対する P/Invoke です。 なのでここで completion port が作成されているわけですが、IOCompletionPollerCount は 1 だったので、numConcurrentThreads も 1 です。 numConcurrentThreads がどういう値かについては、CreateIoCompletionPort() のドキュメントを読むと分かります。

NumberOfConcurrentThreads [in]

The maximum number of threads that the operating system can allow to concurrently process I/O completion packets for the I/O completion port. This parameter is ignored if the ExistingCompletionPort parameter is not NULL.

If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.

さて、ここまでで completion port がどのように作成されるのか見てきました。 次は作成した completion port から completion packet が流れてくるのを待機し、completion packet が流れて来た際に continuation をどのように ThreadPool に dispatch していくかを見ていきます。

上記の責務は、PortableThreadPool の class 内 class として存在する IOCompletionPoller という class が負っています。 先ほどからちょこちょこ登場している poller という語彙はこの IOCompletionPoller を指しています。 とりあえずコンストラクタを読んでいきましょう。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L195-L235
private sealed unsafe class IOCompletionPoller
{
    public IOCompletionPoller(nint port)
    {
        Debug.Assert(port != 0);
        _port = port;

        if (!UnsafeInlineIOCompletionCallbacks)
        {
            _nativeEvents =
                (Interop.Kernel32.OVERLAPPED_ENTRY*)
                NativeMemory.Alloc(NativeEventCapacity, (nuint)sizeof(Interop.Kernel32.OVERLAPPED_ENTRY));
            _events = new ThreadPoolTypedWorkItemQueue<Event, Callback>();

            // These threads don't run user code, use a smaller stack size
            _thread = new Thread(Poll, SmallStackSizeBytes);

            // Poller threads are typically expected to be few in number and have to compete for time slices with all
            // other threads that are scheduled to run. They do only a small amount of work and don't run any user code.
            // In situations where frequently, a large number of threads are scheduled to run, a scheduled poller thread
            // may be delayed artificially quite a bit. The poller threads are given higher priority than normal to
            // mitigate that issue. It's unlikely that these threads would starve a system because in such a situation
            // IO completions would stop occurring. Since the number of IO pollers is configurable, avoid having too
            // many poller threads at higher priority.
            if (IOCompletionPollerCount * 4 < Environment.ProcessorCount)
            {
                _thread.Priority = ThreadPriority.AboveNormal;
            }
        }
        else
        {
            // These threads may run user code, use the default stack size
            _thread = new Thread(PollAndInlineCallbacks);
        }

        _thread.IsThreadPoolThread = true;
        _thread.IsBackground = true;
        _thread.Name = ".NET ThreadPool IO";

        // Thread pool threads must start in the default execution context without transferring the context, so
        // using UnsafeStart() instead of Start()
        _thread.UnsafeStart();
    }
}

既定では UnsafeInlineIOCompletionCallbacks は false ですから、最初の if の中が実行される前提で読んでいきます。

_nativeEvents は後で登場する GetQueuedCompletionStatusEx で用いるバッファなのですが、このバッファに流れてきた completion packet の情報が詰め込まれることになります。

_events の型は ThreadPoolTypedWorkItemQueue です。 名前のまんまで、強く型付けされた ThreadPool の WorkItem の Queue なのですが、これは非常に面白い class なので詳しくは後述します。

_thread では新しいスレッドを作成し、Poll メソッドが渡されている事が確認できます。 また、_thread では大した処理を行わないので、最大の stack size を抑えていたりしているのも面白ポイント。 ここで確認したいのは Poll メソッドの中身です。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L237-L267
private sealed unsafe class IOCompletionPoller
{
    private const int NativeEventCapacity =
#if DEBUG
      32;
#else
      1024;
#endif

    private void Poll()
    {
        Debug.Assert(_nativeEvents != null);
        Debug.Assert(_events != null);

        while (
            Interop.Kernel32.GetQueuedCompletionStatusEx(
                _port,
                _nativeEvents,
                NativeEventCapacity,
                out int nativeEventCount,
                Timeout.Infinite,
                false))
        {
            Debug.Assert(nativeEventCount > 0);
            Debug.Assert(nativeEventCount <= NativeEventCapacity);

            for (int i = 0; i < nativeEventCount; ++i)
            {
                Interop.Kernel32.OVERLAPPED_ENTRY* nativeEvent = &_nativeEvents[i];
                if (nativeEvent->lpOverlapped != null) // shouldn't be null since null is not posted
                {
                    _events.BatchEnqueue(new Event(nativeEvent->lpOverlapped, nativeEvent->dwNumberOfBytesTransferred));
                }
            }

            _events.CompleteBatchEnqueue();
        }

        ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error());
    }
}

まず GetQueuedCompletionStatusEx() ですが、これは GetQueuedCompletionStatus() が completion packet を1つずつ受け取るのに対して、completion packet をバッチで受け取るための関数です。 バッチで受け取った方が効率がいいのは間違いないですからね、当然 GetQueuedCompletionStatusEx() の方が好ましいです。 そして while の中では _nativeEvents[i] で受け取った OVERLAPPED_ENTRYEvent に詰めて BatchEnqueue し、一通り enqueue し終えたら CompleteBatchEnqueue を呼び出すという実装になっています。 BatchEnqueue では event を enqueue するだけで、CompleteBatchEnqueueBatchEnqueue で enqueue した event を ThreadPool に dispatch する事を保証するために呼び出すものです。 状況によっては CompleteBatchEnqueue を呼び出さずとも BatchEnqueue した直後から ThreadPool に dispatch され可能性はありますが、このあたりは ThreadPoolTypedWorkItemQueue の実装を把握した方がはやいでしょう。という事で ThreadPoolTypedWorkItemQueue の実装を覗いていきます。

ThreadPoolTypedWorkItemQueue

ThreadPoolTypedWorkItemQueue の実装は以下のような感じ。

// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L1247-L1398

// A strongly typed callback for ThreadPoolTypedWorkItemQueue<T, TCallback>.
// This way we avoid the indirection of a delegate call.
internal interface IThreadPoolTypedWorkItemQueueCallback<T>
{
    static abstract void Invoke(T item);
}

internal sealed class ThreadPoolTypedWorkItemQueue<T, TCallback> : IThreadPoolWorkItem
    // https://github.com/dotnet/runtime/pull/69278#discussion_r871927939
    where T : struct
    where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback<T>
{
    // The scheme works as follows:
    // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them.
    // - From Scheduled, the only transition is to Determining right before trying to dequeue an item.
    // - From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them)
    //   or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convinient).
    //
    // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed.
    // Another TP work item isn't enqueued to the thread pool hastily while the state is Determining,
    // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time.
    private enum QueueProcessingStage
    {
        NotScheduled,
        Determining,
        Scheduled
    }

    private QueueProcessingStage _queueProcessingStage;
    private readonly ConcurrentQueue<T> _workItems = new ConcurrentQueue<T>();

    public int Count => _workItems.Count;

    public void Enqueue(T workItem)
    {
        BatchEnqueue(workItem);
        CompleteBatchEnqueue();
    }

    public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem);

    public void CompleteBatchEnqueue()
    {
        // Only enqueue a work item if the stage is NotScheduled.
        // Otherwise there must be a work item already queued or another thread already handling parallelization.
        if (Interlocked.Exchange(
            ref _queueProcessingStage,
            QueueProcessingStage.Scheduled) == QueueProcessingStage.NotScheduled)
        {
            ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this);
        }
    }

    private void UpdateQueueProcessingStage(bool isQueueEmpty)
    {
        if (!isQueueEmpty)
        {
            // There are more items to process, set stage to Scheduled and enqueue a TP work item.
            _queueProcessingStage = QueueProcessingStage.Scheduled;
        }
        else
        {
            // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining
            // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer
            // would not have scheduled a work item to process the work, so schedule one one.
            QueueProcessingStage stageBeforeUpdate =
                Interlocked.CompareExchange(
                    ref _queueProcessingStage,
                    QueueProcessingStage.NotScheduled,
                    QueueProcessingStage.Determining);
            Debug.Assert(stageBeforeUpdate != QueueProcessingStage.NotScheduled);
            if (stageBeforeUpdate == QueueProcessingStage.Determining)
            {
                return;
            }
        }

        ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this);
    }

    void IThreadPoolWorkItem.Execute()
    {
        T workItem;
        while (true)
        {
            Debug.Assert(_queueProcessingStage == QueueProcessingStage.Scheduled);

            // The change needs to be visible to other threads that may request a worker thread before a work item is attempted to be dequeued by the current thread.
            // In particular, if an enqueuer queues a work item and does not request a thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of Scheduled, and try to dequeue again or request another thread.
            _queueProcessingStage = QueueProcessingStage.Determining;
            Interlocked.MemoryBarrier();

            if (_workItems.TryDequeue(out workItem))
            {
                break;
            }

            // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining otherwise.
            // If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer would not have scheduled a work item to process the work, so try to dequeue a work item again.
            QueueProcessingStage stageBeforeUpdate = Interlocked.CompareExchange(ref _queueProcessingStage, QueueProcessingStage.NotScheduled, QueueProcessingStage.Determining);
            Debug.Assert(stageBeforeUpdate != QueueProcessingStage.NotScheduled);
            if (stageBeforeUpdate == QueueProcessingStage.Determining)
            {
                return;
            }
        }

        UpdateQueueProcessingStage(_workItems.IsEmpty);

        ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals!;
        Debug.Assert(tl != null);
        Thread currentThread = tl.currentThread;
        Debug.Assert(currentThread == Thread.CurrentThread);
        uint completedCount = 0;
        int startTimeMs = Environment.TickCount;
        while (true)
        {
            TCallback.Invoke(workItem);

            // This work item processes queued work items until certain conditions are met, and tracks some things:
            // - Keep track of the number of work items processed, it will be added to the counter later
            // - Local work items take precedence over all other types of work items, process them first
            // - This work item should not run for too long.
            //   It is processing a specific type of work in batch, but should not starve other thread pool work items.
            //   Check how long it has been since this work item has started, and yield to the thread pool after some time.
            //   The threshold used is half of the thread pool's dispatch quantum, which the thread pool uses for doing periodic work.
            if (++completedCount == uint.MaxValue ||
                tl.workStealingQueue.CanSteal ||
                (uint)(Environment.TickCount - startTimeMs) >= ThreadPoolWorkQueue.DispatchQuantumMs / 2 ||
                !_workItems.TryDequeue(out workItem))
            {
                break;
            }

            // Return to clean ExecutionContext and SynchronizationContext. This may call user code (AsyncLocal value
            // change notifications).
            ExecutionContext.ResetThreadPoolThread(currentThread);

            // Reset thread state after all user code for the work item has completed
            currentThread.ResetThreadPoolThread();
        }

        ThreadInt64PersistentCounter.Add(tl.threadLocalCompletionCountObject!, completedCount);
    }
}

注目するべきは ThreadPoolTypedWorkItemQueue 自体に IThreadPoolWorkItemが実装されている事です。 これにより ThreadPoolTypedWorkItemQueue のインスタンス自身を ThreadPool.UnsafeQueueHighPriorityWorkItemInternal() に ThreadPool に投げる事が出来ています。 非常に効率的な作りです。 また、このあたりの実装は C# 11 で導入された interface static abstract method だったり、.NET 9 で可能になった enum に対する Interlocked.CompareExchange() 等が使われていたりして Span<T> こそ登場していませんが、とっても現代的。

IOCompletionPoller から利用される際の大まかな流れは以下のような感じになります。 面白ポイントが沢山あるのですが、日本語で正しく伝えるのは難しいのでコードは一読した方が良いかもです。

  • BatchEnqueue() が呼び出される
    • すると evnet が ConcurrentQueue に enqueue される。
      • この時 ThreadPoolTypedWorkItemQueue_queueProcessingStageNotScheduled なら真に enqueue されるだけ。
      • NotScheduled で無かった場合、ThreadPoolTypedWorkItemQueue 内部の queue から work item を dequeue して ThreadPool に dispatch する処理が回っているので、即時に処理される可能性アリ。
  • CompleteBatchEnqueue() が呼び出される
    • _queueProcessingStageNotScheduled なら ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this) が呼び出される
    • _queueProcessingStageNotScheduled 以外なら既に ThreadPoolTypedWorkItemQueue 内部の queue から work item を dequeue して ThreadPool に dispatch する処理が回っているので、ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this) は呼ばない
  • ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this) した事により、Execute() が ThreadPool の thread 上から呼び出される
    • TryDequeue() した後 TCallback.Invoke(workItem) する事で continuation を実行
    • ちなみに TryDequeue()TCallback.Invoke(workItem) で呼ばれている UpdateQueueProcessingStage() が非常に賢い
      • UpdateQueueProcessingStage() が呼び出された際、queue が空でなければ、ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this) を実行し、ThreadPool 上の別の thread で Execute が叩かれる事により、TryDequeue()TCallback.Invoke(workItem) する処理が実行される。
        • つまり queue に work item があれば次々と ThreadPool に dispatch していく事になる。
        • しかも ThreadPool に dispatch する際には this を渡しているだけなので、無駄がない。
    • TCallback.Invoke(workItem) した後、幾つかの条件が全て揃っていれば再度 work item を dequeue して TCallback.Invoke(workItem) を実行。条件は以下の通り。
      • work item の処理中が uint.MaxValue 未満の時
      • WorkStealingQueue の CanSteal が false の時
      • 特定のスレッドでの Execute の処理時間が一定時間未満の時
        • ThreadPool の thread が長時間占有されているのは良くないため
          • 長時間占有すると他の work item が実行されなくなってしまう恐れ出てくる

おわりに

まとめると、既定では以下のようになります。

  • 作成される completion port は1つ
  • completion port に completion packet が流れてくるのを IOCompletionPoller で待機
  • IOCompletionPoller は 12 個の論理 CPU 毎に1つ。
  • IOCompletionPoller 内では completion packet を batch で取得
  • completion packet を取得した後、ThreadPoolTypedWorkItemQueue に work item を enqueue
  • ThreadPoolTypedWorkItemQueue は enqueue された work item を次々と ThreadPool に dispatch
    • 実際に ThreadPool に投げ込んでいるのは IThreadPoolWorkItem を実装している ThreadPoolTypedWorkItemQueue 自分自身
  • completion packet が複数同時に流れてきた場合には
    • 1: それぞれの work item が別々の ThreadPool 上の thread で実行されるようスケジューリング
    • 2: 特定の条件の元では1つの thrad の中で複数の work item を実行

非同期・マルチスレッド関連記事

blog.neno.dev blog.neno.dev

References