2

マルチスレッド化するために、ログ ファイルを異なるチャンクで読み取りたいと考えています。アプリケーションは、複数のハードディスクを備えたサーバー側環境で実行されます。チャンクに読み取った後、アプリはすべてのチャンクの行ごとに処理を行います。

bufferedreader を使用してすべてのファイル行の読み取りを完了し、RandomAccessFile と MappedByteBuffer を組み合わせてファイルのチャンクを作成できますが、これら 2 つを組み合わせるのは簡単ではありません。

問題は、チャンクが私のチャンクの最後の行に割り込んでいることです。ブロックの最後の行全体を持っていないため、この最後のログ行を処理することは不可能です。行末を考慮して、ファイルを可変長のチャンクに分割する方法を見つけようとしています。

誰もこれを行うためのコードを持っていますか?

4

2 に答える 2

9

チャンクの処理を開始する前に、ファイル内で行の境界にあるオフセットを見つけることができます。ファイルサイズをチャンク番号で割ってオフセットから始め、線の境界が見つかるまで探します。次に、それらのオフセットをマルチスレッドファイルプロセッサにフィードします。チャンクの数に使用可能なプロセッサの数を使用する完全な例を次に示します。

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReadFileByChunks {
    public static void main(String[] args) throws IOException {
        int chunks = Runtime.getRuntime().availableProcessors();
        long[] offsets = new long[chunks];
        File file = new File("your.file");

        // determine line boundaries for number of chunks
        RandomAccessFile raf = new RandomAccessFile(file, "r");
        for (int i = 1; i < chunks; i++) {
            raf.seek(i * file.length() / chunks);

            while (true) {
                int read = raf.read();
                if (read == '\n' || read == -1) {
                    break;
                }
            }

            offsets[i] = raf.getFilePointer();
        }
        raf.close();

        // process each chunk using a thread for each one
        ExecutorService service = Executors.newFixedThreadPool(chunks);
        for (int i = 0; i < chunks; i++) {
            long start = offsets[i];
            long end = i < chunks - 1 ? offsets[i + 1] : file.length();
            service.execute(new FileProcessor(file, start, end));
        }
        service.shutdown();
    }

    static class FileProcessor implements Runnable {
        private final File file;
        private final long start;
        private final long end;

        public FileProcessor(File file, long start, long end) {
            this.file = file;
            this.start = start;
            this.end = end;
        }

        public void run() {
            try {
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                raf.seek(start);

                while (raf.getFilePointer() < end) {
                    String line = raf.readLine();
                    if (line == null) {
                        continue;
                    }

                    // do what you need per line here
                    System.out.println(line);
                }

                raf.close();
            } catch (IOException e) {
                // deal with exception
            }
        }
    }
}
于 2011-04-01T09:45:18.810 に答える
0

チャンクをオーバーラップさせる必要があります。ブロックより長い行がない場合は、1 ブロックのオーバーラップで十分です。マルチスレッド版が本当に必要ですか? gnu grep のパフォーマンスは十分ではありませんか?

gnu grep の実装により、チャンク境界をまたぐ行の問題が解決されました。GNU ライセンスに煩わされない場合は、おそらくそこからアイデアやコードを借りることができます。これは非常に効率的なシングルスレッドの実装です。

于 2011-04-01T09:08:42.680 に答える