更新 - 解決済み
最終的な解決策はブランドンの提案とは少し異なりますが、彼の答えは私を正しい軌道に乗せました。
class State
{
public int Offset { get; set; }
public HashSet<string> UniqueImageUrls = new HashSet<string>();
}
public IObservable<TPicture> GetPictures(ref object _state)
{
var localState = (State) _state ?? new State();
_state = localState;
return Observable.Defer(()=>
{
return Observable.Defer(() => Observable.Return(GetPage(localState.Offset)))
.SubscribeOn(TaskPoolScheduler.Default)
.Do(x=> localState.Offset += 20)
.Repeat()
.TakeWhile(x=> x.Count > 0)
.SelectMany(x=> x)
.Where(x=> !localState.UniqueImageUrls.Contains(x.ImageUrl))
.Do(x=> localState.UniqueImageUrls.Add(x.ImageUrl));
});
}
IList<TPicture> GetPage(int offset)
{
...
return result;
}
元の質問
私は現在、次の問題に苦しんでいます。以下に示す PictureProvider の実装は、実際のデータを提供するバックエンド サービスのページング結果に使用されるオフセット変数を操作しています。私が実装したいのは、オブザーバブルの消費者が現在のオフセットを利用できるようにして、後で正しいオフセットでオブザーバブル シーケンスを再開できるようにするエレガントなソリューションです。再開は、GetPictures() のintialState引数によってすでに説明されています。
RX のような方法でコードを改善するための推奨事項も歓迎されます。ここで Task.Run() が適切かどうかは実際にはわかりません。
public class PictureProvider :
IPictureProvider<Picture>
{
#region IPictureProvider implementation
public IObservable<Picture> GetPictures(object initialState)
{
return Observable.Create<Picture>((IObserver<Picture> observer) =>
{
var state = new ProducerState(initialState);
ProducePictures(observer, state);
return state;
});
}
#endregion
void ProducePictures(IObserver<Picture> observer, ProducerState state)
{
Task.Run(() =>
{
try
{
while(!state.Terminate.WaitOne(0))
{
var page = GetPage(state.Offset);
if(page.Count == 0)
{
observer.OnCompleted();
break;
}
else
{
foreach(var picture in page)
observer.OnNext(picture);
state.Offset += page.Count;
}
}
}
catch (Exception ex)
{
observer.OnError(ex);
}
state.TerminateAck.Set();
});
}
IList<Picture> GetPage(int offset)
{
var result = new List<Picture>();
... boring web service call here
return result;
}
public class ProducerState :
IDisposable
{
public ProducerState(object initialState)
{
Terminate = new ManualResetEvent(false);
TerminateAck = new ManualResetEvent(false);
if(initialState != null)
Offset = (int) initialState;
}
public ManualResetEvent Terminate { get; private set; }
public ManualResetEvent TerminateAck { get; private set; }
public int Offset { get; set; }
#region IDisposable implementation
public void Dispose()
{
Terminate.Set();
TerminateAck.WaitOne();
Terminate.Dispose();
TerminateAck.Dispose();
}
#endregion
}
}