ねののお庭。

かりかりもふもふ。

C#での非同期メソッドの分析。

この文章は以下の記事を私が翻訳したものです。

Dissecting the async methods in C#

C#は開発者の生産性に優れてますし、私は高性能アプリケーションに適したものにするという最近の取り組みに満足しています。

例えば、C#5の'async'メソッドがあります。 この機能はいくつかのタスクベースの操作を一つのタスクにまとめるもので、ユーザーからみてとても便利なものです。 しかしこの抽象はコストがかかります。 Taskは参照型なので、'async'メソッドが同期的に完了しても、インスタンスが作られるたびにヒープアロケーションが発生します。 C#7では、asyncメソッドはValueTaskのようなtask-likeな型を返せるようになり、いくつかのシナリオではヒープアロケーションの削減もしく回避できます。

このようなことがどのようにして可能なのかを理解するために、内部を調べ、asyncメソッドがどのように実装されているかを見る必要があります。

しかしその前に歴史を少し。

TaskとTaskは.NET 4.0で導入されたクラスで、.NETの非同期・並列プログラミングの領域で大きなメンタルシフトをもたらしたように私には見えます。 .NET1.0のBeginXXX/EndXXXパターン(“Asynchronous Programming Model”として知られています)や.NET 2.0の BackgroundWorkerのようなイベントベース非同期パターンのような古い非同期パターンとは異なり,Taskは合成可能です。

タスクは未来に結果を返すことが約束された作業単位を表します。 その約束(promise)は、IO操作によって裏付けられるか、計算処理の表す可能性があります。 そんなのは重要ではありません。 重要なことは,オペレーションの結果がself-sufficientであり、first-class citizenであることです。 "未来"を渡すことができます、つまり、変数を保存したり、関数から返したり、他の関数にわたすことができます。 2つの「未来」を「結合」して別の「未来」を形成したり、結果を同期的に待機したり、「未来」に「継続(continuation)」を加えることで結果を「待つ(await)」こともできます。 タスクインスタンスだけで、操作が成功したか、障害が発生したか、キャンセルされたかを判断することができます。

Task Parallel Library (TPL)は我々の並行性に関する考え方を変えさせ、さらにC#5はasync/awaitを導入して一歩前進しました。 async/awaitはtaskを構成するのに役立ち、ユーザはtry/catch や using などのよく知られた構文を使うことができます。 しかし、他の抽象化機能と同様に、async/await機能にはコストがかかります。 そして、そのコストが何であるかを理解するために、その下を見る必要があります。

Async method internals

通常のメソッドは、一つのentry pointと一つのexit pointを持っています(一つ以上のreturn文を書くことができても、実行時にはexit pointは一つです)。 しかしasyncメソッド(*)とイテレータ(yield returnで書かれたメソッド)は違います。 asyncメソッドの場合、呼び出し元はすぐに結果を取得し(TaskやTaskなど),得られたtaskを介して真の結果を"await"することができます。

(*)ここでは、"async method"をasyncキーワードでマークされたものとしましょう。実際に非同期に実行されることを意味していません。またメソッドが非同期であるという意味でもありません。コンパイラがメソッドに特別な変換を加えることを意味します。

以下のasyncメソッドを考えてみましょう。

class StockPrices
{
    private Dictionary<string, decimal> _stockPrices;
    public async Task<decimal> GetStockPriceForAsync(string companyId)
    {
        await InitializeMapIfNeededAsync();
        _stockPrices.TryGetValue(companyId, out var result);
        return result;
    }
 
    private async Task InitializeMapIfNeededAsync()
    {
        if (_stockPrices != null)
            return;
 
        await Task.Delay(42);
        // Getting the stock prices from the external source and cache in memory.
        _stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
    }
}

GetStockPriceForAsyncメソッドは_stockPricesが初期化されたら、値をキャッシュから取得することを保証します。

コンパイラが何をしていて、何ができるか理解するため、手で変換してみましょう。

非同期メソッドを手動で分解する

TPLは、タスクの構築と結合を助ける2つの主要なbuilding blockを提供しています。 Task.ContinueWithを使ったタスクの継続と、手動でtaskを構成するためのTaskCompletionSource<T>クラスです。

class GetStockPriceForAsync_StateMachine
{
    enum State { Start, Step1, }
    private readonly StockPrices @this;
    private readonly string _companyId;
    private readonly TaskCompletionSource<decimal> _tcs;
    private Task _initializeMapIfNeededTask;
    private State _state = State.Start;
 
    public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
    {
        this.@this = @this;
        _companyId = companyId;
    }
 
