3

700K + 行を超える巨大な CSV ファイルがあります。その CSV ファイルの行を解析して操作を行う必要があります。スレッドを使用してそれを行うことを考えました。私が最初にやろうとしていることは単純です。すべてのスレッドは、CSV ファイルの固有の行を処理する必要があります。読み取る行数が 3000 に制限されています。スレッドを 3 つ作成します。各スレッドは、CSV ファイルの 1 行を読み取る必要があります。コードは次のとおりです。

import java.io.*;

class CSVOps implements Runnable
{
    static int lineCount = 1;
    static int limit = 3000;
    BufferedReader CSVBufferedReader;

    public CSVOps(){} // Default constructor

    public CSVOps(BufferedReader br){
        this.CSVBufferedReader = br;
    }

    private synchronized void readCSV(){
        System.out.println("Current thread "+Thread.currentThread().getName());
        String line;
        try {
            while((line = CSVBufferedReader.readLine()) != null){
                System.out.println(line);
                lineCount ++;
                if(lineCount >= limit){
                    break;
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        readCSV();
    }

}

class CSVResourceHandler
{
    String CSVPath;

    public CSVResourceHandler(){ }// default constructor

    public CSVResourceHandler(String path){
        File f = new File(path);
        if(f.exists()){
            CSVPath = path;
        }
        else{
            System.out.println("Wrong file path! You gave: "+path);
        }
    }

    public BufferedReader getCSVFileHandler(){
        BufferedReader br = null;
        try{
            FileReader is = new FileReader(CSVPath);
            br = new BufferedReader(is);
        }
        catch(Exception e){
        }
        return br;
    }
}

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        String pathToCSV = "/home/shantanu/DEV_DOCS/Contextual_Work/invalid_domain_kw_site_wise_click_rev2.csv";
        CSVResourceHandler csvResHandler = new CSVResourceHandler(pathToCSV);
        CSVOps ops = new CSVOps(csvResHandler.getCSVFileHandler());

        Thread t1 = new Thread(ops);
        t1.setName("T1");

        Thread t2 = new Thread(ops);
        t1.setName("T2");

        Thread t3 = new Thread(ops);
        t1.setName("T3");

        t1.start();
        t2.start();
        t3.start();
    }
}

クラス CSVResourceHandler は、渡されたファイルが存在するかどうかを単純に検出し、BufferedReader を作成して提供します。このリーダーは CSVOps クラスに渡されます。CSV ファイルの 1 行を読み取って出力する readCSV メソッドがあります。3000 に制限が設定されています。

スレッドがカウントを台無しにしないようにするために、これらの制限とカウント変数の両方を静的として宣言します。このプログラムを実行すると、奇妙な出力が得られます。私は約 1000 レコードしか取得できず、1500 レコードを取得することもあります。順不同です。出力の最後に CSV ファイルの 2 行を取得し、現在のスレッド名が main であることがわかります!!

私はスレッドの初心者です。このCSVファイルの読み込みを速くしたい。それは何ができますか?

4

2 に答える 2

2

まず、複数のスレッドを使用して単一の機械ディスクから並列 I/O を実行しないでください。スレッドが実行されるたびに機械ヘッドが次の読み取り場所を探す必要があるため、実際にはパフォーマンスが低下します。したがって、コストのかかる操作であるディスクのヘッドを不必要にバウンスしています。

シングル プロデューサー マルチ コンシューマー モデルを使用して、シングル スレッドを使用して行を読み取り、ワーカーのプールを使用してそれらを処理します。

あなたの問題に:

メインを終了する前に、スレッドが終了するのを実際に待つべきではありませんか?

public class invalidRefererCheck
{
    public static void main(String [] args) throws InterruptedException
    {
        ...
        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();
    }
}
于 2012-06-26T13:14:17.587 に答える
0

ファイルを大きなチャンクで読むことをお勧めします。大きなバッファー オブジェクトを割り当て、チャンクを読み取り、最後から解析して最後の EOL 文字を見つけ、バッファーの最後のビットを一時文字列にコピーし、EOL+1 で null をバッファーに押し込み、バッファーをキューに入れます。参照、すぐに新しいものを作成し、最初に一時文字列をコピーしてから、残りのバッファを埋めて、EOF まで繰り返します。完了するまで繰り返します。スレッドのプールを使用して、バッファーを解析/処理します。

有効な行のチャンク全体をキューに入れる必要があります。単一行をキューに入れると、スレッド通信が解析よりも長くかかります。

これと同様に、チャンクがプール内のスレッドによって「順不同」に処理される可能性があることに注意してください。順序を維持する必要がある場合 (たとえば、入力ファイルがソートされ、出力がソートされたままの別のファイルに送られる場合)、チャンク アセンブラー スレッドに各チャンク オブジェクトにシーケンス番号を挿入させることができます。その後、プール スレッドは、処理済みのバッファーをさらに別のスレッド (またはタスク) に渡すことができます。このスレッドは、以前のチャンクがすべて入るまで、順不同のチャンクのリストを保持します。

マルチスレッドは、難しい/危険/非効率的である必要はありません。キュー/プール/タスクを使用する場合は、同期/結合を避け、スレッドを継続的に作成/終了/破棄せず、一度に 1 つのスレッドしか処理できない大きなバッファー オブジェクトだけをキューに入れます。デッドロックや偽共有などの可能性がほとんどなく、高速化が見られるはずです。

このような高速化の次のステップは、バッファーのプール キューを事前に割り当てて、バッファーと関連する GC の継続的な作成/削除を排除し、すべてのバッファーの開始時に (L1 キャッシュ サイズ) 'デッド ゾーン' を使用して、キャッシュ共有を完全に排除します。マルチコア ボックス (特に SSD を使用する場合) では、これは非常に高速です。

ジャバ、そうですね。nullターミネータを使用した回答の「CplusPlusらしさ」についてお詫び申し上げます。ただし、残りのポイントは問題ありません。これは、言語に依存しない回答である必要があります:)

于 2012-06-27T08:33:10.527 に答える