11

ジョブを処理するためのワーカー スレッドを持つ Java アプリケーションがあります。ワーカーは次のような結果オブジェクトを生成します。

class WorkerResult{
    private final Set<ResultItems> items;
    public Worker(Set<ResultItems> pItems){
         items = pItems;
    }
}

ワーカーが終了すると、次の操作を行います。

 ...
 final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>();
 for(Item producedItem : ...){
      items.add(item);
 }
 passToGatherThread(items);

ここでのitemsセットは、一種の「作業単位」です。このpassToGatherThreadメソッドは、itemsセットを収集スレッドに渡しますが、実行時に存在するのは 1 つだけです。

1 つのスレッド (Gather-thread) だけがセットを読み取るため、競合状態は発生しないため、ここでは同期は必要ありませんitems。AFAICS、セットはスレッドセーフではないため、Gather-thread はすべてのアイテムを表示しない場合がありますよね?

passToGatherThreadたとえば、サードパーティのライブラリであるため、同期できないとします。私が基本的に恐れているのは、キャッシングや VM の最適化などのために、収集スレッドがすべてのアイテムを認識していないことです。ここで問題が発生します。収集スレッドが「認識」するように、アイテム セットをスレッドセーフな方法で渡す方法適切なアイテムのセット?

4

4 に答える 4

2

ここには同期の問題はないようです。passToGatherThread ごとに新しい Set オブジェクトを作成し、セットを変更した後にそれを行います。オブジェクトが失われることはありません。

セット (およびほとんどの Java コレクション) は、コレクションが変更されていない限り、多数のスレッドから同時にアクセスできます。それCollections.unmodifiableCollectionがそのためです。

上記のpassToGatherThreadメソッドは他のスレッドとの通信として機能するため、何らかの同期を使用する必要があります。各同期により、スレッド間のメモリの一貫性が保証されます。

また、渡されたコレクション内のオブジェクトへのすべての書き込みは、他のスレッドに渡されるに行われることに注意してください。メモリがスレッドのローカル キャッシュにコピーされた場合でも、他のスレッドと同じ変更されていない値が保持されます。

于 2013-02-11T13:08:47.143 に答える
1

JavaSetWorkerResult. 例を参照してください。

別のオプションは、を使用することCollections.synchronizedSet()です。

于 2013-02-11T13:09:38.100 に答える
1

私はこの質問について何度も考え (そして議論し)、別の答えにたどり着きました。

同期されたコレクションを渡すことは、そのコレクションに対する後続の各操作が同期されるため、効率の点で良くありません。多くの操作がある場合、ハンディキャップになる可能性があります。

要点:いくつかの仮定を立てましょう(私は同意しません):

  • 言及されたpassToGatherThread方法は確かに安全ではありませんが、ありそうにないようです
  • passToGatherThreadコンパイラは、コレクションがいっぱいになる前に が呼び出されるように、コード内のイベントを並べ替えることができます

ギャザラー メソッドに渡されるコレクションの準備が完了していることを確認するための最も簡単で、クリーンで、おそらく最も効率的な方法は、次のようにコレクション プッシュを同期ブロックに入れることです。

synchronized(items) {
  passToGatherThread(items);
}

こうすることで、コレクションが渡される前にメモリ同期と有効な事前発生シーケンスが保証され、すべてのオブジェクトが正しく渡されるようになります。

于 2013-02-13T15:57:38.470 に答える
0

ワーカーは callable を実装し、WorkerResult を返します。

class Worker implements Callable<WorkerResult> {
    private WorkerInput in;

    public Worker(WorkerInput in) {
        this.in = in;
    }

    public WorkerResult call() {
        // do work here
    }
}

次に、ExecutorService を使用してスレッド プールを管理し、Future を使用して結果を収集します。

public class PooledWorkerController {

    private static final int MAX_THREAD_POOL = 3;
    private final ExecutorService pool = 
       Executors.newFixedThreadPool(MAX_THREAD_POOL);

    public Set<ResultItems> process(List<WorkerInput> inputs) 
           throws InterruptedException, ExecutionException{         
        List<Future<WorkerResult>> submitted = new ArrayList<>();
        for (WorkerInput in : inputs) {
            Future<WorkerResult> future = pool.submit(new Worker(in));
            submitted.add(future);
        }
        Set<ResultItems> results = new HashSet<>();
        for (Future<WorkerResult> future : submitted) {
            results.addAll(future.get().getItems());
        }
        return results;
    }
}
于 2013-02-12T01:16:45.120 に答える