    public void Start()
    {
        try
        {
            if (_state == State.Start)
            {
                // メソッドの開始から最初の'await'までのコード。
 
                if (string.IsNullOrEmpty(_companyId))
                    throw new ArgumentNullException();
 
                _initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
 
                // 状態の更新とcontinuationのスケジュール。
                _state = State.Step1;
                _initializeMapIfNeededTask.ContinueWith(_ => Start());
            }
            else if (_state == State.Step1)
            {
                // 最初にエラーとキャンセルケースを確認する必要があります。
                if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
                    _tcs.SetCanceled();
                else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
                    _tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
                else
                {
                    // 最初のawaitと残りのメソッド間
 
                    @this._store.TryGetValue(_companyId, out var result);
                    _tcs.SetResult(result);
                }
            }
        }
        catch (Exception e)
        {
            _tcs.SetException(e);
        }
    }
 
    public Task<decimal> Task => _tcs.Task;
}
 
public Task<decimal> GetStockPriceForAsync(string companyId)
{
    var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
    stateMachine.Start();
    return stateMachine.Task;
}

コードは冗長ですが、比較的わかりやすいです。 全てのGetStockPriceForAsyncからのロジックは、“continuation passing style”を利用するGetStockPriceForAsync_StateMachine.Startメソッドに移動しました。 非同期変換の一般的なアルゴリズムは、元のメソッドをawait の境界で分割することです。 最初のブロックはメソッドの開始から最初のawaitまでです。 2つめのブロックは一つめのawaitから2つ目のawaitまでです。 3つめのブロックは-上の部分から3番め、もしくはメソッドの終了まで、など。

// Step 1 of the generated state machine:
 
if (string.IsNullOrEmpty(_companyId)) throw new ArgumentNullException();
_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();

待機中の全てのtaskはステートマシンのフィールドになり、Startメソッドはcontinuationとして自身をsubscribeします。

_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());

そしてタスクが終了すると、Startメソッドが呼び出され、_stateフィールドを確認して自身がどの段階にいるのかを把握します。 そしてロジックはタスクが正常に終了したか、キャンセルされたか成功したかをチェックします。 後者の場合、ステートマシンは前にすすみ、次のコードブロックを実行します。 すべてが完了すると、ステートマシンはTaskCompletionSourceインスタンスに結果をセットし、GetStockPricesForAsyncから返された結果のタスクは状態をcompletedに変更します。

// The code between first await and the rest of the method
 
@this._stockPrices.TryGetValue(_companyId, out var result);
_tcs.SetResult(result); // The caller gets the result back

この"実装"にはいくつかの深刻な欠点があります。

  • たくさんのヒープアロケーション:
    • ステートマシンで1アロケーション、TaskCompletionSourceで1アロケーション, TaskCompletionSourceの内部で1アロケーション, continuation delegateで1アロケーション。
  • “hot path optimizations”の欠如:
    • すでにawaitしているtaskが終了している場合、 continuationを作成する必要はありません。
  • 拡張生の欠如
    • この実装ではTask-baseクラスと密結合しているため、TaskやTask以外をawaitしたり返り値とする別のシナリオで使うことができません。

では、これらの懸念事項がどのように対処されているのか、実際の非同期機械を見てみましょう。

Async machinery

コンパイラがasyncメソッドを変換する全体的なアプローチは上記のものと非常に似ています。望ましい動作を得るために、コンパイラは以下のタイプに従います。

  • asyncメソッドのスタックフレームのように振る舞い、元のasyncメソッドのすべてのロジックを含む生成されたステートマシン。(**)
  • 完了したTaskを保持し、ステートマシンの状態遷移を管理するAsyncTaskMethodBuilder(これはとてもTaskCompletionSource<T>に似ています)
  • TaskAwaiterは、タスクをラップし、必要に応じてそのタスクのcontinuationをスケジュールします。
  • MoveNextRunnerは正しい実行コンテキストでIAsyncStateMachine.MoveNextmethodを呼び出します。

生成されたステートマシンはreleaseモードではstruct, debugモードではclassです。 他の(MoveNextRunner classは除く)全てはBCLに構造体として定義されています。

(**)コンパイラはステートマシンの型名を<YourMethodNameAsync>d__1 のように生成します。 名前の衝突を避けるために、生成された名前にはユーザが定義できない無向な識別文字が含まれています。以下の例では簡単にするために、<と>を_に置き換えてわかりやすくします。(つまりclass <YourMethodNameAsync>d__1をclass_YourMethodNameAsync_d__1と表記)

The original method

