16

Java で並行処理がどのように機能するかについて、私はまだ頭を悩ませているところです。(OO Java 5 同時実行モデルをサブスクライブしている場合) TaskorCallablerun()orcall()メソッドで (それぞれ) 実装し、その実装されたメソッドを可能な限り並列化する必要があることを理解しています。

しかし、Java での並行プログラミングに固有の何かをまだ理解していません。

  • Taskrun()メソッドには、実行される同時作業の適切な量がどのように割り当てられますか?

readMobyDick()具体的な例として、 Herman Melville のMoby Dickの内容全体をローカル システム上のファイルからメモリに読み込むI/O バウンドメソッドがあるとします。readMobyDick()そして、このメソッドを並行して3つのスレッドで処理したいとしましょう。

  • スレッド #1 は本の最初の 1/3 をメモリに読み込みます
  • スレッド #2 は、本の 2 番目の 1/3 をメモリに読み込みます
  • スレッド #3 は本の最後の 1/3 をメモリに読み込みます

Moby Dick を 3 つのファイルに分割して、それぞれのタスクに渡す必要がありますか、それともreadMobyDick()、実装されたrun()メソッド内から呼び出すだけで、(どういうわけか)Executorスレッド間で作業を分割する方法を知っているのでしょうか。

私は非常に視覚的な学習者なので、これにアプローチする正しい方法のコード例は大歓迎です! ありがとう!

4

4 に答える 4

22

おそらく、並列アクティビティの最悪の例を誤って選択したことでしょう。

単一の機械ディスクからの並列読み取りは、実際には単一のスレッドでの読み取りよりも遅くなります。これは、実際には、各スレッドが順番に実行されるときに機械ヘッドをディスクのさまざまなセクションにバウンスしているためです。これは、シングル スレッド アクティビティとして残すのが最適です。

あなたのものに似ていますが、実際にはいくつかの利点を提供できる別の例を見てみましょう: 単語の膨大なリストで特定の単語の出現を検索したいとします (このリストはディスクファイルから取得することもできますが、私のように単一のスレッドで読み取られます)。あなたの例のように 3 つのスレッドを使用できると仮定します。それぞれが膨大な単語リストの 1/3 を検索し、検索された単語が何回出現したかのローカル カウンターを保持します。

この場合、リストを 3 つの部分に分割し、型が Runnable を実装する別のオブジェクトに各部分を渡し、runメソッドに検索を実装します。

ランタイム自体は、パーティショニングなどを行う方法がわからないため、自分で指定する必要があります。他にも多くのパーティショニング戦略があり、それぞれに長所と短所がありますが、ここでは静的なパーティショニングに固執することができます。

いくつかのコードを見てみましょう:

class SearchTask implements Runnable {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public void run() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
     }

     public int getCounter() { return localCounter; }
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
SearchTask task1 = new SearchTask(0, 10000, words, "John");
SearchTask task2 = new SearchTask(10000, 20000, words, "John");
SearchTask task3 = new SearchTask(20000, 30000, words, "John");

// create threads for each task
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
Thread t3 = new Thread(task3);

// start threads
t1.start();
t2.start();
t3.start();

// wait for threads to finish
t1.join();
t2.join();
t3.join();

// collect results
int counter = 0;
counter += task1.getCounter();
counter += task2.getCounter();
counter += task3.getCounter();

これはうまくいくはずです。実際のケースでは、より一般的なパーティショニング スキームを構築することに注意してください。結果を返したい場合は、代わりにExecutorServiceand 実装を使用することもできます。CallableRunnable

したがって、より高度な構成を使用した別の例:

class SearchTask implements Callable<Integer> {
     private int localCounter = 0;
     private int start; // start index of search
     private int end;
     private List<String> words;
     private String token;

     public SearchTask(int start, int end, List<String> words, String token) {
         this.start = start;
         this.end = end;
         this.words = words;
         this.token = token;
     }

     public Integer call() {
         for(int i = start; i < end; i++) {
              if(words.get(i).equals(token)) localCounter++;
         }
         return localCounter;
     }        
}

// meanwhile in main :)

List<String> words = new ArrayList<String>();
// populate words 
// let's assume you have 30000 words

// create tasks
List<Callable> tasks = new ArrayList<Callable>();
tasks.add(new SearchTask(0, 10000, words, "John"));
tasks.add(new SearchTask(10000, 20000, words, "John"));
tasks.add(new SearchTask(20000, 30000, words, "John"));

// create thread pool and start tasks
ExecutorService exec = Executors.newFixedThreadPool(3);
List<Future> results = exec.invokeAll(tasks);

// wait for tasks to finish and collect results
int counter = 0;
for(Future f: results) {
    counter += f.get();
}
于 2012-05-16T19:16:20.563 に答える
2

チューダーがとても親切に指摘してくれたので、あなたは悪い例を選びました。回転するディスクハードウェアは、プラッターとヘッドを移動するという物理的な制約を受けます。最も効率的な読み取りの実装は、各ブロックを順番に読み取ることです。これにより、ヘッドを移動したり、ディスクが整列するのを待つ必要がなくなります。

とは言うものの、一部のオペレーティングシステムは常にディスクに物事を継続的に保存するとは限りません。覚えている人にとっては、OS /ファイルシステムがその役割を果たさなかった場合、デフラグによってディスクのパフォーマンスが向上する可能性があります。

メリットのあるプログラムが必要だとおっしゃっていましたが、単純な行列の加法を提案させてください。

コアごとに1つのスレッドを作成したとすると、追加する任意の2つのマトリックスをN(スレッドごとに1つ)の行に簡単に分割できます。行列の加法(思い出す場合)は次のように機能します。

A + B = C

また

