ねののお庭。

かりかりもふもふ。

System.Reactiveの中身をほんのちょっと読んで見る。(その1)

Rxの中身をほんのちょっと読んで見る。

前々回にふと気になったのでRxのチェーンがどういう感じなのかなと思ってうんたららみたいな記事を書いたのですが、今度はちゃんとSystem.Reactiveの中身をちらっと読んでいきます。これらのソースコードはApache License, Version 2.0で提供されたものをコピペしたりしています。2,3回に分けて書きます。今回は拡張メソッドの

Select(this IObservable<TSource> source, Func<TSource, TResult> selector)

を起点にたどっていきます。

s_implはどちら様?

Select関数の中身を見るとこうなっています。

public static partial class Observable
{
//〜略〜
    public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (selector == null)
        {
            throw new ArgumentNullException(nameof(selector));
        }

        return s_impl.Select(source, selector);
    }
//〜略〜

s_implとは一体...。別のファイルに定義されてますね。

    public static partial class Observable
    {
#pragma warning disable IDE1006 // Naming Styles: 3rd party code is known to reflect for this specific field name
        private static IQueryLanguage s_impl = QueryServices.GetQueryImpl<IQueryLanguage>(new QueryLanguage());
#pragma warning restore IDE1006 // Naming Styles
    }

IQueryLanguageにはこんな感じの定義がズラーーーっとされています。

IObservable<TResult> Select<TSource, TResult>(
  IObservable<TSource> source,
  Func<TSource, TResult> selector);

IObservable<TResult> Select<TSource, TResult>(
  IObservable<TSource> source,
  Func<TSource, int, TResult> selector);

intがあるやつはindexとかを返すselectorですね。 そんなことより、さっさと具象クラスを探します。

QueryServices.GetQueryImpl<IQueryLanguage>(new QueryLanguage());

QueryServicesの方を実装を眺めてみます。

internal static class QueryServices
{
    private static readonly IQueryServices Services = Initialize();

    public static T GetQueryImpl<T>(T defaultInstance) => Services.Extend(defaultInstance);

    private static IQueryServices Initialize()
    {
#pragma warning disable CS0618 // Type or member is obsolete
        return PlatformEnlightenmentProvider.Current.GetService<IQueryServices>() ?? new DefaultQueryServices();
#pragma warning restore CS0618 // Type or member is obsolete
    }
}

Type or member is obsoleteとか書いてあるけどとりあえずスルー。GetQueryImpl()はServices.Extend(defaultInstance)を返しています。ServicesはIQueryServicesで受け止められています。IQueryServicesにはどんな定義がなされているか見てみると、

internal interface IQueryServices
{
    T Extend<T>(T baseImpl);
}

なんとこれだけです。baseImplとか言ってるので実際に実装されたものが投げ込まれるっぽいですね。返り値もT,引数もT。ふむ。

実際のServicesを作っているのはInitialize()なので、中身をみると、PlatformEnlightenmentProviderからいろいろ生えているので、これを眺めます。

[EditorBrowsable(EditorBrowsableState.Never)]
public static class PlatformEnlightenmentProvider
{
    private static IPlatformEnlightenmentProvider _current = CreatePlatformProvider();

    /// <summary>
    /// (Infrastructure) Gets the current enlightenment provider. If none is loaded yet, accessing this property triggers provider resolution.
    /// </summary>
    /// <remarks>
    /// This member is used by the Rx infrastructure and not meant for public consumption or implementation.
    /// </remarks>
    [Obsolete(&quot;This mechanism will be removed in the next major version&quot;, false)]
    public static IPlatformEnlightenmentProvider Current
    {
        get
        {
            return _current;
        }

        set
        {
            _current = value ?? throw new ArgumentNullException(nameof(value));
        }

    }

    private static IPlatformEnlightenmentProvider CreatePlatformProvider() => new CurrentPlatformEnlightenmentProvider();
}

CurrentはCreatePlatformProvider()によって作られているそうですね。 呼び出されているCurrentPlatformEnlightenmentProviderクラスに飛びます。

