5

たくさんのRSSアイテムを取得するためにReactiveExtensionsを試しています。私はTimGreenfieldによるブログ投稿に基づいています:MVVM内のSilverlightRxDataClient

デスクトップアプリケーション内で使用していますが、コードは似ています。

私が抱えている問題は、Retry()機能を理解することです。それは私が期待していること、そして私が期待していることをしているようには見えません。

var items = new List<RssItem>();
WebHelper.DownloadXmlFileAsync<RssItem>(new Uri(URI), "item")
    .Retry(2)
    .Finally(PublishResults)
    .Subscribe(items.Add, ProcessError, () => ProcessCompleted(items));

有効なURIを渡すと、これは問題なく機能します。URIにタイプミスをすると、ProcessError()予想どおり、関数を介して404エラーが報告されますが、報告されるのは1回だけです。このエラーが2回表示されると思っていました。

したがって、このRetry()関数は私のWebリクエストでは機能していないようですが、に渡される関数には実際に適用されているようSubscribe()です。私はここで間違っている可能性があります。

Retry()通話がWebリクエストに適用されることを確認するにはどうすればよいですか?

追加コード:

public static class WebHelper
{
    public static HttpWebRequest CreateHttp(Uri uri)
    {
        return CreateHttp(uri, "GET");
    }

    public static HttpWebRequest CreateHttp(Uri uri, string method)
    {
        if (uri.Scheme != Uri.UriSchemeHttp && uri.Scheme != Uri.UriSchemeHttps)
        {
            throw new ArgumentException("The specified URI does not use HTTP or HTTPS.", "uri");
        }

        var request = (HttpWebRequest)WebRequest.Create(uri);
        request.Method = method;

        return request;
    }

    public static IObservable<T> DownloadXmlFileAsync<T>(Uri uri, string elementName) where T : class
    {
        return (from request in Observable.Return(CreateHttp(uri))
                from response in Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
                let stream = response.GetResponseStream()
                where stream != null
                from item in XmlReader.Create(stream).GetXmlItem<T>(elementName).ToObservable()
                select item);
    }
}

public static class XmlExtensions
{
    public static IEnumerable<T> GetXmlItem<T>(this XmlReader reader, string elementName) where T : class
    {
        var serializer = new XmlSerializer(typeof (T));
        while (reader.GoToElement(elementName))
        {
            yield return serializer.Deserialize(reader) as T;
        }
    }

    public static bool GoToElement(this XmlReader reader, string elementName)
    {
        do
        {
            if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName)
            {
                return true;
            }
        } while (reader.Read());

        return false;
    }
}

XmlRoot("item")]
public class RssItem
{
    [XmlElement("description")]
    public string Description { get; set; }

    [XmlElement("link")]
    public string Link { get; set; }

    [XmlElement("pubDate")]
    public string PublishDate { get; set; }

    [XmlElement("title")]
    public string Title { get; set; }

    public override string ToString()
    {
        return string.Format("Title: {0}", Title);
    }
}
4

2 に答える 2

16

シーケンスのRx文法は、次のように定義されます。

OnNext *(OnError | OnCompleted)?

OnErrorまたはのいずれかを受信するOnCompletedと、シーケンスの終了が通知され、パイプラインのサブスクリプションは破棄されると予想されます。

演算子のコンテキストでは:

observable.Retry(n)observableis:を受信したときにOnError、最大n回まで再サブスクライブします。

observable.Finally(action)は:action受信時に実行OnError|OnCompleted

再試行は、コールドオブザーバブル(Lee Campbellこれについて良い投稿をしています)で使用することを目的としています。サブスクリプションにより、基本的にソースが開始されます。

同様に、を受信すると再サブスクライブすることを除いて、Repeatまったく同じです。RetryOnCompleted

これが実際に動作していることを確認するために、最初のn回は「失敗」し、その後成功するオブザーバブルを作成できます。さて、いくつかのコードについて:

    private static IObservable<int> ErrorProducer(int i)
    {
        int count = 0;
        return Observable.Create<int>(observer =>
        {
            Console.WriteLine("Doing work");

            if (count++ < i)
            {
                Console.WriteLine("Failed");
                observer.OnError(new Exception());
            }
            else
            {
                Console.WriteLine("Done");
                observer.OnNext(count);
                observer.OnCompleted();                    
            }
            return Disposable.Empty;
        });
    }

常に失敗するプロデューサーの場合:

      print(ErrorProducer(3).Retry(2));

与える:

Doing work <-- Subscription
Failed
Doing work <-- Resubscription
Failed
OnError(System.Exception)
Finally

最終的に成功するプロデューサーの場合:

print(ErrorProducer(2).Retry(3));

Doing work
Failed
Doing work
Failed
Doing work
Done
OnNext(3) <-- Succeeded
OnCompleted()
Finally

プロセスエラー関数を再試行回数だけ呼び出す場合は、の前に配置する必要がありRetryます。

すなわち、seq.Do(value => { }, exception => { }).Retry(n)

ホット/コールドオブザーバブルの使用、およびRxでの非同期パターンの使用について理解を深めることができます。

于 2012-10-18T16:35:46.853 に答える
4

アスティの答えは的確です。単一の論理シーケンスに対して複数のエラーを公開する方法を知りたい場合に備えて、いくつかの情報を追加したいと思います。

Astiが指摘しているように、シーケンスを終了できるのは1回だけです。この終了は、エラーまたは完了(OnError | OnCompleted)のいずれかです。

ただし、観察可能なシーケンスをネストすることを妨げるものは何もありません。複数のエラーメッセージを表示したい場合は、を返したシナリオを検討してくださいIObservable<IObservable<T>>。内側のシーケンスはデータシーケンス(現在持っているシーケンス)です。このシーケンスがエラーになると、使用できなくなるため、外部シーケンスが新しい内部データシーケンスを生成する可能性があります。

これは少し奇妙に思えるかもしれませんが、MergeやSwitchなどの演算子はすでにこれらのネストされたシーケンスに対応しているため、Rxでサポートされている概念です。このスタイルのRxは、私の本、ネストされたシーケンスの段落のIntroToRxで触れられており、その後、偶然の一致のシーケンスの章でさらに詳しく説明されています。

これが将来のRxの方法の他の可能性を理解するのに役立つことを願っています。

于 2012-10-22T09:00:06.607 に答える