ねののお庭。

かりかりもふもふ。

Reactive Extensionsのメソッドチェーンの後ろ側が気になったので見ていく。

Rx難しいね。

Rx、使う分にはぺちぺちメソッドチェーンで幸せいっぱいなのですが、IObserverとIObservableがどういうふうに連結されていっているのかイマイチ想像がつかなかったので、オペレータ自作という形で調べてみたり動かしてみたりデバッグしてみたりしていました。

ぶっちゃけ2011年や2012年に解説記事をneueccさんxin9leさんが書いていくれているので2019年にもなって書く意味なくないかとか思いつつ、バカなので一読してもイマイチわからなかったのでデバッグしていてわかりやすいように改造しつつ、ブレイクポイント置いて眺めていたりしたのでその記録を書き記します。

今回のソースコードこちら(github)

SelectとWhereを作ってみます。

AnonymousObserver/AnonymousObservableをやめる

AnonymousObserver/AnonymousObservableは実装にあたっては便利なのですが、型を分けたほうがデバッグ上追いやすいので名前を変えてみます。追いやすくするためだけなので実装は同じで型の名前だけ違うものをコピペ量産します。ObserverがこっちでObservableがこっち。 そしてSelectとWhereの拡張メソッドを以下のようにします。

public static class Extension
{
    public static IObservable<T> MyWhere<T> (this IObservable<T>  source, Func<T, bool>  predicate)
    {
        return new WhereObservable<T>(observer => 
        {
            return source.Subscribe(
                new WhereObserver<T>(value => 
                    {
                        //--- ここはOnNext実行時に呼び出される
                        if (predicate(value))
                        {
                            observer.OnNext(value);
                        }
                    },
                    observer.OnError,       //--- OnErrorと
                    observer.OnCompleted)); //--- OnCompletedは何もせず素通し
        });
    }

    public static IObservable<TResult> MySelect<T,TResult>(this IObservable<T> source, Func<T, TResult> selector)
    {
        //Observableは値を送信する側なので、変換された値を送信するためTResult型
        return new SelectObservable<TResult>(observer => 
        {
            return source.Subscribe(
                //Observerは上流のObservableから値を受け取るのでT型
                new SelectObserver<T>(value => 
                    {
                        //--- ここはOnNext実行時に呼び出される
                        observer.OnNext(selector(value));
                    },
                    observer.OnError,       //--- OnErrorと
                    observer.OnCompleted)); //--- OnCompletedは何もせず素通し
        });
    }
}

デバッグしていく。

で次のような感じで試してみます。

var subject = new Subject<string>();

var stream = subject
    .MyWhere(x => x.Contains("hoge"))
    .MySelect(x => x.Length);

stream.Subscribe(new EndPointObserver<int>(
    x => Console.WriteLine(x),
    e => Console.WriteLine(e.Message),
    () => Console.WriteLine("complted")));

最後のSubscribeでは拡張メソッドを使わず、直接Observerをnewします。これもデバッグするとき何が入ってくるのか分かりやすくするため。 streamで受けているところの実態は、SelectObservableなのでそのSubscribeメソッドにブレイクポイント置いて実行してみます。

observerにEndPointObserver。ここでステップインします。

そうするとこのような感じになり、sourceが`WhereObservable`、observerが`EndPointObserver`であることが分かります。これは`SelectObservable`にコンストラクタでSubscribeが登録されたとき、クロージャが呼び出し元であるsourceをキャプチャしているからできることですね。クロージャありがとう。そうすると次は`SelectorObserver`が生成され、`WhereObservable`の実態であるsourceにSubscribeで渡されます。そうすると、次はMyWhereで登録したクロージャが呼ばれ、以下のような感じになります。

observerが`SelectObserver`、sourceがsubjectであることが分かります。これで根本であるsubjectまで大体たどり着けたので追うのはこのあたりでやめます。 こうしてSubscribeが連鎖して、末端から根本(subject)までストリームが構築されていくのが分かりました。 こうしてsubjectのOnNextが呼ばれると、それぞれのクロージャでキャプチャobserverに登録されている、observer.OnNextが発火されOnNext->OnNext->OnNext->...みたいにOnNextが連鎖していくことがわかりました。なるほどよくできてる。

感想

英知を感じた(小並感) ここまでやってようやく冒頭のお二人が何を言っているのかを理解したのですが、Rxゴリゴリ使っている皆さんこれくらいパッと理解できてしまってるんですかね(解説記事すくない気がするし多分そういうこと)。修行が足りないなとひしひしと実感。