ねののお庭。

かりかりもふもふ。

System.Reactiveの中身をほんのちょっと読んで見る。(その3)

動作を追う。

メソッドチェーン時(Subscribe前)

サブスクライブが走るまでは、単にIObservableを返し、生成されたオブジェクトとしては、上流のObservableと、Func<TSource, TResult>の関数を保存するだけで、これ以上のことはしません。

Subscribe時

Rxはここが重要ですよね。とりあえずsubject.Select().Subscribe(do something)みたいな感じを想定します。 また再掲まみれになりますが。。。 SelectorクラスのSubscribeが呼ばれると、Producerクラスのここが反応します。

public IDisposable Subscribe(IObserver<TTarget> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException(nameof(observer));
    }

    return SubscribeRaw(observer, enableSafeguard: true);
}

public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
{
    ISafeObserver<TTarget> safeObserver = null;

    if (enableSafeguard)
    {
        observer = safeObserver = SafeObserver<TTarget>.Wrap(observer);
    }

    //後続のオブザーバーからSinkオブジェクトが生成される。
    //引数のobserverはIObserver<TTarget>
    //戻ってくるsinkはIObserver<TSource>
    //要するに次にOnNextが送られるべきIObserver<TTarget>を内包するIObserver<TSource>が出来上がる。
    var sink = CreateSink(observer);
    
    //後続のオブザーバにsinkオブジェクトの参照をもたせる
    safeObserver?.SetResource(sink);

    if (CurrentThreadScheduler.IsScheduleRequired)
    {
        //こっちはとりあえず無視
        CurrentThreadScheduler.Instance.ScheduleAction(
            (@this: this, sink),
            tuple => tuple.@this.Run(tuple.sink));
    }
    else
    {
        //重要
        Run(sink);
    }

    return sink;
}

Run(sink)は

protected override void Run(_ sink) => sink.Run(_source);

となっており、sinkオブジェクトのRunを呼ぶのでした。Runは以下です。

public virtual void Run(IObservable<TSource> source)
{
    SetUpstream(source.SubscribeSafe(this));
}

またRunを呼んだときのsinkオブジェクトは、IObserverでもあるため、subscribe(safe)にわたすことができます。こうして上流のsource.Subscribe(safe)を呼び出し、どんどん上流につなげて行くことができるわけですね。なるほど。その上流で発生するIDisposableを実装しているsinkオブジェクトが返され、_upstreamにそれらの参照は保持される、と。こうしてsubscribeが上流に伝播し、根っこのsubjectなりにつながることがわかります。

Dispose時

購読停止するためSubscribe()で帰ってきたIDisposableなオブジェクトをdisposeしたときのことを考えます。リソースがどういうふうに開放されていくかは気になりますよね。Subscribeの方さへわかれば、まぁこちらは比較的カンタンで、末端のsinkオブジェクトがsubscribe時には返却されていることがわかります。そこで、SinkのDisposeがどのようになっているかをみればよく、

public void Dispose()
{
if (Interlocked.Exchange(ref _observer, NopObserver<TTarget>.Instance) != NopObserver<TTarget>.Instance)
    Dispose(true);
}

protected virtual void Dispose(bool disposing)
{
    Disposable.TryDispose(ref _upstream);
}

となります。Disposable.TryDisposeは以下のようになっていて、_upstreamのDisposeが連鎖的に呼ばれるようになっています。

internal static bool TryDispose(ref IDisposable fieldRef)
{
    var old = Interlocked.Exchange(ref fieldRef, BooleanDisposable.True);

    if (old == BooleanDisposable.True)
    {
        return false;
    }

    old?.Dispose();
    return true;
}

とりあえずこれくらいで満足しておくこととします。

感想

だいたいの流れはわかりました。 しかしSink<TSource, TTarget>クラスのGetForwarder関数は結局一体なにものなのか、とか、safeObserver?.SetResource(sink)は一体なんの意味が...とか、まぁ気になることはこの時点でもいくつかあるのですが、まぁ初回の探索なのでまぁいいでしょう、という感じにしておきます。 だいたいのクラスの役割とか動作はわかったけど、これ一体どういう思想でこういう設計してるのかなーみたいなところがいまいち察することができないのでココらへんは精進したいですね。。。 あと非同期を考慮しているからInterlockedが頻発して大変。