31

ディレクトリを監視するプログラムを作成しており、その中にファイルが作成されると、名前が変更されて新しいディレクトリに移動されます。最初の実装では、1kb ファイルのテスト時に問題なく動作する Java の Watch Service API を使用しました。発生した問題は、実際には、作成されるファイルが 50 ~ 300 MB のどこかにあるということです。これが発生すると、ウォッチャー API はすぐにファイルを見つけますが、ファイルがまだ書き込まれているため移動できませんでした。ウォッチャーをループに入れてみましたが (ファイルが移動できるようになるまで例外が生成されました)、これはかなり非効率的でした。

それがうまくいかなかったので、10秒ごとにフォルダーをチェックし、可能であればファイルを移動するタイマーを使用してみました。これは私が行き着いた方法です。

質問: 例外チェックを実行したり、サイズを継続的に比較したりせずに、ファイルの書き込みが完了したことを通知する方法はありますか? タイマーで継続的にチェックする (そして例外が発生する) のではなく、ファイルごとに 1 回だけ Watcher API を使用するというアイデアが気に入っています。

すべての応答は大歓迎です!

nt

4

13 に答える 13

22

今日も同じ問題に遭遇しました。私のユースケースでは、ファイルが実際にインポートされるまでのわずかな遅延は大きな問題ではなく、それでも NIO2 API を使用したいと考えていました。私が選んだ解決策は、ファイルに対して操作を実行する前に、ファイルが変更されない状態が 10 秒間続くまで待機することでした。

実装の重要な部分は次のとおりです。プログラムは、待機時間が満了するか、新しいイベントが発生するまで待機します。有効期限は、ファイルが変更されるたびにリセットされます。待機時間が経過する前にファイルが削除されると、そのファイルはリストから削除されます。予想される有効期限のタイムアウト、つまり (lastmodified+waitTime)-currentTime で poll メソッドを使用します

private final Map<Path, Long> expirationTimes = newHashMap();
private Long newFileWait = 10000L;

public void run() {
    for(;;) {
        //Retrieves and removes next watch key, waiting if none are present.
        WatchKey k = watchService.take();

        for(;;) {
            long currentTime = new DateTime().getMillis();

            if(k!=null)
                handleWatchEvents(k);

            handleExpiredWaitTimes(currentTime);

            // If there are no files left stop polling and block on .take()
            if(expirationTimes.isEmpty())
                break;

            long minExpiration = min(expirationTimes.values());
            long timeout = minExpiration-currentTime;
            logger.debug("timeout: "+timeout);
            k = watchService.poll(timeout, TimeUnit.MILLISECONDS);
        }
    }
}

private void handleExpiredWaitTimes(Long currentTime) {
    // Start import for files for which the expirationtime has passed
    for(Entry<Path, Long> entry : expirationTimes.entrySet()) {
        if(entry.getValue()<=currentTime) {
            logger.debug("expired "+entry);
            // do something with the file
            expirationTimes.remove(entry.getKey());
        }
    }
}

private void handleWatchEvents(WatchKey k) {
    List<WatchEvent<?>> events = k.pollEvents();
    for (WatchEvent<?> event : events) {
        handleWatchEvent(event, keys.get(k));
    }
    // reset watch key to allow the key to be reported again by the watch service
    k.reset();
}

private void handleWatchEvent(WatchEvent<?> event, Path dir) throws IOException {
    Kind<?> kind = event.kind();

    WatchEvent<Path> ev = cast(event);
        Path name = ev.context();
        Path child = dir.resolve(name);

    if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE) {
        // Update modified time
        FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime();
        expirationTimes.put(name, lastModified.toMillis()+newFileWait);
    }

    if (kind == ENTRY_DELETE) {
        expirationTimes.remove(child);
    }
}
于 2011-01-24T15:03:18.660 に答える
11

元のファイルが完成したことを示すために、別のファイルを書き込みます。Ig 'fileorg.dat' が成長している場合、ファイル 'fileorg.done' を作成し、'fileorg.done' のみをチェックします。

巧妙な命名規則により、問題は発生しないはずです。

