1

解析が必要な長いテキスト行が数百万行あるとします。
私の i7 2600 CPU では、1000 行ごとに解析するのに約 13 ミリ秒かかります。
したがって、1,000,000 行の解析には約 13 秒かかります。
実行時間を短縮するために、複数のスレッドを使用して管理しました。
ブロッキング キューを使用して、1,000,000 行をそれぞれ 1,000 行を含む 1,000 チャンクのセットとしてプッシュし、8 スレッドを使用してチャンクを消費します。コードは単純で機能しているように見えますが、パフォーマンスは期待できるものではなく、約 11 秒かかります。
マルチスレッド コードの主な部分は次のとおりです。

    for(int i=0;i<threadCount;i++)
    {
        Runnable r=new Runnable() {
            public void run() {
                try{
                    while (true){
                        InputType chunk=inputQ.poll(10, TimeUnit.MILLISECONDS);
                        if(chunk==null){
                            if(inputRemains.get())
                                continue;
                            else
                                return;
                        }
                        processItem(chunk);
                    }
                }catch (Exception e) {
                    e.printStackTrace();  
                }
            }
        };
        Thread t=new Thread(r);
        threadList.add(t);
        for(Thread t: threads)
            t.join();

ExecutorService も使用しましたが、パフォーマンスが低下します。
チャンク サイズを変更しても効果はなく、パフォーマンスは向上しません。
これは、ブロッキング キューがボトルネックではないことを意味します。
一方、シリアル プログラムの 4 つのインスタンスを同時に実行すると、4 つのインスタンスすべてが完了するまでに 15 秒しかかかりません。これは、15 秒で 4 つのプロセスを使用して 4,000,0000 行を処理できることを意味し、したがって、速度アップは約 3.4 であり、マルチスレッドの 1.2 速度アップと比較して非常に有望です。

誰かがこれについて何か考えを持っているのだろうか?
問題は非常に単純です。ブロッキング キュー内の一連の行と、キューからアイテムをポーリングして並列処理する複数のスレッドです。スレッドが完全にビジーであるため、キューは最初にいっぱいになります。
私も以前に同様の経験をしましたが、マルチプロセッシングが優れている理由がわかりません。
また、Windows 7 でテストを実行し、1.7 JRE を使用していることにも言及する必要があります。
どんなアイデアでも大歓迎です。事前に感謝します。

4

4 に答える 4

1

編集:

だから私は当初、あなたのタイミングはあなたのプログラム全体の周りにあると思っていました. 行がメモリに読み込まれた後に行の処理のタイミングを計っている場合は、メソッド独自のprocessItem(chunk);IO を実行しているか、synchronizedオブジェクトまたは他の共有変数に情報を書き込んでいる可能性があります。同時に実行することができます。


誰かがこれについて何か考えを持っているのだろうか?

問題は、CPU ボードではなくIO バウンドである可能性があります。スレッドを追加して速度を大幅に向上させる唯一の方法は、ディスクからの読み取り (またはディスクへの書き込み) よりも多くの CPU 処理を行っている場合です。ディスク サブシステムの IO 機能を最大限に活用すると、処理速度を向上させるためにできることはあまりありません。あなたが示したように、スレッドを追加すると、実際にはIOバウンドプログラムの速度が低下する可能性があります。

1 つの余分なスレッド (つまり、2 つの処理スレッド) を追加して、それが役立つかどうかを確認します。2 秒の速度向上だけが得られる場合は、ファイルを複数のドライブに分割するか、これが繰り返しのタスクである場合はファイルをメモリ ドライブに移動して、より速く読み取ることができるようにする必要があります。

ExecutorService も使用しましたが、パフォーマンスが低下します。

これは、使用しているスレッドが多すぎるか、反復/チャンクごとに処理する行が少なすぎるために発生する可能性があります。

一方、シリアル プログラムの 4 つのインスタンスを同時に実行すると、4 つのインスタンスすべてが完了するまでに 15 秒しかかかりません。

これは、それぞれが OS から互いのディスク キャッシュを使用できるためだと思われます。最初のアプリケーションがブロック #1 を読み取るとき、他の 3 つのアプリケーションは必要ありません。ファイルを 4 回コピーして、4 つのシリアル アプリケーションをそれぞれ独自のファイルで同時に実行してみてください。違いがわかるはずです。

于 2012-10-16T20:59:27.127 に答える
0

1000 行ごとに解析するのに約 13 ミリ秒かかります。したがって、1,000,000 行の解析には約 13 秒かかります。

jVM は 10,000 を実行するまでウォームアップしません。その後は 10 ~ 100 倍速くなり、13 秒または 130 ミリ秒以下になる可能性があります。

ブロッキング キューを使用して、1,000,000 行をそれぞれ 1,000 行を含む 1,000 チャンクのセットとしてプッシュし、8 スレッドを使用してチャンクを消費します。コードは単純で動作しているように見えますが、パフォーマンスは期待できるものではなく、約 11 秒かかります。

1 つのスレッドを再テストすることをお勧めします。おそらく 11 秒もかからないでしょう。

ボトル ネックは、文字列を 1 行に解析して文字列オブジェクトを作成するのにかかる時間です。残りは単なるオーバーヘッドであり、真のボトル ​​ネックには対応していません。


CPU ごとに 1 つずつ、異なるファイルを読み取ると、直線的な速度アップに近づくことができます。行を読み取る際の問題は、1 つずつ読み取る必要があり、同時実行のメリットがほとんど得られないことです。

于 2012-10-16T22:00:55.413 に答える
0

2600 は 8 スレッドに HT (ハイパー スレッディング) を使用しています..解析は主にメモリ作業であるため、HT のメリットはほとんどありません..

于 2012-11-14T07:35:02.380 に答える
0

あなたのコードの並列化を非難します。アイテムを処理できる場合、複数のスレッドが同じリソース (キュー) を求めて競合します。同期ロックの競合は、少しパフォーマンスを低下させます。アイテムがキューに追加されるよりも速く処理されている場合、不足しているスレッドはほとんどビジー ループです。while (true) {}. これは、ポーリング時間が非常に短く、ポーリングが失敗したときにすぐに再試行するためです。

同期に関するちょっとしたメモ。まず、JVM はビジー ループを使用してリソースが利用可能になるのを待ちます。これは、(一般に) コードは同期ロックをできるだけ早く解放するように記述されており、別の方法 (コンテキスト スイッチを実行すること) は非常にコストがかかるためです。最終的に、JVM がほとんどの時間を同期ロックの待機に費やしていることが判明した場合、ロックを取得できない場合はデフォルトで別のスレッドに切り替えるようになります。

より良い解決策は、スレッドに使用可能なスロットと新しいスレッドのデータの両方がある場合に、1 つのスレッドがデータを読み取り、新しいスレッドをディスパッチすることです。ここで Executor は、どのスレッドが終了し、どのスレッドがまだビジーであるかを追跡できるので便利です。しかし、疑似コードは次のようになります。

int charsRead;
char[] buffer = new char[BUF_SIZE];
int startIndex = 0;

while((charsRead = inputStreamReader.read(buffer, startIndex, buffer.length)
                != -1) {
    // find last new line so don't give a thread any partial lines
    int lastNewLine = findFirstNewLineBeforeIndex(buffer, charsRead);

    waitForAvailableThread(); // if not max threads running then should return 
    // immediately
    Thread t = new Thread(createRunnable(buffer, lastNewLine));
    t.start();
    addRunningThread(t);

    // copy any overshoot to the start of a new buffer
    // use a new buffer as the another thread is now reading from the previous 
    // buffer
    char[] newBuffer = new char[BUF_SIZE];
    System.arraycopy(buffer, lastNewLine+1, newBuffer, 0, 
        charsRead-lastNewLine-1);
    buffer = newBuffer;
}
waitForRemainingThreadsToTerminate();
于 2012-10-16T21:58:02.033 に答える