[ a11, a12, a13 ]   [ b11, b12, b13]  =  [ (a11+b11), (a12+b12), (a13+c13) ]
[ a21, a22, a23 ] + [ b21, b22, b23]  =  [ (a21+b21), (a22+b22), (a23+c23) ]
[ a31, a32, a33 ]   [ b31, b32, b33]  =  [ (a31+b31), (a32+b32), (a33+c33) ]

したがって、これをN個のスレッドに分散するには、行数とモジュラスをスレッド数で割って、追加される「スレッドID」を取得する必要があります。

matrix with 20 rows across 3 threads
row % 3 == 0 (for rows 0, 3, 6,  9, 12, 15, and 18)
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19)
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17)
// row 20 doesn't exist, because we number rows from 0

これで、各スレッドは処理する行を「認識」し、結果が他のスレッドの計算ドメインと交差しないため、「行ごと」の結果を簡単に計算できます。

ここで必要なのは、値がいつ計算されたかを追跡する「結果」データ構造であり、最後の値が設定されると、計算が完了します。2つのスレッドを使用した行列の加算結果のこの「偽の」例では、2つのスレッドを使用して回答を計算するのに約半分の時間がかかります。

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only.  Real Threads are scheduled across cores due to
// availability and attempts to prevent unnecessary core migration of a running thread.
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3)
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1)
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3)
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1)

より複雑な問題はマルチスレッドによって解決でき、さまざまな問題はさまざまな手法で解決されます。最も単純な例の1つを意図的に選びました。

于 2012-05-16T19:44:41.297 に答える
2

run()またはcall()メソッドを使用してTaskまたはCallableを実装し(それぞれ)、実装されたメソッドを可能な限り並列化することをお勧めします。

Aは個別の作業単位をTask表し ますファイルをメモリにロードすることは個別の作業単位であるため、このアクティビティをバックグラウンドスレッドに委任できます。つまり、バックグラウンドスレッドがファイルをロードするこのタスクを実行します。 それは、その仕事をする(ファイルをロードする)ために他の依存関係を必要とせず、離散的な境界を持っているので、離散的な作業単位です。 あなたが求めているのは、これをさらにタスクに分割することです。つまり、スレッドはファイルの1/3をロードし、別のスレッドは2/3などをロードします。 タスクをさらにサブタスクに分割できた場合、定義上、そもそもタスクではありません。したがって、ファイルのロードはそれ自体が1つのタスクです。



例を挙げる
と、GUIがあり、5つの異なるファイルからのデータをユーザーに提示する必要があるとします。それらを提示するには、実際のデータを処理するためのいくつかのデータ構造も準備する必要があります。
これらはすべて別個のタスクです。
たとえば、ファイルのロードは5つの異なるタスクであるため、5つの異なるスレッドで実行できます。
データ構造の準備は別のスレッドで行うことができます。
もちろん、GUIは別のスレッドで実行されます。
これらはすべて同時に発生する可能性があります

于 2012-05-16T19:45:13.763 に答える
-1

システムが高スループット I/O をサポートしている場合は、次の方法で実行できます。

高スループット (3GB/秒) のファイル システムが利用可能な場合に、Java で複数のスレッドを使用してファイルを読み取る方法

これは、複数のスレッドで単一のファイルを読み取るためのソリューションです。

ファイルを N 個のチャンクに分割し、各チャンクをスレッドで読み取り、順番にマージします。チャンクの境界をまたぐ行に注意してください。ユーザー スラックスが提案する基本的な考え方です

単一の 20 GB ファイルに対する複数スレッドの実装を下回るベンチマーク:

1 スレッド : 50 秒 : 400 MB/s

2 スレッド: 30 秒: 666 MB/秒

4 スレッド: 20 秒: 1GB/秒

8 スレッド: 60 秒: 333 MB/秒

同等の Java7 readAllLines() : 400 秒 : 50 MB/秒

注: これは、高スループット I/O をサポートするように設計されたシステムでのみ機能し、通常のパーソナル コンピューターでは機能しません。

コードの重要な部分は次のとおりです。完全な詳細については、リンクをたどってください

public class FileRead implements Runnable
{

private FileChannel _channel;
private long _startLocation;
private int _size;
int _sequence_number;

public FileRead(long loc, int size, FileChannel chnl, int sequence)
{
    _startLocation = loc;
    _size = size;
    _channel = chnl;
    _sequence_number = sequence;
}

@Override
public void run()
{
        System.out.println("Reading the channel: " + _startLocation + ":" + _size);

        //allocate memory
        ByteBuffer buff = ByteBuffer.allocate(_size);

        //Read file chunk to RAM
        _channel.read(buff, _startLocation);

        //chunk to String
        String string_chunk = new String(buff.array(), Charset.forName("UTF-8"));

        System.out.println("Done Reading the channel: " + _startLocation + ":" + _size);

}

//args[0] is path to read file
//args[1] is the size of thread pool; Need to try different values to fing sweet spot
public static void main(String[] args) throws Exception
{
    FileInputStream fileInputStream = new FileInputStream(args[0]);
    FileChannel channel = fileInputStream.getChannel();
    long remaining_size = channel.size(); //get the total number of bytes in the file
    long chunk_size = remaining_size / Integer.parseInt(args[1]); //file_size/threads


    //thread pool
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1]));

    long start_loc = 0;//file pointer
    int i = 0; //loop counter
    while (remaining_size >= chunk_size)
    {
        //launches a new thread
        executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i));
        remaining_size = remaining_size - chunk_size;
        start_loc = start_loc + chunk_size;
        i++;
    }

    //load the last remaining piece
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i));

    //Tear Down

}

}
于 2016-11-04T22:15:05.790 に答える