3

並行して処理したい監視可能なコレクションがあり、フィルタリング中に処理された値を監視し、最後にフィルタリングされた値を受け取るハンドラーをサブスクライブします。

私のサンプルは構文的に正しく、正常にコンパイルされます。コードを実行するとWhere、フィルタリングを実行するステートメントが評価されます。ただし、サブスクリプションにはデータが届きません。を削除AsParallelして処理が通常どおりに行われるようにするIEnumerableと、データが通過し、すべてが期待どおりに機能します。

これが私のサンプルで、文字列に対していくつかの処理を行っています。

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

次の奇妙なことは、TakeWhile演算子を使用する場合です。これは、概念的にはWhereに似ていますが、ParallelQueryを観察すると期待どおりに機能します。

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

サブスクリプションにログコードを追加すると、データは変換まで受信されますが、ToObservable変換後は受信されないことが示されます。

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

4行目のラムダのブレークポイントはヒットしますが、6行目のラムダのブレークポイントはヒットしません。

なぜTakeWhileデータが加入者に届くのにWhere届かないのですか?

重要な場合は、.Net4.0Frameworkクライアントプロファイルを対象としたプロジェクトを使用してVisualStudio2010RCでコードを開発します。

更新: @Sergeysの回答に基づいて、フィルターの配置を作り直しましたWhere。次のコードは期待どおりに機能します。

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

processedStrings最初に最初のオブザーバブルを列挙可能に変換して並列化し、次にそれをオブザーバブルに変換して最終結果をサブスクライブする必要があるのは、まだ少し厄介です。

4

2 に答える 2

2

TakeWhileWhere順序に依存するため、概念的にはと同等ではありません。クエリは実際には順番に実行されていると思います(このブログ投稿を参照)。.WithExecutionMode(ParallelExecutionMode.ForceParallelism)あなたの例で電話してみてくださいTakeWhile、そして私はあなたが同じ結果を見るだろうと思います。

並列の場合になぜ機能しないのかわかりませんが...データがどこまで到達するかを確認するためにログを記録することをお勧めしますか?たとえば、ログを記録した後に元のアイテムを返すSelectを使用して、便利なログを実行できます。

于 2010-02-18T10:59:28.530 に答える
2

一言で言えばC#4.0から:


現在、PLINQが並列化できるものにはいくつかの実際的な制限があります。これらの制限は、後続のサービスパックおよびフレームワークバージョンで緩和される可能性があります。次のクエリ演算子は、ソース要素が元のインデックス位置にない限り、クエリが並列化されるのを防ぎます。

  • Take、TakeWhile、Skip、SkipWhile
  • Select、SelectMany、およびElementAtのインデックス付きバージョン

ほとんどのクエリ演算子は、要素のインデックス位置を変更します(Whereなどの要素を削除するものを含む)。つまり、前述の演算子を使用する場合は、通常、クエリの先頭に配置する必要があります。


したがって、実際には、TakeWhileを使用すると、.AsParallel()が並列化されなくなります。Whereが添え字を削除する理由を言うのは難しいですが、AsParallelの前に置くと問題解決する可能性があります。

于 2010-02-18T17:13:47.510 に答える