2

更新 - 解決済み

最終的な解決策はブランドンの提案とは少し異なりますが、彼の答えは私を正しい軌道に乗せました。

class State
{
  public int Offset { get; set; }
  public HashSet<string> UniqueImageUrls = new HashSet<string>();
}

public IObservable<TPicture> GetPictures(ref object _state)
{
  var localState = (State) _state ?? new State();
  _state = localState;

  return Observable.Defer(()=>
  {
    return Observable.Defer(() => Observable.Return(GetPage(localState.Offset)))
      .SubscribeOn(TaskPoolScheduler.Default)
      .Do(x=> localState.Offset += 20)
      .Repeat()
      .TakeWhile(x=> x.Count > 0)
      .SelectMany(x=> x)
      .Where(x=> !localState.UniqueImageUrls.Contains(x.ImageUrl))
      .Do(x=> localState.UniqueImageUrls.Add(x.ImageUrl));
  });
}

IList<TPicture> GetPage(int offset)
{
  ... 
  return result;
}

元の質問

私は現在、次の問題に苦しんでいます。以下に示す PictureProvider の実装は、実際のデータを提供するバックエンド サービスのページング結果に使用されるオフセット変数を操作しています。私が実装したいのは、オブザーバブルの消費者が現在のオフセットを利用できるようにして、後で正しいオフセットでオブザーバブル シーケンスを再開できるようにするエレガントなソリューションです。再開は、GetPictures() のintialState引数によってすでに説明されています。

RX のような方法でコードを改善するための推奨事項も歓迎されます。ここで Task.Run() が適切かどうかは実際にはわかりません。

  public class PictureProvider :
    IPictureProvider<Picture>
  {
    #region IPictureProvider implementation

    public IObservable<Picture> GetPictures(object initialState)
    {
      return Observable.Create<Picture>((IObserver<Picture> observer) =>
      {
        var state = new ProducerState(initialState);
        ProducePictures(observer, state);
        return state;
      });
    }

    #endregion

    void ProducePictures(IObserver<Picture> observer, ProducerState state)
    {
      Task.Run(() =>
      {
        try
        {
          while(!state.Terminate.WaitOne(0))
          {
            var page = GetPage(state.Offset);

            if(page.Count == 0)
            {
              observer.OnCompleted();
              break;
            }

            else
            {
              foreach(var picture in page)
                observer.OnNext(picture);


              state.Offset += page.Count;
            }
          }
        }

        catch (Exception ex)
        {
          observer.OnError(ex);
        }

        state.TerminateAck.Set();
      });
    }

    IList<Picture> GetPage(int offset)
    {
      var result = new List<Picture>();

      ... boring web service call here

      return result;
    }

    public class ProducerState :
      IDisposable
    {
      public ProducerState(object initialState)
      {
        Terminate = new ManualResetEvent(false);
        TerminateAck = new ManualResetEvent(false);

        if(initialState != null)
          Offset = (int) initialState;
      }

      public ManualResetEvent Terminate { get; private set; }
      public ManualResetEvent TerminateAck { get; private set; }

      public int Offset { get; set; }

      #region IDisposable implementation

      public void Dispose()
      {
        Terminate.Set();
        TerminateAck.WaitOne();

        Terminate.Dispose();
        TerminateAck.Dispose();
      }

      #endregion
    }
  }
4

2 に答える 2

1

インターフェイスをリファクタリングして、状態をデータの一部として生成することをお勧めします。これで、クライアントは中断したところから再サブスクライブするために必要なものを手に入れました。

ManualResetEventまた、Rx の使用を開始すると、同期プリミティブの使用がほとんど必要ないことがわかるはずです。各ページの取得が独自のものになるようにコードをリファクタリングするとTask、その同期コードをすべて削除できます。

また、 で「退屈な Web サービス」を呼び出している場合は、GetPage非同期にします。Task.Runこれにより、他の利点の中でも電話をかける必要がなくなります。

これは、.NET 4.5 async/await 構文を使用してリファクタリングされたバージョンです。async/await なしで実行することもできます。Webサービス呼び出しを非同期に変換できない場合に備えGetPageAsyncて使用するメソッドも追加しましたObservable.Run

/// <summary>A set of pictures</summary>
public struct PictureSet
{
    public int Offset { get; private set; }
    public IList<Picture> Pictures { get; private set; }

    /// <summary>Clients will use this property if they want to pick up where they left off</summary>
    public int NextOffset { get { return Offset + Pictures.Count; } }
    public PictureSet(int offset, IList<Picture> pictures)
        :this() { Offset = offset; Pictures = pictures; }
}

public class PictureProvider : IPictureProvider<PictureSet>
{
    public IObservable<PictureSet> GetPictures(int offset = 0)
    {
        // use Defer() so we can capture a copy of offset
        // for each observer that subscribes (so multiple
        // observers do not update each other's offset
        return Observable.Defer<PictureSet>(() =>
        {
            var localOffset = offset;
            // Use Defer so we re-execute GetPageAsync()
            // each time through the loop.
            // Update localOffset after each GetPageAsync()
            // completes so that the next call to GetPageAsync()
            // uses the next offset
            return Observable.Defer(() => GetPageAsync(localOffset))
                .Select(pictures =>
                    {
                        var s = new PictureSet(localOffset, pictures);
                        localOffset += pictures.Count;
                    })
                .Repeat()
                .TakeWhile(pictureSet => pictureSet.Pictures.Count > 0);
        });
    }

    private async Task<IList<Picture>> GetPageAsync(int offset)
    {
        var data = await BoringWebServiceCallAsync(offset);
        result = data.Pictures.ToList();
    }

    // this version uses Observable.Run() (which just uses Task.Run under the hood)
    // in case you cannot convert your
    // web service call to be asynchronous
    private IObservable<IList<Picture>> GetPageAsync(int offset)
    {
        return Observable.Run(() =>
        {
            var result = new List<Picture>();
            ... boring web service call here
            return result;
        });
    }
}

クライアントは、SelectMany呼び出しを追加してIObservable<Picture>. 必要に応じて保存することもできますpictureSet.NextOffset

pictureProvider
    .GetPictures()
    .SelectMany(pictureSet => pictureSet.Pictures)
    .Subscribe(picture => whatever);
于 2013-07-01T03:26:00.493 に答える
0

サブスクリプションの状態を保存する方法を考える代わりに、入力の状態を再生する方法を考えます (つまり、再開時に再サブスクライブして現在の状態に追いつくだけのシリアル化可能な ReplaySubject を作成しようとします)。 )。

于 2013-06-30T18:44:50.730 に答える