[EditorBrowsable(EditorBrowsableState.Never)]
public class CurrentPlatformEnlightenmentProvider : IPlatformEnlightenmentProvider
{
    /// <summary>
    /// (Infrastructure) Tries to gets the specified service.
    /// </summary>
    /// <typeparam name=&quot;T&quot;>Service type.</typeparam>
    /// <param name=&quot;args&quot;>Optional set of arguments.</param>
    /// <returns>Service instance or <c>null</c> if not found.</returns>
    public virtual T GetService<T>(object[] args) where T : class
    {
        var t = typeof(T);

        if (t == typeof(IExceptionServices))
        {
            return (T)(object)new ExceptionServicesImpl();
        }

#if !NO_THREAD || WINDOWS
        if (t == typeof(IConcurrencyAbstractionLayer))
        {
            return (T)(object)new ConcurrencyAbstractionLayerImpl();
        }
#endif

        if (t == typeof(IScheduler) &amp;&amp; args != null)
        {
            switch ((string)args[0])
            {
                case &quot;ThreadPool&quot;:
                    return (T)(object)ThreadPoolScheduler.Instance;
                case &quot;TaskPool&quot;:
                    return (T)(object)TaskPoolScheduler.Default;
                case &quot;NewThread&quot;:
                    return (T)(object)NewThreadScheduler.Default;
            }
        }

#if WINDOWS
        if (t == typeof(IHostLifecycleNotifications))
        {
            return (T)(object)new HostLifecycleNotifications();
        }
#endif

        if (t == typeof(IQueryServices))
        {
            //
            // We perform this Debugger.IsAttached check early rather than deferring
            // the decision to intercept query operator methods to the debugger
            // assembly that's dynamically discovered here. Also, it's a reasonable
            // expectation it'd be pretty hard to turn on interception dynamically
            // upon a debugger attach event, so we should make this check early.
            //
            // In the initial release of v2.0 (RTM), we won't have the corresponding
            // debugger assembly available yet, so the dynamic load would always
            // fail. We also don't want to take the price of (an attempt to) a dynamic
            // assembly load for the regular production case.
            //
            if (Debugger.IsAttached)
            {

#if (CRIPPLED_REFLECTION &amp;&amp; HAS_WINRT)
                var ifType = t.GetTypeInfo();
#else
                var ifType = t;
#endif
                var asm = new AssemblyName(ifType.Assembly.FullName)
                {
                    Name = &quot;System.Reactive&quot;
                };
                var name = &quot;System.Reactive.Linq.QueryDebugger, &quot; + asm.FullName;

                var dbg = Type.GetType(name, false);
                if (dbg != null)
                {
                    return (T)Activator.CreateInstance(dbg);
                }
            }
        }

        return null;
    }
}

なるほど。すこし前に示したQueryServicesクラスでは、.GetService()を呼び出していました。なのでGetService関数を追います。IQueryServicesはその他のインターフェースを実装されていないので、実際に処理が行われているのは入っていくのは

    if (t == typeof(IQueryServices))
    {
        //
        // We perform this Debugger.IsAttached check early rather than deferring
        // the decision to intercept query operator methods to the debugger
        // assembly that's dynamically discovered here. Also, it's a reasonable
        // expectation it'd be pretty hard to turn on interception dynamically
        // upon a debugger attach event, so we should make this check early.
        //
        // In the initial release of v2.0 (RTM), we won't have the corresponding
        // debugger assembly available yet, so the dynamic load would always
        // fail. We also don't want to take the price of (an attempt to) a dynamic
        // assembly load for the regular production case.
        //
        if (Debugger.IsAttached)
        {

#if (CRIPPLED_REFLECTION &amp;&amp; HAS_WINRT)
            var ifType = t.GetTypeInfo();
#else
            var ifType = t;
#endif
            var asm = new AssemblyName(ifType.Assembly.FullName)
            {
                Name = &quot;System.Reactive&quot;
            };
            var name = &quot;System.Reactive.Linq.QueryDebugger, &quot; + asm.FullName;

            var dbg = Type.GetType(name, false);
            if (dbg != null)
            {
                return (T)Activator.CreateInstance(dbg);
            }
        }
    }

    return null;
}

Debugger.IsAttachedで分岐してますね。コメントには

We perform this Debugger.IsAttached check early rather than deferring
the decision to intercept query operator methods to the debugger
assembly that's dynamically discovered here. Also, it's a reasonable
expectation it'd be pretty hard to turn on interception dynamically
upon a debugger attach event, so we should make this check early.
In the initial release of v2.0 (RTM), we won't have the corresponding
debugger assembly available yet, so the dynamic load would always
fail. We also don't want to take the price of (an attempt to) a dynamic
assembly load for the regular production case.

デバッガがオペレータを中断するときまで待つのではなく、デバッガがアタッチされているかをみてどうするかを決めていますと。それはデバッガが動的にインターセプションするよう切り替えるのが大変だからだと。たぶんそんな感じ。なので今回はデバッガはアタッチされていないということで(たぶんデバッグ用の工夫がなされていて複雑になっていそうなので)読んでいくこととします。 アセンブリ読みこんでdbg返してるようですが,明らかに追っかけづらい。 興味があるのは実際の挙動なのでシンプルな方がいい。というわけでここまで読んできたのですが、returnはnullらしいので、`class QueryServices`のInitialize関数はDefaultQueryServicesクラスが渡されていることになります。Defaultっていってるし先にこっちから読めばよかったね()

DefaultQueryServicesクラスはQueryServicesクラスの直下に以下のように定義されています。

internal sealed class DefaultQueryServices : IQueryServices
{
    public T Extend<T>(T baseImpl) => baseImpl;
}

引数で渡されたものそのまま返しているだけっぽいという...。

なのでいろいろなものを省くと、s_implはこういうことだとわかります。

IQueryLanguage s_impl = new QueryLanguage();

遠回りしたのがアホくさくなるくらいにはシンプルになる。ていうか名前から最初から割と明らか感がありますね。。。 QueryLanguageクラス、あちこちに実装が散らばっているのでおっかけづらいですが、とりあえずIQueryLanguageは実装されていることがわかります。なので上記で問題なさそうですね。ではその2で内部で使われているクラスやその役割を見ていきます。