オリジナルの“asynchronous”メソッドはステートマシンを作成し、キャプチャした状態(メソッドが静的でない場合、thisを含む)で初期化し、参照でステートマシンインスタンスをAsyncTaskMethodBuilder.Startに渡して呼び出すことで実行を開始します。

[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
    _GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
    _GetStockPriceFor_d__.__this = this;
    _GetStockPriceFor_d__.companyId = companyId;
    _GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
    _GetStockPriceFor_d__.__state = -1;
    var __t__builder = _GetStockPriceFor_d__.__builder;
    __t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
    return _GetStockPriceFor_d__.__builder.Task;
}

参照で渡すことは重要な最適化です。なぜならステートマシンは100バイト以上あるため、参照で渡すことで冗長なコピーを避けることができるからです。

The state machine

struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
    public StockPrices __this;
    public string companyId;
    public AsyncTaskMethodBuilder<decimal> __builder;
    public int __state;
    private TaskAwaiter __task1Awaiter;
 
    public void MoveNext()
    {
        decimal result;
        try
        {
            TaskAwaiter awaiter;
            if (__state != 0)
            {
                // State 1 of the generated state machine:
                if (string.IsNullOrEmpty(companyId))
                    throw new ArgumentNullException();
 
                awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
 
                // Hot path optimization: if the task is completed,
                // the state machine automatically moves to the next step
                if (!awaiter.IsCompleted)
                {
                    __state = 0;
                    __task1Awaiter = awaiter;
 
                    // The following call will eventually cause boxing of the state machine.
                    __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
                    return;
                }
            }
            else
            {
                awaiter = __task1Awaiter;
                __task1Awaiter = default(TaskAwaiter);
                __state = -1;
            }
 
            // GetResult returns void, but it'll throw if the awaited task failed.
            // This exception is catched later and changes the resulting task.
            awaiter.GetResult();
            __this._stocks.TryGetValue(companyId, out result);
        }
        catch (Exception exception)
        {
            // Final state: failure
            __state = -2;
            __builder.SetException(exception);
            return;
        }
 
        // Final state: success
        __state = -2;
        __builder.SetResult(result);
    }
 
    void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
    {
        __builder.SetStateMachine(stateMachine);
    }
}

生成されたステートマシンは複雑に見えますが、本質的には手作業で展開したものと同じです。

とはいえ、重要な違いがいくつかあります。

1. “Hot path” optimization

私達のナイーブなアプローチと違い、生成されたステートマシンはawaitしているtaskが完了済みかどうかを認識しています。

awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
 
// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
    // Irrelevant stuff
 
    // The following call will eventually cause boxing of the state machine.
    __builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
    return;
}

待機しているtaskがすでに完了している場合(成功かどうかに関わらず)ステートマシンは杉の状態に進みます。

// GetResult returns void, but it'll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);

これは待機しているtaskがすでに待機している場合、ステートマシン全体はスタックに残ることを意味します。 現在のasyncメソッドでも、待機中のtaskがすでに完了していたり、同期的に終わる場合はメモリのオーバーヘッドが非常に少ないです。 その場合残されたアロケーションはtaskそのものだけです。

2.Error handling

待機中のタスクの失敗やキャンセルをカバーする特別なロジックはありません。 ステートマシンはawaiter.GetResult()をコールしますが、キャンセルされた場合はTaskCancelledException、タスクが失敗した場合は別の例外を投げます。 これはうまく機能するエレガントな解決策で、なぜならGetResult()はtask.Wait()やtask.Resultとは少し異なるためです。

task.Wait()とtask.Resultは失敗した時、一つの例外しかなくてもAggregateExceptionを投げる可能性があります。 その理由はとてもシンプルで、タスクは1つのIO操作だけでなく、並列計算の結果も表現することができるからです。 後者の場合、操作には複数のエラーが発生する可能性があり、AggregateExceptionはこれを一箇所に投げるようデザインされています。

しかしasync/awaitは最大でも一つの例外が発生する非同期操作のために設計されています。 そのため、言語の設計者はawaiter.GetResult()がAggregateExceptionを"unwrap"して最初の失敗だけスローする方が意味があると判断しました。 このデザインの決定は完璧ではなく、次のポストでこの抽象がどのようにリークするか見ます。

非同期ステートマシンはパズルの一部に過ぎません。 全体像を理解するために、ステートマシンがTaskAwaiterとAsyncTaskMethodBuilderにどのような相互作用があるか理解する必要があります。

How different pieces are glued together?

f:id:nenoNaninu:20200607220734p:plain