于 2010-07-30T07:24:17.073 に答える
9

2 つのソリューション:

1 つ目は、 stacker による回答のわずかなバリエーションです。

不完全なファイルには一意のプレフィックスを使用します。myhugefile.zip.incの代わりのようなものmyhugefile.zip。アップロード/作成が終了したら、ファイルの名前を変更します。ウォッチから .inc ファイルを除外します。

2 つ目は、同じドライブ上の別のフォルダーを使用してファイルを作成/アップロード/書き込みし、準備ができたら監視フォルダーに移動することです。それらが同じドライブ上にある場合、移動はアトミックアクションである必要があります(ファイルシステムに依存すると思います)。

いずれにせよ、ファイルを作成するクライアントは、追加の作業を行う必要があります。

于 2010-07-30T08:20:44.500 に答える
5

Apache Camel は、ファイルの名前を変更しようとする (java.io.File.renameTo) ことで、ファイルがアップロードされていない問題を処理しているようです。名前の変更が失敗した場合、読み取りロックはありませんが、試行を続けます。名前の変更が成功すると、名前を元に戻してから、意図した処理に進みます。

以下のoperations.renameFileを参照してください。Apache Camel ソースへのリンクは次のとおりです: GenericFileRenameExclusiveReadLockStrategy.javaおよびFileUtil.java

public boolean acquireExclusiveReadLock( ... ) throws Exception {
   LOG.trace("Waiting for exclusive read lock to file: {}", file);

   // the trick is to try to rename the file, if we can rename then we have exclusive read
   // since its a Generic file we cannot use java.nio to get a RW lock
   String newName = file.getFileName() + ".camelExclusiveReadLock";

   // make a copy as result and change its file name
   GenericFile<T> newFile = file.copyFrom(file);
   newFile.changeFileName(newName);
   StopWatch watch = new StopWatch();

   boolean exclusive = false;
   while (!exclusive) {
        // timeout check
        if (timeout > 0) {
            long delta = watch.taken();
            if (delta > timeout) {
                CamelLogger.log(LOG, readLockLoggingLevel,
                        "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
                // we could not get the lock within the timeout period, so return false
                return false;
            }
        }

        exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());
        if (exclusive) {
            LOG.trace("Acquired exclusive read lock to file: {}", file);
            // rename it back so we can read it
            operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
        } else {
            boolean interrupted = sleep();
            if (interrupted) {
                // we were interrupted while sleeping, we are likely being shutdown so return false
                return false;
            }
        }
   }

   return true;
}
于 2013-07-23T16:18:47.390 に答える
4

私はそれが古い質問であることを知っていますが、誰かを助けることができるかもしれません.

私は同じ問題を抱えていたので、私がしたことは次のとおりでした:

if (kind == ENTRY_CREATE) {
            System.out.println("Creating file: " + child);

            boolean isGrowing = false;
            Long initialWeight = new Long(0);
            Long finalWeight = new Long(0);

            do {
                initialWeight = child.toFile().length();
                Thread.sleep(1000);
                finalWeight = child.toFile().length();
                isGrowing = initialWeight < finalWeight;

            } while(isGrowing);

            System.out.println("Finished creating file!");

        }

ファイルが作成されると、ファイルはどんどん大きくなります。だから私がしたことは、秒単位で区切られた重量を比較することでした. 両方の重みが同じになるまで、アプリはループに入ります。

于 2013-03-08T16:09:19.097 に答える
3

SO がコピーを終了したときに Watcher Service API から通知を受けることはできませんが、すべてのオプションは「回避策」のようです (これを含む!)。

上でコメントしたように、

1) UNIX では、移動またはコピーはオプションではありません。

2) File.canWrite は、ファイルがまだコピー中であっても、書き込み権限がある場合は常に true を返します。

3) タイムアウトまたは新しいイベントが発生するまで待機することもできますが、システムが過負荷になってもコピーが完了していない場合はどうなりますか? タイムアウトが大きな値の場合、プログラムは非常に長く待機します。

4)ファイルを作成するのではなく、単に消費するだけの場合、コピーが終了したことを「フラグ」に別のファイルに書き込むことはオプションではありません。

