6

Java を使用して非常に大きなファイルを読み込もうとしています。その大きなファイルには次のようなデータが含まれます。つまり、各行にユーザー ID があります。

149905320
1165665384
66969324
886633368
1145241312
286585320
1008665352

そして、その大きなファイルには、約 3000 万のユーザー ID が含まれます。今、私はその大きなファイルからすべてのユーザー ID を 1 つずつ読み取ろうとしています。つまり、各ユーザー ID は、その大きなファイルから 1 回だけ選択する必要があります。たとえば、3000 万のユーザー ID がある場合、マルチスレッド コードを使用して 3000 万のユーザー ID を 1 回だけ出力する必要があります。

以下は、10 個のスレッドで実行されるマルチスレッド コードであるコードですが、以下のプログラムでは、各ユーザー ID が 1 回だけ選択されていることを確認できません。

public class ReadingFile {


    public static void main(String[] args) {

        // create thread pool with given size
        ExecutorService service = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            service.submit(new FileTask());
        }
    }
}

class FileTask implements Runnable {

    @Override
    public void run() {

        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("D:/abc.txt"));
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
                //do things with line
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                br.close();
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}

誰でもこれで私を助けることができますか? 私は何を間違っていますか?そして、これを行うための最速の方法は何ですか?

4

3 に答える 3

18

ファイルを複数のディスクにストライプ化するようなことを何もしていないと仮定すると、1 つのスレッドがファイルを順番に読み取ることを実際に改善することはできません。1 つのスレッドで、シークを 1 回実行してから、長いシーケンシャル読み取りを 1 回実行します。複数のスレッドを使用すると、スレッドが複数のシークを引き起こし、それぞれがディスク ヘッドの制御を取得することになります。

編集: これは、シリアル I/O を使用して行を読み取りながら、行処理を並列化する方法です。BlockingQueueを使用してスレッド間の通信を行います。FileTaskは行をキューに追加し、 は行を読み取っCPUTaskて処理します。これはスレッドセーフなデータ構造であるため、同期を追加する必要はありません。を使用put(E e)して文字列をキューに追加しているため、キューがいっぱいの場合 ( の宣言で定義されているように、最大​​ 200 個の文字列を保持できますReadingFile)、FileTaskスペースが解放されるまでブロックされます。同様にtake()、キューからアイテムを削除するために使用しているためCPUTask、アイテムが利用可能になるまでブロックされます。

public class ReadingFile {
    public static void main(String[] args) {

        final int threadCount = 10;

        // BlockingQueue with a capacity of 200
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(200);

        // create thread pool with given size
        ExecutorService service = Executors.newFixedThreadPool(threadCount);

        for (int i = 0; i < (threadCount - 1); i++) {
            service.submit(new CPUTask(queue));
        }

        // Wait til FileTask completes
        service.submit(new FileTask(queue)).get();

        service.shutdownNow();  // interrupt CPUTasks

        // Wait til CPUTasks terminate
        service.awaitTermination(365, TimeUnit.DAYS);

    }
}

class FileTask implements Runnable {

    private final BlockingQueue<String> queue;

    public FileTask(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("D:/abc.txt"));
            String line;
            while ((line = br.readLine()) != null) {
                // block if the queue is full
                queue.put(line);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

class CPUTask implements Runnable {

    private final BlockingQueue<String> queue;

    public CPUTask(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String line;
        while(true) {
            try {
                // block if the queue is empty
                line = queue.take(); 
                // do things with line
            } catch (InterruptedException ex) {
                break; // FileTask has completed
            }
        }
        // poll() returns null if the queue is empty
        while((line = queue.poll()) != null) {
            // do things with line;
        }
    }
}
于 2013-06-20T18:22:15.633 に答える
0

これは、行が改行で区切られた平均 315 MB のファイルのことです。これは簡単に記憶に収まると思います。保存する必要があるユーザー名には特定の順序がないことを意味します。したがって、次のアルゴリズムをお勧めします。

  • ファイルの長さを取得する
  • ファイルの 10 分の 1 をバイト バッファーにコピーします (バイナリ コピーは高速である必要があります)。
  • これらの各バッファを処理するためのスレッドを開始します
  • 各スレッドは、最初と最後の行を除いて、その領域内のすべての行を処理します。
  • 各スレッドは、終了時にデータの最初と最後の部分行を返す必要があります。
  • 各スレッドの「最後の」スレッドは、次のファイル ブロックで作業しているスレッドの「最初の」スレッドと再結合する必要があります。そして、これらのトークンは後で処理する必要があります。
于 2014-07-15T13:47:22.913 に答える
-2

1.7 で導入された Fork Join API は、このユース ケースに最適です。http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.htmlを確認してください。検索すると、たくさんのサンプルが見つかります。

于 2013-06-20T18:26:00.613 に答える