8

FileSystemWatcherChangedイベントを監視可能なシーケンスに変換するコードをいくつか書きました。

私の目標は、すべてのファイル システムの変更を別々のストリームに分割し、それらを調整することです。

たとえば、0.5 秒に 3 回変更される 10 個の異なるファイルがある場合、ファイルごとに 1 回だけ通知を受け取ります。

ただ、気になるのはGroupBy()オペレーターです。これが機能するためには、(私が推測するに) 時間をかけてグループを構築し続け、少量のメモリを消費し続ける必要があります。

これにより「リーク」が発生しますか?もしそうなら、どうすればそれを防ぐことができますか?

FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") {
    EnableRaisingEvents = true,
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
};

void Main()
{
    var fileSystemEventStream = 
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
            (
                _ => _watcher.Changed += _, 
                _ => _watcher.Changed -= _
            )
            .ObserveOn(ThreadPoolScheduler.Instance)
            .SubscribeOn(ThreadPoolScheduler.Instance)
            .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
            ;

    var res = 
        from fileGroup in fileSystemEventStream
        from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
        select file;

    res.Subscribe(
        ReceiveFsFullPath, 
        exception => {
            Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace);
        });

    Console.Read();
}

void ReceiveFsFullPath(string s){
    Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine(s);
}
4

2 に答える 2

4

はい、新しいキーごとに、GroupBy はサブジェクトを作成し、これらのサブジェクトの辞書を維持します。そして、あなたはこれらのそれぞれを購読しています。つまり、古いエントリを解放することなく、時間の経過とともに大きくなるメモリの小さなチャンクです。本当に必要なのは、スロットル タイマーの期限が切れたときにキーを削除することです。組み込み演算子でこれを行う方法は考えられません。そのため、カスタム オペレーターが必要です。これが1つの刺し傷です。

public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay)
{
    return Observable.Create(observer =>
    {
        var notifications = new Subject<IObservable<T>>();
        var subscription = notifications.Merge().Subscribe(observer);
        var d = new Dictionary<T, IObserver<T>>();
        var gate = new object();
        var sourceSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(subscription, sourceSubscription);
        sourceSubscription.Disposable = source.Subscribe(value =>
        {
           IObserver<T> entry;
           lock(gate)
           {
             if (d.TryGetValue(value, out entry))
             {
               entry.OnNext(value);
             }
             else
             {
               var s = new Subject<T>();
               var o = s.Throttle(delay).FirstAsync().Do(() =>
               {
                 lock(gate)
                 {
                   d.Remove(value);
                 }
               });
               notifications.OnNext(o);
               d.Add(value, s);
               s.OnNext(value);
             }
          }
        }, observer.OnError, notifications.OnCompleted);

        return subscriptions;
    });
}

...
Observable.FromEventPattern(...)
    .Select(e => e.EventArgs.FullPath)
    .ThrottleDistinct(TimeSpan.FromSeconds(1))
    .Subscribe(...);
于 2013-08-04T12:57:30.310 に答える
1

ブランドンの回答によると、被験者は成長し、取り戻す方法はありません*. ここでのメモリリークに関する私の主な懸念は、サブスクリプションをキャプチャしないことです! すなわち

res.Subscribe(...

に置き換える必要があります

subscription = res.Subscribe(...

サブスクリプションをキャプチャしないと、サブスクリプションを破棄できないため、イベント ハンドラーを解放できず、「メモリ リーク」が発生します。明らかに、サブスクリプションを実際にどこかで処分しない場合、これは役に立ちません。

*まあ、それらが完了した場合、それらは自動的に破棄されるので、うまくいくでしょう。FileDeleted イベントが発生したときにシーケンスを完了するように見えますか?

于 2013-08-05T09:55:53.297 に答える