0

私はコンテンツをプッシュする頻度が高い Observable をサブスクライブしました。これらのコンテンツはネットワーク I/O からのものであるため、各プッシュはもともと異なるスレッドからのものでし。他のコンテンツは受信されないため、コード サンプルは次のようになります。

        IDisposable dsp = null;
        dsp = TargetObservable.Subscribe((incomingContent) =>
        {
            if (incomingContent == "something")
            {
                myList.Add(incomingContent);
                dsp.Dispose();
            }
            else
            {
                otherList.Add(incomingContent);
            }
        });

今のところ、OnNext は明らかにスレッド セーフではありません。つまり、Observer が「何か」を取得し、Dispose() を呼び出す直前に、他のコンテンツがまだ着信して「otherList」に追加される可能性があります。 .)' 'onNext(...)' 全体。
これは私たちが望んでいないので、これを避けるためのアイデアはありますか? 私が考えることができる 1 つの方法は、(「ロック」を使用して) コンテンツを 1 つずつプッシュするように Observable を変更することです。これにより、パフォーマンスが大幅に低下するはずです。ありがとう。

4

3 に答える 3

5

Rx を使用するには、 Rx ガイドラインに従う必要があります。あなたの場合、4.2に問題があります。オブザーバーインスタンスがシリアル化された方法で呼び出され、解決策は、Synchronize基本的lockに回避したいものを導入することです。コードにステートメントを含める余裕がない場合は、lockネットワーク イベントを Rx に送信する前に、独自の「簡単な」同期を記述する必要があります。

同期シーケンスを使用すると、次のOnNextような Rx LINQ 演算子を使用してハンドラーのコードを簡素化できTakeWhileます。

var subscription = TargetObservable
  .Synchronize()
  .TakeWhile(incomingContent => incomingContent != "something"))
  .Subscribe( ... );

または、独自の演算子を作成TakeWhileInclusiveして、述語が false である最後の項目を含めることができます。

static class ObservableExtensions {

  public static IObservable<TSource> TakeWhileInclusive<TSource>(
       this IObservable<TSource> source, 
       Func<TSource, Boolean> predicate) {
    return Observable
      .Create<TSource>(
        observer => source.Subscribe(
          item => {
            observer.OnNext(item);
            if (!predicate(item))
              observer.OnCompleted();
          },
          observer.OnError,
          observer.OnCompleted
        )
      );
  }
}
于 2013-09-16T08:39:12.950 に答える
1

非 Rx コードと Rx コードを混在させています。サブスクライブ内でサブスクリプションを破棄することは避けてください。

やりたいことをする私の好みの方法は次のとおりです。

TargetObservable
    .TakeWhile(x => x != "something")
    .Subscribe(otherList.Add);

TargetObservable
    .Where(x => x == "something")
    .Take(1)
    .Subscribe(myList.Add);

両方のサブスクリプションが自動的にサブスクライブ解除されるため、これで完了です。TargetObservableソースが共有ストリームを生成しない場合Publish、共有オブザーバブルを作成するために使用する必要がある場合があります。

または、次のようにすることもできます。

TargetObservable
    .Do(x =>
    {
        if (x != "something") otherList.Add(x);
    })
    .Where(x => x == "something")
    .Take(1)
    .Subscribe(x => myList.Add(x));
于 2013-09-18T03:42:33.713 に答える
-1

Throttle はその仕事を行うことができるかもしれません: http://rxwiki.wikidot.com/101samples#toc30

于 2013-09-16T07:15:37.170 に答える