並行して処理したい監視可能なコレクションがあり、フィルタリング中に処理された値を監視し、最後にフィルタリングされた値を受け取るハンドラーをサブスクライブします。
私のサンプルは構文的に正しく、正常にコンパイルされます。コードを実行するとWhere
、フィルタリングを実行するステートメントが評価されます。ただし、サブスクリプションにはデータが届きません。を削除AsParallel
して処理が通常どおりに行われるようにするIEnumerable
と、データが通過し、すべてが期待どおりに機能します。
これが私のサンプルで、文字列に対していくつかの処理を行っています。
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
次の奇妙なことは、TakeWhile
演算子を使用する場合です。これは、概念的にはWhereに似ていますが、ParallelQueryを観察すると期待どおりに機能します。
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
サブスクリプションにログコードを追加すると、データは変換まで受信されますが、ToObservable
変換後は受信されないことが示されます。
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
4行目のラムダのブレークポイントはヒットしますが、6行目のラムダのブレークポイントはヒットしません。
なぜTakeWhile
データが加入者に届くのにWhere
届かないのですか?
重要な場合は、.Net4.0Frameworkクライアントプロファイルを対象としたプロジェクトを使用してVisualStudio2010RCでコードを開発します。
更新: @Sergeysの回答に基づいて、フィルターの配置を作り直しましたWhere
。次のコードは期待どおりに機能します。
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
processedStrings
最初に最初のオブザーバブルを列挙可能に変換して並列化し、次にそれをオブザーバブルに変換して最終結果をサブスクライブする必要があるのは、まだ少し厄介です。