0

まず背景として、 Duplicitiy (github.com で)という名前のオープン ソース .NET ライブラリを作成しました。これは を使用して、 FileSystemWatcher2 つのディレクトリ間ですべてのファイル変更を複製します。

実装するFileSystemObservableクラスを作成しました( FSWatcherIObservable<FileSystemChange>を使用して実際のをラップします)。ファイルまたはディレクトリが作成、変更、または削除されると、Reactive Extensions を使用して変更が公開されます。FileSystemWatcherSubject<FileSystemChange>

次に、次のサブスクリプションを使用して、このオブザーバブルをサブスクライブします。

 return observable
          .Buffer(() => observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))     
          .PrioritizeFileSystemChanges()           
          .SelectMany(x => x);

変更は、最大 1 分間、変更がない状態が少なくとも 2 秒間続くまでバッファリングされます。これは、ディレクトリを削除するときに、FileSystemWatcherが含まれているすべてのファイルとディレクトリについて通知するためです。ディレクトリ内に含まれる変更を飲み込み、サブスクライバーの親を削除するだけで、動作を最適化できます。これはPrioritizeFileSystemChangesフィルターによって処理されます。また、バッファ ウィンドウ内で作成され、その後削除されたファイルを無視することもできるため、ターゲットでの IO 操作が再び減少します。

これは、失敗/再試行をサポートしていない現時点では単純な方法ではありますが、機能します。

ただし、私の質問は、このオブザーバブルのサブスクライバーが各変更を処理するのにかなりの時間がかかる可能性があるということです。たとえば、大きなファイルを低速のファイル システムにコピーします。現在コピーされている同じファイルに対して新しいファイル システムの変更が発生した場合、進行中の操作を中止するにはどうすればよいですか。または、ファイルがバッファリングされたリストに含まれているが未処理の場合、どのように削除または除外できますか?

元のオブザーバブルへの別のサブスクリプションが必要になると思いますが、状態を共有したり、保留中のタスクを変更したりするのに最適な方法がわかりませんか? 変更は、受信した順序で処理する必要があります。これはキューを示します。ただし、新しいファイル システムの変更は、キャンセルまたは削除する必要があるキュー操作に適用される場合があります。キューは、順不同の削除用に設計されていません。

たとえば、現在ファイルFoo\Bar.txtをコピーしていて、Fooディレクトリが削除されているとします。次に、ディレクトリおよびすべてのサブディレクトリに対する進行中または保留中の変更をキャンセルする必要があります。これは Task Parallel Library の使用例でしょうか、それとも私が取れるリアクティブなアプローチはありますか?

github のプル リクエストもお待ちしております。

4

1 に答える 1

1

ここにはいくつかの目標/質問があるようです:

  1. 後の変更により不要になった以前の変更を削除します。 リンクされたリストは、これに適している可能性があります。これは、一般的なキューの使用と優れたアイテム削除パフォーマンスの両方で優れたパフォーマンスを提供します。
  2. 後の変更により不要になった進行中の操作のキャンセル。これには、再起動が必要な操作も含まれます。これには、進行中の操作をキャンセルできるライブラリを見つける必要があります。System.IO クラスはそのようなキャンセルを提供しないため、ライブラリを見つけるか、独自に作成する必要があります。
  3. これは Task Parallel Library のユースケースでしょうか、それとも私が取れるリアクティブなアプローチはありますか? あなたの言い回しは、ここにどちらか一方の選択肢があるかのように私を驚かせましたが、2 つを混在させることができない理由はありません. ファイルの変更に必要なオブザーバブルは、適切な出発点 (RX) です。進行中の操作は、 (TPL)を取得しCancellationTokenて返すメソッドとして実装される可能性があります。Task

ここで欠けているステップは、変更の「キュー」から実際の作業に移行する方法のようです。基本的に、サブスクリプションは変更を (迅速に) キューに入れ、(遅い、非同期の) メソッドを起動する必要があります。メソッドがまだ実行されていない場合は、「再帰的に」キューを処理します。何かのようなもの:

'changes is your returned observable
'toProcess is the "queue" of changes
'processor holds information about and the task of the in-progress operation
changes.Subscribe(Sub(c)
                     UpdateQueueWithChange(c, toProcess, processor)
                     If processor.Task.IsCompleted Then
                         ProcessNextChange(processor, toProcess)
                     End If
                  End Sub)

ProcessNextChangeキュー内の次の変更を取得し、操作を開始し、操作タスクのコールバックを設定して ProcessNextChange を再呼び出しするメソッドです。変更が残っていない場合は、processorProcessNextChange を再呼び出ししない完了したタスクを与える必要があります。

UpdateQueueWithChange「キュー」を更新し、必要に応じて進行中の操作をキャンセルする必要があります。これProcessNextChangeにより、次の操作を開始するタスクの完了による呼び出しがトリガーされます。

変更オブザーバブルへのサブスクリプションをキャンセルするときに操作をキャンセルしたい場合は、サブスクリプションを使い捨てにすることをおCompositeDisposable勧めSerialDisposableします。操作方法。ProcessNextChange は、操作を開始する前に、SerialDisposable が破棄されたかどうかを確認します。CompositeDisposable は、全体を終了するためにどこかに保存するものです。CancellationDispoableProcessNextChangeprocessorCancellationToken

CompositeDisposable 'this is what your application keeps around
|- IDisposable from subscription to changes observable
|- SerialDisposable
   |- .Disposable property = CancellationDisposable 
      'changed each time ProcessNextChange is called
于 2012-06-20T04:46:00.887 に答える