2

作成されたイベントを受け取り、別のスレッドが機能するようにキューに格納する機能する SystemFileWatcher を作成するのに問題があります。この問題に関してここで数え切れないほどのスレッドを読みましたが、この特定の問題に頭を悩ませることはできません。

using System;
using System.IO;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections;
using System.Threading;

namespace FileSystemWatcherTest
{
    class Program
    {
        public static BlockingCollection<string> processCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());

    static void Main(string[] args)
    {
        string path = @"C:\test\";
        FileSystemWatcher watcher = new FileSystemWatcher();

        watcher.Path = path;
        watcher.EnableRaisingEvents = true;
        watcher.Filter = "*.*";

        watcher.Created += new FileSystemEventHandler(onCreated);
        Thread Consumer = new Thread(new ThreadStart(mover));
        Consumer.Start();


        while (true) ;//run infinite loop so program doesn't terminate untill we force it.
    }
    static void onCreated(object sender, FileSystemEventArgs e)
    {
        processCollection.Add(e.FullPath);     
    }

    static void mover()
    {
        string current;
        string processed = @"C:\test\processed\";
        while (true)
        {
            while (processCollection.IsCompleted)
            {
                Thread.Sleep(1000);
            }
            while (processCollection.TryTake(out current))
            {
                System.IO.File.Move(current, processed);
            }
        }
    }
}

}

これは私がテストしたいものです。これが機能しないことは承知しています。ファイルがキュー内に配置されたときにコンソールに書き込むだけで、FSW が機能することを確認しました。私の問題は、ムーバー関数を独自のスレッドで開始しようとしたときに始まります。キューから作業を開始すると、mover 関数と onCreated が通信していないように見えます。

このコードに対する私の期待は、独自のスレッドでムーバー関数を開始し、SFW と一緒に実行することです。私の期待は、blockingcollection にアタッチされた並行キューが自動更新されることです (私は onCreated を介してアイテムをキューに入れます。ムーバーはそのキューに +1 があることを確認します。ムーバーはキューから 1 つ取得し、onCreated はこれを確認します)。おそらく Thread.Sleep の使い方が間違っています。私はもはやblockingcollectionを使用する支持的な理由を持っていません. ロックの使用を見てきましたが、私が理解していることから、concurrentQueue がどのように同期するかにより、これは実際には必要ありません。

最終的な目標は、ランダムなタイミングで入ってくる大量の小さなファイルを処理することです。これらのファイルは .EML です。

可能であれば、何が起こっているのか、この問題を回避するための提案が何であるかについての説明をいただければ幸いです。私は謙虚に来て、私が理解していることはすべて間違っていると言われることを期待しています!

編集:これをコンソール アプリケーションとしてテストしていますが、後でサービスとして使用されます。while (true) ; を追加しました。onCreated() の前に FSW の実行を維持します。

4

1 に答える 1

2

コード例にはいくつかの問題があります。

  1. File.Move()メソッドを悪用しています。両方のパラメータが完全なファイルである必要があります。ディレクトリ名を 2 番目のパラメーターとして渡していますが、これは正しくありません。
  2. IsCompletedあたかもそれが有用であるかのように、コレクションのプロパティを調べています。それは常に でありfalse、コードのブロックは何もしません。これが次の問題につながる…
  3. スレッドがタイトなループで実行されており、大量の CPU 時間を消費しています。これにより、エラーが発生する場合と発生しない場合がありますがFileSystemWatcher、変更が常に報告されることが実際には保証されているわけではなく、そうでない理由の 1 つは、ファイル システムを監視するのに十分な CPU 時間を取得できない場合です。すべての CPU 時間を使い切ってしまうと、単に変更が報告されないことに気付くかもしれません。この問題はプライマリ スレッドにも存在することに注意してください。また、タイトなループで実行され、何もせずに膨大な量の CPU 時間を消費しています。したがって、システムの 2 つのコアを完全に占有しています。
  4. BlockingCollection設計された生産者/消費者実行モデルを利用できていません。によって返された列挙をワーカー スレッドに列挙させ、メソッドをGetConsumingEnumerable()使用してそのスレッドに作業がないことを知らせる必要があります。CompleteAdding()

上記の間違いを修正し、より自己完結型になるように例を少しクリーンアップしたバージョンのコード例を次に示します。

// The default backing collection for BlockingCollection<T>
// is ConcurrentQueue<T>. There's no need to specify that
// explicitly.
public static BlockingCollection<string> processCollection = new BlockingCollection<string>();

static void Main(string[] args)
{
    string testDirectory = Path.Combine(Environment.CurrentDirectory, "test");

    Console.WriteLine("Creating directory: \"{0}\"", testDirectory);
    Directory.CreateDirectory(testDirectory);

    FileSystemWatcher watcher = new FileSystemWatcher();

    watcher.Path = testDirectory;
    watcher.EnableRaisingEvents = true;
    watcher.Filter = "*.*";

    watcher.Created += new FileSystemEventHandler(onCreated);
    Thread Consumer = new Thread(new ParameterizedThreadStart(mover));
    Consumer.Start(testDirectory);

    string text;

    while ((text = Console.ReadLine()) != "")
    {
        string newFile = Path.Combine(testDirectory, text + ".txt");

        File.WriteAllText(newFile, "Test file");
    }

    processCollection.CompleteAdding();
}

static void onCreated(object sender, FileSystemEventArgs e)
{
    if (e.ChangeType == WatcherChangeTypes.Created)
    {
        processCollection.Add(e.FullPath);
    }
}

static void mover(object testDirectory)
{
    string processed = Path.Combine((string)testDirectory, "processed");

    Console.WriteLine("Creating directory: \"{0}\"", processed);

    Directory.CreateDirectory(processed);

    foreach (string current in processCollection.GetConsumingEnumerable())
    {
        // Ensure that the file is in fact a file and not something else.
        if (File.Exists(current))
        {
            System.IO.File.Move(current, Path.Combine(processed, Path.GetFileName(current)));
        }
    }
}
于 2015-08-08T00:58:48.920 に答える