9

AtomicReferenceオブジェクトのリストがあるとします:

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());

スレッド Aは、このリストに要素を追加します。batch.get().add(o);

後で、スレッド Bがリストを取得し、たとえば、DB に格納します。insertBatch(batch.get());

書き込み時 (スレッド A) と読み取り時 (スレッド B) に追加の同期を実行して、スレッド B が A が残した方法でリストを確認できるようにする必要がありますか? それとも、これは AtomicReference によって処理されますか?

言い換えれば、可変オブジェクトへの AtomicReference があり、1 つのスレッドがそのオブジェクトを変更した場合、他のスレッドはこの変更をすぐに認識しますか?

編集:

たぶん、いくつかのサンプルコードが適切です:

public void process(Reader in) throws IOException {
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
    ExecutorService exec = Executors.newFixedThreadPool(4);

    for (int i = 0; i < 4; ++i) {
        tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
            @Override public AtomicReference<List<Object>> call() throws IOException {

                final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));

                Processor.this.parser.parse(in, new Parser.Handler() {
                    @Override public void onNewObject(Object event) {
                            batch.get().add(event);

                            if (batch.get().size() >= batchSize) {
                                dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
                            }
                    }
                });

                return batch;
            }
        }));
    }

    List<Object> remainingBatches = new ArrayList<Object>();

    for (Future<AtomicReference<List<Object>>> task : tasks) {
        try {
            AtomicReference<List<Object>> remainingBatch = task.get();
            remainingBatches.addAll(remainingBatch.get());
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();

            if (cause instanceof IOException) {
                throw (IOException)cause;
            }

            throw (RuntimeException)cause;
        }
    }

    // these haven't been flushed yet by the worker threads
    if (!remainingBatches.isEmpty()) {
        dao.insertBatch(remainingBatches);
    }
}

ここでは、4 つのワーカー スレッドを作成してテキストを解析します (これはメソッドのReader inパラメーターprocess()です)。各ワーカーは、解析した行をバッチに保存し、満杯になるとバッチをフラッシュします ( dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));)。

テキストの行数はバッチ サイズの倍数ではないため、最後のオブジェクトは、満杯ではないため、フラッシュされないバッチになります。したがって、これらの残りのバッチは、メイン スレッドによって挿入されます。

AtomicReference.getAndSet()完全なバッチを空のバッチに置き換えるために使用します。このプログラムはスレッドに関して正しいですか?

4

4 に答える 4

11

ええと...それは実際にはこのようには機能しません。AtomicReference参照自体がスレッド全体で表示されることを保証します。つまり、元の参照とは異なる参照を割り当てると、更新が表示されます。参照が指しているオブジェクトの実際の内容については保証しません。

したがって、リストの内容に対する読み取り/書き込み操作には、個別の同期が必要です。

編集:したがって、更新されたコードと投稿したコメントから判断すると、ローカル参照をに設定するvolatileだけで、可視性を確保できます。

于 2012-02-21T13:22:14.683 に答える
1

ここのすべてのコードを忘れて、正確な質問はこれだと思います:

書き込み時 (スレッド A) と読み取り時 (スレッド B) に追加の同期を実行して、スレッド B が A が残した方法でリストを確認できるようにする必要がありますか? それとも、これは AtomicReference によって処理されますか?

したがって、それに対する正確な応答は次のとおりです。はい、アトミックは可視性を処理します。そして、それは私の意見ではなく、JDKのドキュメントの意見です:

The Java Language Specification, Third Edition (17.4 Memory Model) に記載されているように、アトミックのアクセスと更新のメモリ効果は、通常、volatile の規則に従います。

これが役立つことを願っています。

于 2012-08-14T16:45:54.040 に答える
0

AtomicReferenceリストへの参照を支援するだけで、リスト自体には何もしません。より具体的には、あなたのシナリオでは、プロデューサーが項目をリストに追加している間にコンシューマーがリストを取得した場合にシステムに負荷がかかると、ほぼ確実に問題が発生します。

.を使用する必要があるように聞こえますBlockingQueue。プロデューサーがコンシューマーよりも高速である場合は、メモリ フットプリントを制限し、すべての競合をキューに処理させることができます。

何かのようなもの:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50);

// ... Producer
queue.put(o);

// ... Consumer
List<Object> queueContents = new ArrayList<Object> ();
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items.
queue.drainTo(queueContents);

追加した

使用しているアーキテクチャを指摘してくれた @Tudor に感謝します。... 私はそれがかなり奇妙であることを認めなければなりません。AtomicReference私が見る限り、あなたはまったく必要ありません。各スレッドは、それが置き換えられる時点でArrayList渡されるまで独自のものを所有するため、競合はどこにもありません。dao

1 つの で 4 つのパーサーを作成することについて少し心配していますReader。各パーサーが他のパーサーに影響を与えないようにする何らかの方法があることを願っています。

上記のコードで説明したように、個人的には何らかの形式のプロデューサー/コンシューマー パターンを使用します。このようなものかもしれません。

static final int PROCESSES = 4;
static final int batchSize = 10;

public void process(Reader in) throws IOException, InterruptedException {

  final List<Future<Void>> tasks = new ArrayList<Future<Void>>();
  ExecutorService exec = Executors.newFixedThreadPool(PROCESSES);
  // Queue of objects.
  final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2);
  // The final object to post.
  final Object FINISHED = new Object();

  // Start the producers.
  for (int i = 0; i < PROCESSES; i++) {
    tasks.add(exec.submit(new Callable<Void>() {
      @Override
      public Void call() throws IOException {

        Processor.this.parser.parse(in, new Parser.Handler() {
          @Override
          public void onNewObject(Object event) {
            queue.add(event);
          }
        });
        // Post a finished down the queue.
        queue.add(FINISHED);
        return null;
      }
    }));
  }

  // Start the consumer.
  tasks.add(exec.submit(new Callable<Void>() {
    @Override
    public Void call() throws IOException {
      List<Object> batch = new ArrayList<Object>(batchSize);
      int finishedCount = 0;
      // Until all threads finished.
      while ( finishedCount < PROCESSES ) {
        Object o = queue.take();
        if ( o != FINISHED ) {
          // Batch them up.
          batch.add(o);
          if ( batch.size() >= batchSize ) {
            dao.insertBatch(batch);
            // If insertBatch takes a copy we could merely clear it.
            batch = new ArrayList<Object>(batchSize);
          }
        } else {
          // Count the finishes.
          finishedCount += 1;
        }
      }
      // Finished! Post any incopmplete batch.
      if ( batch.size() > 0 ) {
        dao.insertBatch(batch);
      }
      return null;
    }
  }));

  // Wait for everything to finish.
  exec.shutdown();
  // Wait until all is done.
  boolean finished = false;
  do {
    try {
      // Wait up to 1 second for termination.
      finished = exec.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
    }
  } while (!finished);
}
于 2012-02-21T16:41:51.277 に答える
0

Tudorの回答に追加する:それ自体をスレッドセーフにするArrayList、要件に応じてさらに大きなコード ブロックにする必要があります。

スレッドセーフArrayListを回避できる場合は、次のように「装飾」できます。

batch = java.util.Collections.synchronizedList(new ArrayList<Object>());

しかし、心に留めておいてください: このような「単純な」構造でさえ、これではスレッドセーフではありません:

Object o = batch.get(batch.size()-1);
于 2012-02-21T13:33:27.780 に答える