ねののお庭。

かりかりもふもふ。

System.Reactiveの中身をほんのちょっと読んで見る。(その2)

使われているクラスなどを見ていく。

前回具象クラスがなんであるかわかったので、一番最初の疑問であった

public static partial class Observable
{
//〜略〜
    public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (selector == null)
        {
            throw new ArgumentNullException(nameof(selector));
        }

        return s_impl.Select(source, selector);
    }
//〜略〜

return s_impl.Select(source, selector);

が実際にはこちらを呼んでいる、ということがわかります。 雑に引用すると

internal partial class QueryLanguage
{
    //略
    public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        return new Select<TSource, TResult>.Selector(source, selector);
    }
    //略
}

なるほど。Select<T,TResult>クラスに潜ってみます。実際にnewしているのはSelect<TSource, TResult>クラスの内部クラスであるところの、Selectorクラスです。若干コメント入れてます。

//Producerは後続に値を流すObservableなので、TResult
internal sealed class Selector : Producer<TResult, Selector._>
{
    private readonly IObservable<TSource> _source;
    private readonly Func<TSource, TResult> _selector;

    public Selector(IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        _source = source;
        _selector = selector;
    }

    protected override _ CreateSink(IObserver<TResult> observer) => new _(_selector, observer);

    protected override void Run(_ sink) => sink.Run(_source);

    //Sinkは上流から値を受けるObserverなのでTSource
    internal sealed class _ : Sink<TSource, TResult>
    {
        private readonly Func<TSource, TResult> _selector;

        public _(Func<TSource, TResult> selector, IObserver<TResult> observer)
            : base(observer)
        {
            _selector = selector;
        }

        public override void OnNext(TSource value)
        {
            var result = default(TResult);
            try
            {
                result = _selector(value);
            }
            catch (Exception exception)
            {
                ForwardOnError(exception);
                return;
            }

            ForwardOnNext(result);
        }
    }
}

とりあえずこのクラスの各部品を眺めて行きます。 まず返り値はIObservableでなければならないので、継承しているProducerクラスがおそらくIObservableでも実装しているのかなとおもいつつ眺めると,

internal abstract class Producer<TTarget, TSink> : IProducer<TTarget>
    where TSink : IDisposable
{
    /// <summary>
    /// Publicly visible Subscribe method.
    /// </summary>
    /// <param name=&quot;observer&quot;>Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
    /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
    public IDisposable Subscribe(IObserver<TTarget> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        return SubscribeRaw(observer, enableSafeguard: true);
    }

    public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
    {
        ISafeObserver<TTarget> safeObserver = null;

        //
        // See AutoDetachObserver.cs for more information on the safeguarding requirement and
        // its implementation aspects.
        //
        if (enableSafeguard)
        {
            observer = safeObserver = SafeObserver<TTarget>.Wrap(observer);
        }

        var sink = CreateSink(observer);

        safeObserver?.SetResource(sink);

        if (CurrentThreadScheduler.IsScheduleRequired)
        {
            CurrentThreadScheduler.Instance.ScheduleAction(
                (@this: this, sink),
                tuple => tuple.@this.Run(tuple.sink));
        }
        else
        {
            Run(sink);
        }

        return sink;
    }

    /// <summary>
    /// Core implementation of the query operator, called upon a new subscription to the producer object.
    /// </summary>
    /// <param name=&quot;sink&quot;>The sink object.</param>
    protected abstract void Run(TSink sink);

    protected abstract TSink CreateSink(IObserver<TTarget> observer);
}

ようやくSubscribeが見えてきました。Observableっぽい。プロデューサーさんが実装しているIProducerインターフェースをみると、

internal interface IProducer<out TSource> : IObservable<TSource>
{
    IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard);
}

となっていてなるほどなるほどみたいな感じにIObservableが張り付いています。 とりあえず細かいことはさておき、次にSink<TSource, TResult>を継承している、_クラス(なんでクラス名こんななんだろうか)を見ていきます。 Sinkクラスはこんな実装になっているIObserverです

/// <summary>
/// Base class for implementation of query operators, providing a lightweight sink that can be disposed to mute the outgoing observer.
/// </summary>
/// <typeparam name=&quot;TTarget&quot;>Type of the resulting sequence's elements.</typeparam>
/// <typeparam name=&quot;TSource&quot;></typeparam>
/// <remarks>Implementations of sinks are responsible to enforce the message grammar on the associated observer. Upon sending a terminal message, a pairing Dispose call should be made to trigger cancellation of related resources and to mute the outgoing observer.</remarks>
internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource>
{
    protected Sink(IObserver<TTarget> observer) : base(observer)
    {
    }

    public virtual void Run(IObservable<TSource> source)
    {
        SetUpstream(source.SubscribeSafe(this));
    }

    public abstract void OnNext(TSource value);

    public virtual void OnError(Exception error)
    {
        ForwardOnError(error);
    }

    public virtual void OnCompleted()
    {
        ForwardOnCompleted();
    }

    public IObserver<TTarget> GetForwarder() => new _(this);

    private sealed class _ : IObserver<TTarget>
    {
        private readonly Sink<TSource, TTarget> _forward;

        public _(Sink<TSource, TTarget> forward)
        {
            _forward = forward;
        }

        public void OnNext(TTarget value)
        {
            _forward.ForwardOnNext(value);
        }

        public void OnError(Exception error)
        {
            _forward.ForwardOnError(error);
        }

        public void OnCompleted()
        {
            _forward.ForwardOnCompleted();
        }
    }
}

