この記事は C# Advent Calendar 2024 24 日目の記事です。
はじめに
C# の async/await の主な用途は主に2つです。
- 計算待ち
- 並列/並行問わず非同期に実行される計算待ち
- IO 待ち
他にも Unity などゲームエンジンや GUI フレームワークでは、フレームの待機なども async/await で表現する事が可能です。 C# の async/await の詳細についてはこちらの記事をご覧ください。
ともあれ async/await を用いた IO についてですが、await で IO を待機した際には IO が発生したスレッドを手放します。 そのため別の計算処理をそのスレッドで行えるため、計算資源を無駄にすることがありません。 非常に効率が良い。 そして IO の待機が終了したら、再度別の ThreadPool 上のスレッドで処理が継続します。
ここで少し気になるのが、IO が完了した時当然 navite と managed のコードやりとりが走っているはずなのですが、その仕組みはどうなっているの?そして await 後の継続処理 (continuation) はどういう風に ThreadPool に dispatch されているの?という事です。 この記事では主にそのあたりに焦点にあてて、深掘りしていきます。
ネイティブでの非同期 IO の仕組み
IO の処理には当然、ネイティブとのやりとりが必要になります。 そのネイティブでの非同期 IO はどういう仕組みで成り立っているか?というと、 Windows においては I/O completion port, Linux においては epoll、Mac OS においては kqueue などの仕組みが用意されており、これらによって成り立っています。
この記事では主に Windows の I/O completion port に焦点を当てて、話を進めていきます。 とはいえ、基本的にはどの OS でも根底にある概念は似たようなものなので(というと怒られが発生する気もしますが)、Windows 向けではないコードを追いかける際にも役立つでしょう。
I/O Completion Port
I/O completion port の公式のドキュメントはこちら。 ざっくりとはこんな具合。
CreateIoCompletionPort()
を用いて I/O completion port (以下 completion port)を作成。- completion port に1つもしくは1つ以上の file handle を紐づけ。
- file handle などという呼び名ですが、この handle はディスク上のファイルのみを取り扱うわけではなく、TCP socket や named pipe などのあらゆる I/O についても取り扱うもの。
- I/O が完了したら I/O completion packet (以下 completion packet) が completion port に紐づく queue に FIFO で詰められる。
GetQueuedCompletionStatus()
で completion port に completion packet が流れてくるのを待機。- この時
GetQueuedCompletionStatus()
で completion packet が流れてくるまでは、OS がGetQueuedCompletionStatus()
を呼び出したスレッドをブロック。 - 1つの completion port に対して、複数のスレッドから
GetQueuedCompletionStatus()
を呼び出す事で並列に待機可能。
- この時
- completion port に completion packet が流れてきたら、
GetQueuedCompletionStatus()
でブロックしていたスレッドを解放。- ブロックしていたスレッドの解放は LIFO。
- ようするに
GetQueuedCompletionStatus()
したスレッドが複数ある場合、最も直近に待機に入ったスレッドで(LIFO)、最も古い completion packet を受け取る(FIFO)。 - completion packet が複数同時に流れてきた際、どれだけのスレッドを同時に解放するかなども設定できる。
- この値は
NumberOfConcurrentThreads
というパラメータ名 completion port 作成時に設定する。 NumberOfConcurrentThreads
はコンカレンシー値などと呼ばれる。
- この値は
- ようするに
- ブロックしていたスレッドの解放は LIFO。
C# / .NET 上での非同期 IO の仕組み
さて、I/O completion port がどのように C# / .NET の世界で取り扱われているかを見ていきましょう。 なお、.NET runtime は色々設定いれる事で挙動を変えられるようになっているのですが、ここでは .NET runtime は既定の構成である事を前提として読み進めていきます。
コード的には System/Threading/PortableThreadPool.cs 及び System/Threading/PortableThreadPool.IO.Windows.cs あたりから見ていきます。
まず PortableThreadPool
のコンストラクタから眺めていきます。
すると InitializeIOOnWindows()
なる関数を呼び出している事が確認できます。
InitializeIOOnWindows()
の実装は以下のようになっています。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L78-L86 internal sealed partial class PortableThreadPool { private void InitializeIOOnWindows() { int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount; for (int i = 0; i < IOCompletionPortCount; i++) { _ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads); } } }
CreateIOCompletionPort()
が呼び出されていますね...!
ここで気になるのは、IOCompletionPortCount
と IOCompletionPollerCount
が幾つに設定されているのかという事。
それぞれの定義は以下のようになっています。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.cs#L53-L54 internal sealed partial class PortableThreadPool { private static readonly short IOCompletionPortCount = DetermineIOCompletionPortCount(); private static readonly int IOCompletionPollerCount = DetermineIOCompletionPollerCount(); }
DetermineIOCompletionPortCount()
と DetermineIOCompletionPollerCount()
を読まないと具体的にどのような値が設定されるのか分からないので、どんどん読み進めていきます。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L18-L76 internal sealed partial class PortableThreadPool { private static short DetermineIOCompletionPortCount() { const short DefaultIOPortCount = 1; const short MaxIOPortCount = 1 << 10; short ioPortCount = AppContextConfigHelper.GetInt16Config( "System.Threading.ThreadPool.IOCompletionPortCount", "DOTNET_ThreadPool_IOCompletionPortCount", DefaultIOPortCount, allowNegative: false); return ioPortCount == 0 ? DefaultIOPortCount : Math.Min(ioPortCount, MaxIOPortCount); } private static int DetermineIOCompletionPollerCount() { // Named for consistency with SocketAsyncEngine.Unix.cs, this environment variable is checked to override the exact number of IO completion poller threads to use. // See the comment in SocketAsyncEngine.Unix.cs about its potential uses. // For this implementation, the ProcessorsPerIOPollerThread config option below may be preferable as it may be less machine-specific. int ioPollerCount; if (uint.TryParse(Environment.GetEnvironmentVariable("DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT"), out uint count) && count != 0) { ioPollerCount = (int)Math.Min(count, (uint)MaxPossibleThreadCount); } else if (UnsafeInlineIOCompletionCallbacks) { // In this mode, default to ProcessorCount pollers to ensure that all processors can be utilized if more work // happens on the poller threads ioPollerCount = Environment.ProcessorCount; } else { int processorsPerPoller = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false); ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1; } if (IOCompletionPortCount == 1) { return ioPollerCount; } // Use at least one IO poller per port if (ioPollerCount <= IOCompletionPortCount) { return IOCompletionPortCount; } // Use the same number of IO pollers per port, align up if necessary to make it even int rem = ioPollerCount % IOCompletionPortCount; if (rem != 0) { ioPollerCount += IOCompletionPortCount - rem; } return ioPollerCount; } }
まずは DetermineIOCompletionPortCount()
の方ですが、既定では 1 が返ってきそうですね。
DetermineIOCompletionPollerCount()
はどうでしょう。
初めに現れる DOTNET_SYSTEM_NET_SOCKETS_THREAD_COUNT
についてはドキュメントが存在します。
しかしながらこの値は既定では設定されていない値なので、スルーしていきます。
その下にある UnsafeInlineIOCompletionCallbacks
は DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS
という環境変数が 1 に設定されていれば true となるフィールドです。
DOTNET_SYSTEM_NET_SOCKETS_INLINE_COMPLETIONS
とはなんのための設定なのかというと、
既定では IO のイベントが終了した際、IO を await した後の continuation は ThreadPool に dispatch されるのですが、
continuation を ThreadPool に dispatch せずに、IO の完了イベントを受け取ったスレッド上で continuation を実行するための設定です。
まぁ Unsafe の prefix から分かる通り、余程の事が無い限り使わない設定です。
というわけで、ここではこれも無視。
ThreadPool に dispatch 万歳。
ということで、以下の else 文の中が実行される事になります。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L52-L54 else { int processorsPerPoller = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.ProcessorsPerIOPollerThread", 12, false); ioPollerCount = (Environment.ProcessorCount - 1) / processorsPerPoller + 1; }
既定値が用いられる事を前提に読み進めているので、processorsPerPoller
は 12 になるとします。
とすると、ioPollerCount
は論理 CPU 数(Environment.ProcessorCount
) が 12 個毎に 1 つの poller という感じです。
poller が何を指しているかについては後述しますが、まぁ ProcessorsPerIOPollerThread
なんて名前の config が 12 なので ioPollerCount
がこのような値になるのは妥当でしょう。
else を抜けて続きを見ていくと以下のようなコードがあるので、DetermineIOCompletionPollerCount()
は既定の構成である事に加え、論理 CPU 数を 12 以下と仮定すると、1 を返す事が分かります。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L57-L60 if (IOCompletionPortCount == 1) { return ioPollerCount; }
なので結果的にこんな具合に。
internal sealed partial class PortableThreadPool { private static readonly short IOCompletionPortCount = 1; private static readonly int IOCompletionPollerCount = 1; }
さて、ここまで読んだ事でようやく InitializeIOOnWindows()
でどのような値が用いられているか分かるようになりました。
IOCompletionPortCount
が 1 なので当然作成される completion port は 1 つです。
// 再掲 // https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L78-L86 internal sealed partial class PortableThreadPool { private void InitializeIOOnWindows() { int numConcurrentThreads = IOCompletionPollerCount / IOCompletionPortCount; for (int i = 0; i < IOCompletionPortCount; i++) { _ioPorts[i] = CreateIOCompletionPort(numConcurrentThreads); } } }
今度は InitializeIOOnWindows()
内で用いられている CreateIOCompletionPort()
を覗いてみましょう。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L88-L99 internal sealed partial class PortableThreadPool { private static nint CreateIOCompletionPort(int numConcurrentThreads) { nint port = Interop.Kernel32.CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, UIntPtr.Zero, numConcurrentThreads); if (port == 0) { int hr = Marshal.GetHRForLastWin32Error(); Environment.FailFast($"Failed to create an IO completion port. HR: {hr}"); } return port; } }
Interop.Kernel32.CreateIoCompletionPort()
はお察しの通り、win32 の CreateIoCompletionPort()
に対する P/Invoke です。
なのでここで completion port が作成されているわけですが、IOCompletionPollerCount
は 1 だったので、numConcurrentThreads
も 1 です。
numConcurrentThreads
がどういう値かについては、CreateIoCompletionPort()
のドキュメントを読むと分かります。
NumberOfConcurrentThreads [in]
The maximum number of threads that the operating system can allow to concurrently process I/O completion packets for the I/O completion port. This parameter is ignored if the ExistingCompletionPort parameter is not NULL.
If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.
さて、ここまでで completion port がどのように作成されるのか見てきました。 次は作成した completion port から completion packet が流れてくるのを待機し、completion packet が流れて来た際に continuation をどのように ThreadPool に dispatch していくかを見ていきます。
上記の責務は、PortableThreadPool
の class 内 class として存在する IOCompletionPoller
という class が負っています。
先ほどからちょこちょこ登場している poller という語彙はこの IOCompletionPoller
を指しています。
とりあえずコンストラクタを読んでいきましょう。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L195-L235 private sealed unsafe class IOCompletionPoller { public IOCompletionPoller(nint port) { Debug.Assert(port != 0); _port = port; if (!UnsafeInlineIOCompletionCallbacks) { _nativeEvents = (Interop.Kernel32.OVERLAPPED_ENTRY*) NativeMemory.Alloc(NativeEventCapacity, (nuint)sizeof(Interop.Kernel32.OVERLAPPED_ENTRY)); _events = new ThreadPoolTypedWorkItemQueue<Event, Callback>(); // These threads don't run user code, use a smaller stack size _thread = new Thread(Poll, SmallStackSizeBytes); // Poller threads are typically expected to be few in number and have to compete for time slices with all // other threads that are scheduled to run. They do only a small amount of work and don't run any user code. // In situations where frequently, a large number of threads are scheduled to run, a scheduled poller thread // may be delayed artificially quite a bit. The poller threads are given higher priority than normal to // mitigate that issue. It's unlikely that these threads would starve a system because in such a situation // IO completions would stop occurring. Since the number of IO pollers is configurable, avoid having too // many poller threads at higher priority. if (IOCompletionPollerCount * 4 < Environment.ProcessorCount) { _thread.Priority = ThreadPriority.AboveNormal; } } else { // These threads may run user code, use the default stack size _thread = new Thread(PollAndInlineCallbacks); } _thread.IsThreadPoolThread = true; _thread.IsBackground = true; _thread.Name = ".NET ThreadPool IO"; // Thread pool threads must start in the default execution context without transferring the context, so // using UnsafeStart() instead of Start() _thread.UnsafeStart(); } }
既定では UnsafeInlineIOCompletionCallbacks
は false ですから、最初の if の中が実行される前提で読んでいきます。
_nativeEvents
は後で登場する GetQueuedCompletionStatusEx
で用いるバッファなのですが、このバッファに流れてきた completion packet の情報が詰め込まれることになります。
_events
の型は ThreadPoolTypedWorkItemQueue
です。
名前のまんまで、強く型付けされた ThreadPool の WorkItem の Queue なのですが、これは非常に面白い class なので詳しくは後述します。
_thread
では新しいスレッドを作成し、Poll
メソッドが渡されている事が確認できます。
また、_thread
では大した処理を行わないので、最大の stack size を抑えていたりしているのも面白ポイント。
ここで確認したいのは Poll
メソッドの中身です。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/PortableThreadPool.IO.Windows.cs#L237-L267 private sealed unsafe class IOCompletionPoller { private const int NativeEventCapacity = #if DEBUG 32; #else 1024; #endif private void Poll() { Debug.Assert(_nativeEvents != null); Debug.Assert(_events != null); while ( Interop.Kernel32.GetQueuedCompletionStatusEx( _port, _nativeEvents, NativeEventCapacity, out int nativeEventCount, Timeout.Infinite, false)) { Debug.Assert(nativeEventCount > 0); Debug.Assert(nativeEventCount <= NativeEventCapacity); for (int i = 0; i < nativeEventCount; ++i) { Interop.Kernel32.OVERLAPPED_ENTRY* nativeEvent = &_nativeEvents[i]; if (nativeEvent->lpOverlapped != null) // shouldn't be null since null is not posted { _events.BatchEnqueue(new Event(nativeEvent->lpOverlapped, nativeEvent->dwNumberOfBytesTransferred)); } } _events.CompleteBatchEnqueue(); } ThrowHelper.ThrowApplicationException(Marshal.GetHRForLastWin32Error()); } }
まず GetQueuedCompletionStatusEx()
ですが、これは GetQueuedCompletionStatus()
が completion packet を1つずつ受け取るのに対して、completion packet をバッチで受け取るための関数です。
バッチで受け取った方が効率がいいのは間違いないですからね、当然 GetQueuedCompletionStatusEx()
の方が好ましいです。
そして while の中では _nativeEvents[i]
で受け取った OVERLAPPED_ENTRY
を Event
に詰めて BatchEnqueue
し、一通り enqueue し終えたら CompleteBatchEnqueue
を呼び出すという実装になっています。
BatchEnqueue
では event を enqueue するだけで、CompleteBatchEnqueue
は BatchEnqueue
で enqueue した event を ThreadPool に dispatch する事を保証するために呼び出すものです。
状況によっては CompleteBatchEnqueue
を呼び出さずとも BatchEnqueue
した直後から ThreadPool に dispatch され可能性はありますが、このあたりは ThreadPoolTypedWorkItemQueue
の実装を把握した方がはやいでしょう。という事で ThreadPoolTypedWorkItemQueue
の実装を覗いていきます。
ThreadPoolTypedWorkItemQueue
ThreadPoolTypedWorkItemQueue
の実装は以下のような感じ。
// https://github.com/dotnet/runtime/blob/v9.0.0/src/libraries/System.Private.CoreLib/src/System/Threading/ThreadPoolWorkQueue.cs#L1247-L1398 // A strongly typed callback for ThreadPoolTypedWorkItemQueue<T, TCallback>. // This way we avoid the indirection of a delegate call. internal interface IThreadPoolTypedWorkItemQueueCallback<T> { static abstract void Invoke(T item); } internal sealed class ThreadPoolTypedWorkItemQueue<T, TCallback> : IThreadPoolWorkItem // https://github.com/dotnet/runtime/pull/69278#discussion_r871927939 where T : struct where TCallback : struct, IThreadPoolTypedWorkItemQueueCallback<T> { // The scheme works as follows: // - From NotScheduled, the only transition is to Scheduled when new items are enqueued and a TP work item is enqueued to process them. // - From Scheduled, the only transition is to Determining right before trying to dequeue an item. // - From Determining, it can go to either NotScheduled when no items are present in the queue (the previous TP work item processed all of them) // or Scheduled if the queue is still not empty (let the current TP work item handle parallelization as convinient). // // The goal is to avoid enqueueing more TP work items than necessary, while still ensuring that all items are processed. // Another TP work item isn't enqueued to the thread pool hastily while the state is Determining, // instead the parallelizer takes care of that. We also ensure that only one thread can be parallelizing at any time. private enum QueueProcessingStage { NotScheduled, Determining, Scheduled } private QueueProcessingStage _queueProcessingStage; private readonly ConcurrentQueue<T> _workItems = new ConcurrentQueue<T>(); public int Count => _workItems.Count; public void Enqueue(T workItem) { BatchEnqueue(workItem); CompleteBatchEnqueue(); } public void BatchEnqueue(T workItem) => _workItems.Enqueue(workItem); public void CompleteBatchEnqueue() { // Only enqueue a work item if the stage is NotScheduled. // Otherwise there must be a work item already queued or another thread already handling parallelization. if (Interlocked.Exchange( ref _queueProcessingStage, QueueProcessingStage.Scheduled) == QueueProcessingStage.NotScheduled) { ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this); } } private void UpdateQueueProcessingStage(bool isQueueEmpty) { if (!isQueueEmpty) { // There are more items to process, set stage to Scheduled and enqueue a TP work item. _queueProcessingStage = QueueProcessingStage.Scheduled; } else { // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining // otherwise. If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer // would not have scheduled a work item to process the work, so schedule one one. QueueProcessingStage stageBeforeUpdate = Interlocked.CompareExchange( ref _queueProcessingStage, QueueProcessingStage.NotScheduled, QueueProcessingStage.Determining); Debug.Assert(stageBeforeUpdate != QueueProcessingStage.NotScheduled); if (stageBeforeUpdate == QueueProcessingStage.Determining) { return; } } ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this); } void IThreadPoolWorkItem.Execute() { T workItem; while (true) { Debug.Assert(_queueProcessingStage == QueueProcessingStage.Scheduled); // The change needs to be visible to other threads that may request a worker thread before a work item is attempted to be dequeued by the current thread. // In particular, if an enqueuer queues a work item and does not request a thread because it sees a Determining or Scheduled stage, and the current thread is the last thread processing work items, the current thread must either see the work item queued by the enqueuer, or it must see a stage of Scheduled, and try to dequeue again or request another thread. _queueProcessingStage = QueueProcessingStage.Determining; Interlocked.MemoryBarrier(); if (_workItems.TryDequeue(out workItem)) { break; } // The stage here would be Scheduled if an enqueuer has enqueued work and changed the stage, or Determining otherwise. // If the stage is Determining, there's no more work to do. If the stage is Scheduled, the enqueuer would not have scheduled a work item to process the work, so try to dequeue a work item again. QueueProcessingStage stageBeforeUpdate = Interlocked.CompareExchange(ref _queueProcessingStage, QueueProcessingStage.NotScheduled, QueueProcessingStage.Determining); Debug.Assert(stageBeforeUpdate != QueueProcessingStage.NotScheduled); if (stageBeforeUpdate == QueueProcessingStage.Determining) { return; } } UpdateQueueProcessingStage(_workItems.IsEmpty); ThreadPoolWorkQueueThreadLocals tl = ThreadPoolWorkQueueThreadLocals.threadLocals!; Debug.Assert(tl != null); Thread currentThread = tl.currentThread; Debug.Assert(currentThread == Thread.CurrentThread); uint completedCount = 0; int startTimeMs = Environment.TickCount; while (true) { TCallback.Invoke(workItem); // This work item processes queued work items until certain conditions are met, and tracks some things: // - Keep track of the number of work items processed, it will be added to the counter later // - Local work items take precedence over all other types of work items, process them first // - This work item should not run for too long. // It is processing a specific type of work in batch, but should not starve other thread pool work items. // Check how long it has been since this work item has started, and yield to the thread pool after some time. // The threshold used is half of the thread pool's dispatch quantum, which the thread pool uses for doing periodic work. if (++completedCount == uint.MaxValue || tl.workStealingQueue.CanSteal || (uint)(Environment.TickCount - startTimeMs) >= ThreadPoolWorkQueue.DispatchQuantumMs / 2 || !_workItems.TryDequeue(out workItem)) { break; } // Return to clean ExecutionContext and SynchronizationContext. This may call user code (AsyncLocal value // change notifications). ExecutionContext.ResetThreadPoolThread(currentThread); // Reset thread state after all user code for the work item has completed currentThread.ResetThreadPoolThread(); } ThreadInt64PersistentCounter.Add(tl.threadLocalCompletionCountObject!, completedCount); } }
注目するべきは ThreadPoolTypedWorkItemQueue
自体に IThreadPoolWorkItem
が実装されている事です。
これにより ThreadPoolTypedWorkItemQueue
のインスタンス自身を ThreadPool.UnsafeQueueHighPriorityWorkItemInternal()
に ThreadPool に投げる事が出来ています。
非常に効率的な作りです。
また、このあたりの実装は C# 11 で導入された interface static abstract method だったり、.NET 9 で可能になった enum に対する Interlocked.CompareExchange()
等が使われていたりして Span<T>
こそ登場していませんが、とっても現代的。
IOCompletionPoller
から利用される際の大まかな流れは以下のような感じになります。
面白ポイントが沢山あるのですが、日本語で正しく伝えるのは難しいのでコードは一読した方が良いかもです。
BatchEnqueue()
が呼び出される- すると evnet が
ConcurrentQueue
に enqueue される。- この時
ThreadPoolTypedWorkItemQueue
の_queueProcessingStage
がNotScheduled
なら真に enqueue されるだけ。 NotScheduled
で無かった場合、ThreadPoolTypedWorkItemQueue
内部の queue から work item を dequeue して ThreadPool に dispatch する処理が回っているので、即時に処理される可能性アリ。
- この時
- すると evnet が
CompleteBatchEnqueue()
が呼び出される_queueProcessingStage
がNotScheduled
ならThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this)
が呼び出される_queueProcessingStage
がNotScheduled
以外なら既にThreadPoolTypedWorkItemQueue
内部の queue から work item を dequeue して ThreadPool に dispatch する処理が回っているので、ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this)
は呼ばない
ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this)
した事により、Execute()
が ThreadPool の thread 上から呼び出されるTryDequeue()
した後TCallback.Invoke(workItem)
する事で continuation を実行- ちなみに
TryDequeue()
とTCallback.Invoke(workItem)
で呼ばれているUpdateQueueProcessingStage()
が非常に賢いUpdateQueueProcessingStage()
が呼び出された際、queue が空でなければ、ThreadPool.UnsafeQueueHighPriorityWorkItemInternal(this)
を実行し、ThreadPool 上の別の thread でExecute
が叩かれる事により、TryDequeue()
しTCallback.Invoke(workItem)
する処理が実行される。- つまり queue に work item があれば次々と ThreadPool に dispatch していく事になる。
- しかも ThreadPool に dispatch する際には
this
を渡しているだけなので、無駄がない。
TCallback.Invoke(workItem)
した後、幾つかの条件が全て揃っていれば再度 work item を dequeue してTCallback.Invoke(workItem)
を実行。条件は以下の通り。- work item の処理中が uint.MaxValue 未満の時
WorkStealingQueue
の CanSteal が false の時- 特定のスレッドでの
Execute
の処理時間が一定時間未満の時- ThreadPool の thread が長時間占有されているのは良くないため
- 長時間占有すると他の work item が実行されなくなってしまう恐れ出てくる
- ThreadPool の thread が長時間占有されているのは良くないため
おわりに
まとめると、既定では以下のようになります。
- 作成される completion port は1つ
- completion port に completion packet が流れてくるのを
IOCompletionPoller
で待機 IOCompletionPoller
は 12 個の論理 CPU 毎に1つ。IOCompletionPoller
内では completion packet を batch で取得- completion packet を取得した後、
ThreadPoolTypedWorkItemQueue
に work item を enqueue ThreadPoolTypedWorkItemQueue
は enqueue された work item を次々と ThreadPool に dispatch- 実際に ThreadPool に投げ込んでいるのは
IThreadPoolWorkItem
を実装しているThreadPoolTypedWorkItemQueue
自分自身
- 実際に ThreadPool に投げ込んでいるのは
- completion packet が複数同時に流れてきた場合には
- 1: それぞれの work item が別々の ThreadPool 上の thread で実行されるようスケジューリング
- 2: 特定の条件の元では1つの thrad の中で複数の work item を実行