序章
最初に認めておきたいのは、私は .NET の専門家であり、このアプローチが Java に直接対応するものがないいくつかのイディオムを使用していることを知っていることです。しかし、私はあなたの言葉を信じて、これは .NET 関係者が喜ぶ素晴らしい質問であり、私が見たことのない rx-java の正しい道にあなたを導いてくれることを願っています。これは非常に長い回答ですが、ほとんどが説明です。ソリューション コード自体はかなり短いです。
どちらかの使用
このソリューションを支援するために、最初にいくつかのツールを整理する必要があります。1つ目は、Either<TLeft, TRight>
型の使用です。これは重要です。なぜなら、各呼び出しには 2 つの可能な結果 (良い結果またはエラー) があるからです。ただし、これらを 1 つの型にラップする必要があります。OnError を使用してエラーを送り返すことはできません。これは結果ストリームを終了させるためです。どちらも Tuple に少し似ているため、この状況に対処しやすくなります。Rxx ライブラリにはの非常に完全で優れた実装Either
がありますが、ここでは単純な一般的な使用例に続いて、目的に適した単純な実装を示します。
var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());
/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
public abstract bool IsLeft { get; }
public bool IsRight { get { return !IsLeft; } }
public abstract TLeft Left { get; }
public abstract TRight Right { get; }
}
public static class Either
{
public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
{
TLeft _leftValue;
public LeftValue(TLeft leftValue)
{
_leftValue = leftValue;
}
public override TLeft Left { get { return _leftValue; } }
public override TRight Right { get { return default(TRight); } }
public override bool IsLeft { get { return true; } }
}
public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
{
TRight _rightValue;
public RightValue(TRight rightValue)
{
_rightValue = rightValue;
}
public override TLeft Left { get { return default(TLeft); } }
public override TRight Right { get { return _rightValue; } }
public override bool IsLeft { get { return false; } }
}
// Factory functions to create left or right-valued Either instances
public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
{
return new LeftValue<TLeft, TRight>(leftValue);
}
public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
{
return new RightValue<TLeft, TRight>(rightValue);
}
}
慣例により、成功または失敗をモデル化するためにどちらかを使用する場合、右側はもちろん「右側」であるため、成功した値に使用されることに注意してください:)
いくつかのヘルパー関数
いくつかのヘルパー関数を使用して、問題の 2 つの側面をシミュレートします。まず、ここにパラメータを生成するファクトリがあります。呼び出されるたびに、1 から始まる整数のシーケンスの次の整数が返されます。
// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
return ++count;
}
次に、Rest 呼び出しを IObservable としてシミュレートする関数を次に示します。この関数は整数を受け入れます。
- 整数が偶数の場合、すぐに OnError を送信する Observable を返します。
- 整数が奇数の場合、整数を「-ret」で連結した文字列を返しますが、1 秒経過した後でのみです。これを使用して、ポーリング間隔が要求どおりに動作しているかどうかを確認します。通常の間隔ではなく、完了した呼び出し間の一時停止として、時間がかかります。
ここにあります:
// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
return x % 2 == 0
? Observable.Throw<string>(new Exception())
: Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));
}
ナウ・ザ・グッド・ビット
以下は、私が呼び出したかなり一般的な再利用可能な関数ですPoll
。これは、ポーリングされる非同期関数、その関数のパラメーター ファクトリ、目的の休憩 (しゃれは意図されていません!) 間隔、そして最後に使用する IScheduler を受け入れます。
私が思いついた最も簡単な方法Observable.Create
は、スケジューラを使用して結果ストリームを駆動することです。ScheduleAsync
.NET async/await フォームを使用するスケジューリングの方法です。これは、命令型の方法で非同期コードを記述できるようにする .NET イディオムです。このキーワードは、本体で 1 つ以上の非同期呼び出しasync
を実行できる非同期関数を導入し、呼び出しが完了したときにのみ続行します。この質問で、このスタイルのスケジューリングについて長い説明を書きました。これには、rx-java アプローチで実装しやすい古い再帰的なスタイルが含まれます。コードは次のようになります。await
public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(observer =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) => {
while(!ct.IsCancellationRequested)
{
try
{
var result = await asyncFunction(parameterFactory());
observer.OnNext(Either.Right<Exception,TResult>(result));
}
catch(Exception ex)
{
observer.OnNext(Either.Left<Exception, TResult>(ex));
}
await ctrl.Sleep(interval, ct);
}
});
});
}
これを分解するとObservable.Create
、一般に IObservables を作成するためのファクトリがあり、結果がオブザーバーにポストされる方法を大幅に制御できます。プリミティブの不必要に複雑な構成を支持して見落とされることがよくあります。
この場合、これを使用して のストリームを作成しEither<TResult, Exception>
、成功したポーリング結果と失敗したポーリング結果を返すことができるようにします。
このCreate
関数は、OnNext/OnError/OnCompleted を介して結果を渡すサブスクライバーを表すオブザーバーを受け入れます。IDisposable
呼び出し内でを返す必要がありCreate
ます。.NET では、これはサブスクライバーがサブスクリプションをキャンセルできるハンドルです。ここで特に重要なのは、そうでなければ Polling が永遠に続くか、少なくとも永遠に続くことはないからですOnComplete
。
ScheduleAsync
(またはプレーン)の結果は、Schedule
そのようなハンドルです。破棄されると、スケジュールされた保留中のイベントがキャンセルされ、ポーリング ループが終了します。この場合、間隔を管理するために使用するのはキャンセル可能な操作ですが、Poll 関数は a も受け入れるSleep
キャンセル可能な操作を受け入れるように簡単に変更できます。asyncFunction
CancellationToken
ScheduleAsync メソッドは、イベントをスケジュールするために呼び出される関数を受け入れます。2 つの引数が渡されます。最初の引数ctrl
はスケジューラ自体です。2 つ目ct
は、キャンセルが要求されたかどうかを確認するために使用できる CancellationToken です (サブスクライバーがサブスクリプション ハンドルを破棄することによって)。
ポーリング自体は、キャンセルが要求されたことを CancellationToken が示している場合にのみ終了する無限の while ループを介して実行されます。
ループでは、async/await の魔法を使用して、ポーリング関数を非同期的に呼び出しながら、例外ハンドラーでラップすることができます。これはすごい!エラーがないと仮定すると、結果を an の正しい値としてEither
を介してオブザーバに送信しOnNext
ます。例外があった場合、それを an の左の値としてEither
オブザーバーに送信します。最後にSleep
、スケジューラの関数を使用して、休息間隔の後にウェイクアップ コールをスケジュールします。コールと混同しないようにThread.Sleep
、通常、このコールはスレッドをブロックしません。CancellationToken
Sleep は、中止される有効化も受け入れることに注意してください。
これは、非常にトリッキーな問題を単純化するための async/await の非常に優れた使用法であることに同意していただけると思います。
使用例
Poll
最後に、サンプル出力とともに、を呼び出すテスト コードをいくつか示します。
void Main()
{
var subscription = Poll(SomeRestCall,
ParameterFactory,
TimeSpan.FromSeconds(5),
ThreadPoolScheduler.Instance)
.TimeInterval()
.Subscribe(x => {
Console.Write("Interval: " + x.Interval);
var result = x.Value;
if(result.IsRight)
Console.WriteLine(" Success: " + result.Right);
else
Console.WriteLine(" Error: " + result.Left.Message);
});
Console.ReadLine();
subscription.Dispose();
}
Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.
結果の間隔は、エラーがすぐに返された場合は 5 秒 (ポーリング間隔)、成功した場合は 6 秒 (ポーリング間隔とシミュレートされた REST 呼び出しの期間) のいずれかであることに注意してください。
編集 - これは、 ScheduleAsync を使用しない代替実装ですが、古いスタイルの再帰スケジューリングを使用し、async/await 構文を使用しません。ご覧のとおり、かなり面倒ですが、asyncFunction オブザーバブルのキャンセルもサポートしています。
public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(
observer =>
{
var disposable = new CompositeDisposable();
var funcDisposable = new SerialDisposable();
bool cancelRequested = false;
disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
disposable.Add(funcDisposable);
disposable.Add(scheduler.Schedule(interval, self =>
{
funcDisposable.Disposable = asyncFunction(parameterFactory())
.Finally(() =>
{
if (!cancelRequested) self(interval);
})
.Subscribe(
res => observer.OnNext(Either.Right<Exception, TResult>(res)),
ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
}));
return disposable;
});
}
.NET 4.5 の async/await 機能を回避し、Schedule 呼び出しを使用しない別のアプローチについては、私の他の回答を参照してください。
rx-java 関係者の助けになれば幸いです。