sinkって命名なんだろうと思って調べたのですが、ノードを考えるときに、矢印がでている方をsource nodeというのに対して、矢印の先のノードのことをsink nodeとかいうそうです。しりませんでした。ObservableからObserverへ、なのでsinkという命名はもっともらしいですねー。

パーツが多いのでどこから眺めるか微妙なところですが、Producerの型引数にSinkを継承しているSelector._が入っているので、Sinkから眺めていくこととします。 さらにSink<TSource, TTarget>クラスはSinkを継承しているのてSinkから見ます。

internal interface ISink<in TTarget>
{
    void ForwardOnNext(TTarget value);
    void ForwardOnCompleted();
    void ForwardOnError(Exception error);
}
    
internal abstract class Sink<TTarget> : ISink<TTarget>, IDisposable
{
    private IDisposable _upstream;
    private volatile IObserver<TTarget> _observer;

    protected Sink(IObserver<TTarget> observer)
    {
        _observer = observer;
    }

    public void Dispose()
    {
        if (Interlocked.Exchange(ref _observer, NopObserver<TTarget>.Instance) != NopObserver<TTarget>.Instance)
            Dispose(true);
    }

    /// <summary>
    /// Override this method to dispose additional resources.
    /// The method is guaranteed to be called at most once.
    /// </summary>
    /// <param name=&quot;disposing&quot;>If true, the method was called from <see cref=&quot;Dispose()&quot;/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        //Calling base.Dispose(true) is not a proper disposal, so we can omit the assignment here.
        //Sink is internal so this can pretty much be enforced.
        //_observer = NopObserver<TTarget>.Instance;

        Disposable.TryDispose(ref _upstream);
    }

    public void ForwardOnNext(TTarget value)
    {
        _observer.OnNext(value);
    }

    public void ForwardOnCompleted()
    {
        _observer.OnCompleted();
        Dispose();
    }

    public void ForwardOnError(Exception error)
    {
        _observer.OnError(error);
        Dispose();
    }

    protected void SetUpstream(IDisposable upstream)
    {
        Disposable.SetSingle(ref _upstream, upstream);
    }

    protected void DisposeUpstream()
    {
        Disposable.TryDispose(ref _upstream);
    }
}

コンストラクタで与えられるobserverは後続の(今回ならSelectオペレータのあとに続くオペレータの)ものです。 Sink後続のObserverにOnNext,OnError,OnConpletedの値を流すことの他に、いろいろDisposeする機能もあるみたい。_upstreamが?という感じはありますが、Sink<TSource, TTarget>を一目瞭然です。 なので次にSink<TSource, TTarget>クラスを眺めます。 まえの_upstreamは、

internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource>
{
    //略
    public virtual void Run(IObservable<TSource> source)
    {
        SetUpstream(source.SubscribeSafe(this));
    }
    //略
}

から分かるように、上流のsource.Subscribeが返すIDisposableオブジェクト(source.SubscribeSafeの中はnullチェックとかいろいろはいってるけど、結局帰ってくるのはsource.Subscribeの戻り値)なので、上流のDisposeするべきものが渡されているということがわかります。名前通り。

IObserverがくっつけられるわけですが、基本的にはSinkの関数をそのまま呼び出しています。OnNextはオペレータによって様々なので、abstructのままですね。

そしてこのあたりが若干消化不良を起こします。GetForwarderは一体なんの役割が...?

internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource>
{   
    //略
    public IObserver<TTarget> GetForwarder() => new _(this);

    private sealed class _ : IObserver<TTarget>
    {
        private readonly Sink<TSource, TTarget> _forward;

        public _(Sink<TSource, TTarget> forward)
        {
            _forward = forward;
        }

        public void OnNext(TTarget value)
        {
            _forward.ForwardOnNext(value);
        }

        public void OnError(Exception error)
        {
            _forward.ForwardOnError(error);
        }

        public void OnCompleted()
        {
            _forward.ForwardOnCompleted();
        }
    }
    //略
}

よくわからんので次に行きます。Sink<TSource, TTarget>を継承しているSelectorクラスの内部クラスの_クラスを見ていきます。 再掲になりますが。

internal sealed class _ : Sink<TSource, TResult>
{
    private readonly Func<TSource, TResult> _selector;

    public _(Func<TSource, TResult> selector, IObserver<TResult> observer)
        : base(observer)
    {
        _selector = selector;
    }