別の方法は、以下のコードを使用することです。

boolean locked = true;

while (locked) {
    RandomAccessFile raf = null;
    try {
            raf = new RandomAccessFile(file, "r"); // it will throw FileNotFoundException. It's not needed to use 'rw' because if the file is delete while copying, 'w' option will create an empty file.
            raf.seek(file.length()); // just to make sure everything was copied, goes to the last byte
            locked = false;
        } catch (IOException e) {
            locked = file.exists();
            if (locked) {
                System.out.println("File locked: '" + file.getAbsolutePath() + "'");
                Thread.sleep(1000); // waits some time
            } else { 
                System.out.println("File was deleted while copying: '" + file.getAbsolutePath() + "'");
            }
    } finally {
            if (raf!=null) {
                raf.close();    
            }
        }
}
于 2012-10-03T13:37:36.430 に答える
0

これは非常に興味深い議論です。確かにこれはパンとバターのユースケースです。新しいファイルが作成されるのを待ってから、何らかの方法でファイルに反応します。ここでの競合状態は興味深いものです。確かに、ここでの高レベルの要件は、イベントを取得してから、実際に(少なくとも)ファイルの読み取りロックを取得することです。大きなファイルまたは単に大量のファイル作成の場合、これには、新しく作成されたファイルのロックを定期的に取得しようとし、成功すると実際に作業を行うワーカースレッドのプール全体が必要になる可能性があります。しかし、NTが認識しているように、これは最終的にはポーリングアプローチであり、スケーラビリティとポーリングはうまく調和する2つの言葉ではないため、スケーリングするには慎重に行う必要があります。

于 2011-01-20T20:00:54.117 に答える
0

Linux の大きなファイルの場合、ファイルは .filepart の拡張子でコピーされます。commons api を使用して拡張機能を確認し、ENTRY_CREATE イベントを登録するだけです。これを.csvファイル(1GB)でテストし、機能したことを追加しました

public void run()
{
    try
    {
        WatchKey key = myWatcher.take();
        while (key != null)
        {
            for (WatchEvent event : key.pollEvents())
            {
                if (FilenameUtils.isExtension(event.context().toString(), "filepart"))
                {
                    System.out.println("Inside the PartFile " + event.context().toString());
                } else
                {
                    System.out.println("Full file Copied " + event.context().toString());
                    //Do what ever you want to do with this files.
                }
            }
            key.reset();
            key = myWatcher.take();
        }
    } catch (InterruptedException e)
    {
        e.printStackTrace();
    }
}
于 2015-04-20T07:52:14.463 に答える
0

アップロードされたファイルを転送するためにファイル システム ウォッチャーを実装したときも、同様の状況に対処する必要がありました。この問題を解決するために実装したソリューションは、次のとおりです。

1- まず、未処理のファイルのマップを維持します (ファイルがまだコピーされている限り、ファイル システムは Modify_Event を生成するため、フラグが false の場合は無視できます)。

2- fileProcessor で、リストからファイルを取得し、ファイルシステムによってロックされているかどうかを確認します。ロックされている場合は、例外が発生します。この例外をキャッチし、スレッドを待機状態 (つまり 10 秒) にしてから、再試行します。再びロックが解除されるまで。ファイルを処理した後、フラグを true に変更するか、マップから削除できます。

待機タイムスロット中に同じファイルの多くのバージョンが転送される場合、このソリューションは効率的ではありません。

乾杯、ラムジ

于 2011-12-12T15:00:01.680 に答える
0

書き込みが完了したファイルをどれだけ緊急に移動する必要があるかに応じて、安定した最終変更のタイムスタンプを確認し、静止しているファイルのみを移動することもできます。安定するために必要な時間は実装に依存する可能性がありますが、最終更新のタイムスタンプが 15 秒間変更されていないものは、移動するのに十分安定している必要があると思います。

于 2012-10-03T18:32:56.900 に答える
-1

java.io.File.canWrite() は、ファイルの書き込みがいつ完了したかを教えてくれると思います。

于 2010-07-30T08:42:35.130 に答える