この図は複雑に見えますが、それぞれのピースが上手くデザインされていて、重要な役割を果たしています。もっともおもしろいのは、待機しているtaskが完了していない場合に起きています(図の茶色の長方形でマークされている部分)

  • ステートマシンは__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);を呼び出し、自分自身をタスクの継続として登録しています。
  • ビルダーはタスクが終了した時、IAsyncStateMachine.MoveNextメソッドが呼び出されることを確認します。
    • ビルダーは現在の実行コンテキストをキャプチャし、ステートマシンのインスタンスに紐付いたMoveNextRunnerを作成します。次にキャプチャした実行コンテキストのもとで、ステートマシンを前進させるMoveNextRunner.RunからActionインスタンスを生成しします。
    • ビルダーはTaskAwaiter.UnsafeOnCompleted(action)を呼び出して、待っているタスクの継続として、与えられたactionをスケジュールします。

待機中のタスクが完了すると、与えられたコールバックが呼び出され、非同期メソッドの次のコードブロックを実行します。

Execution Context

ある人は疑問に思うかもしれません。 実行コンテキストとはなんなのか。なぜそのような複雑なものが必要なのか、と。

同期の世界では、それぞれのスレッドはスレッドローカルストレージに周辺情報を維持します。それはセキュリティ関連情報だったり、culture情報だったり、いろいろです。 3つのメソッドが1つのスレッドでシーケンシャルに呼ばれた場合、これらの情報は自然に流れます。 しかし、これは非同期メソッドでは当てはまりません。 非同期メソッドのそれぞれの"section"では、別々のスレッドで実行されるため、thread-localな情報は使えなくなってしまいます。

実行コンテキストは複数のスレッドにまたがって、一つの論理制御フローを保持します。

Task.Runや、ThreadPool.QueueUserWorkItemは自動的にこれを行います。 Task.Runは呼び出したスレッドの実行コンテキストをキャプチャし、taskインスタンスに保存します。 タスクに関連付けられたタスクスケジューラが与えられたdelegateを実行する時、それは保存されたコンテキストを利用して、ExcutionContext.Runを通して実行されます。

AsyncLocalを使ってこの概念を実行することができます。

static Task ExecutionContextInAction()
{
    var li = new AsyncLocal<int>();
    li.Value = 42;
 
    return Task.Run(() =>
    {
        // Task.Run restores the execution context
        Console.WriteLine("In Task.Run: " + li.Value);
    }).ContinueWith(_ =>
    {
        // The continuation restores the execution context as well
        Console.WriteLine("In Task.ContinueWith: " + li.Value);
    });
}

この例の場合、実行コンテキストはTask.Run、Task.ContinueWithメソッドに流れます。なのでこのメソッドを実行すると以下のような結果になります。

In Task.Run: 42
In Task.ContinueWith: 42

しかしBCLの全てが自動的に実行コンテキストをキャプチャ/リストアするわけではありません。 TaskAwaiter.UnsafeOnCompleteandとAsyncMethodBuilder.AwaitUnsafeOnCompleteの2つの例外があります。 言語の作成者がAwaitTaskContinuationのような組み込み機能に頼らず、AsyncMethodBuilderやMoveNextRunnerを使って、 "unsafe"メソッドの実行コンテキストをマニュアルで流すようにしたのは、奇妙に見えます。 パフォーマンス上の理由か、なにか制限があったのではないかと思います。

その違いを示す例をご紹介します。

static async Task ExecutionContextInAsyncMethod()
{
    var li = new AsyncLocal<int>();
    li.Value = 42;
    await Task.Delay(42);

    // The context is implicitely captured. li.Value is 42
    Console.WriteLine("After first await: " + li.Value);

    var tsk2 = Task.Yield();
    tsk2.GetAwaiter().UnsafeOnCompleted(() =>
    {
        // The context is not captured: li.Value is 0
        Console.WriteLine("Inside UnsafeOnCompleted: " + li.Value);
    });

    await tsk2;

    // The context is captured: li.Value is 42
    Console.WriteLine("After second await: " + li.Value);
}

これの出力は以下のようになります。

After first await: 42
Inside UnsafeOnCompleted: 0
After second await: 42

Conclusion

  • Asyncメソッドは同期メソッドと大きく異なります。
  • コンパイラは各メソッドごとにステートマシンを生成し、もともとのメソッドのロジックをステートマシンに移します。
  • 生成されたコードは同期シナリオ用に高度に最適化されます。タスクがすでに完了済みの場合、asyncメソッドのオーバーヘッドは最小限に抑えられます。
  • 待機中のタスクが完了しない場合、ロジックは多くのヘルパー型に依存して完了されます。

References

実行コンテキストについてもっと詳しく知りたい場合、以下の2つを強くおすすめします。