    public override void OnNext(TSource value)
    {
        var result = default(TResult);
        try
        {
            result = _selector(value);
        }
        catch (Exception exception)
        {
            ForwardOnError(exception);
            return;
        }

        ForwardOnNext(result);
    }
}

ここでようやくOnNextが実装されました。親クラスで定義されたForwardOnNextが呼ばれてOnNextがつながっていく感じがつかめます。

Selectorクラスまで戻ってきました。が、Selectorクラス内部の処理は一旦おいておいて、Selectorクラスが継承しているProducerクラスを手繰っていきます。

これも再掲になりますが。

/// <summary>
/// Interface with variance annotation; allows for better type checking when detecting capabilities in SubscribeSafe.
/// </summary>
/// <typeparam name=&quot;TSource&quot;>Type of the resulting sequence's elements.</typeparam>
internal interface IProducer<out TSource> : IObservable<TSource>
{
    IDisposable SubscribeRaw(IObserver<TSource> observer, bool enableSafeguard);
}


internal abstract class Producer<TTarget, TSink> : IProducer<TTarget>
    where TSink : IDisposable
{
    /// <summary>
    /// Publicly visible Subscribe method.
    /// </summary>
    /// <param name=&quot;observer&quot;>Observer to send notifications on. The implementation of a producer must ensure the correct message grammar on the observer.</param>
    /// <returns>IDisposable to cancel the subscription. This causes the underlying sink to be notified of unsubscription, causing it to prevent further messages from being sent to the observer.</returns>
    public IDisposable Subscribe(IObserver<TTarget> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        return SubscribeRaw(observer, enableSafeguard: true);
    }

    public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
    {
        ISafeObserver<TTarget> safeObserver = null;

        //
        // See AutoDetachObserver.cs for more information on the safeguarding requirement and
        // its implementation aspects.
        //
        if (enableSafeguard)
        {
            observer = safeObserver = SafeObserver<TTarget>.Wrap(observer);
        }

        var sink = CreateSink(observer);

        safeObserver?.SetResource(sink);

        if (CurrentThreadScheduler.IsScheduleRequired)
        {
            CurrentThreadScheduler.Instance.ScheduleAction(
                (@this: this, sink),
                tuple => tuple.@this.Run(tuple.sink));
        }
        else
        {
            Run(sink);
        }

        return sink;
    }

    /// <summary>
    /// Core implementation of the query operator, called upon a new subscription to the producer object.
    /// </summary>
    /// <param name=&quot;sink&quot;>The sink object.</param>
    protected abstract void Run(TSink sink);

    protected abstract TSink CreateSink(IObserver<TTarget> observer);
}

Schedulerまわりは今回はスルーします(これは今後の課題ということで...) safeObserver?.SetResourceはこんな具合で、

public void SetResource(IDisposable resource)
{
    Disposable.SetSingle(ref _disposable, resource);
}

Disposable.SetSingleはこんな感じの実装。

public static class Disposable
{
    //略
    /// <summary>
    /// Assigns <paramref name=&quot;value&quot; /> to <paramref name=&quot;fieldRef&quot; />.
    /// </summary>
    /// <returns>true if <paramref name=&quot;fieldRef&quot; /> was assigned to <paramref name=&quot;value&quot; /> and has not
    /// been assigned before.</returns>
    /// <returns>false if <paramref name=&quot;fieldRef&quot; /> has been already disposed.</returns>
    /// <exception cref=&quot;InvalidOperationException&quot;><paramref name=&quot;fieldRef&quot; /> has already been assigned a value.</exception>
    internal static bool SetSingle(ref IDisposable fieldRef, IDisposable value)
    {
        var result = TrySetSingle(ref fieldRef, value);

        if (result == TrySetSingleResult.AlreadyAssigned)
        {
            throw new InvalidOperationException(Strings_Core.DISPOSABLE_ALREADY_ASSIGNED);
        }

        return result == TrySetSingleResult.Success;
    }

    /// <summary>
    /// Tries to assign <paramref name=&quot;value&quot; /> to <paramref name=&quot;fieldRef&quot; />.
    /// </summary>
    /// <returns>A <see cref=&quot;TrySetSingleResult&quot;/> value indicating the outcome of the operation.</returns>
    internal static TrySetSingleResult TrySetSingle(ref IDisposable fieldRef, IDisposable value)
    {
        var old = Interlocked.CompareExchange(ref fieldRef, value, null);
        if (old == null)
        {
            return TrySetSingleResult.Success;
        }

        if (old != BooleanDisposable.True)
        {
            return TrySetSingleResult.AlreadyAssigned;
        }

        value?.Dispose();
        return TrySetSingleResult.Disposed;
    }
    //略
}

といった具合でobserverにsink(IDisposable)を参照として持たせている感じになりますね。その他の関数はabstructなので実際の実装であるところのSelectorを見ると、Sink型をnewして、sinkのRunを呼んでいるだけだとわかります。

ということで一通り実装を眺めることに成功しましたが、個々の実装が頭の中でまだつながっていないので次回に流れを追います。