AtomicReference
オブジェクトのリストがあるとします:
AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
スレッド Aは、このリストに要素を追加します。batch.get().add(o);
後で、スレッド Bがリストを取得し、たとえば、DB に格納します。insertBatch(batch.get());
書き込み時 (スレッド A) と読み取り時 (スレッド B) に追加の同期を実行して、スレッド B が A が残した方法でリストを確認できるようにする必要がありますか? それとも、これは AtomicReference によって処理されますか?
言い換えれば、可変オブジェクトへの AtomicReference があり、1 つのスレッドがそのオブジェクトを変更した場合、他のスレッドはこの変更をすぐに認識しますか?
編集:
たぶん、いくつかのサンプルコードが適切です:
public void process(Reader in) throws IOException {
List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
ExecutorService exec = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; ++i) {
tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
@Override public AtomicReference<List<Object>> call() throws IOException {
final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));
Processor.this.parser.parse(in, new Parser.Handler() {
@Override public void onNewObject(Object event) {
batch.get().add(event);
if (batch.get().size() >= batchSize) {
dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
}
}
});
return batch;
}
}));
}
List<Object> remainingBatches = new ArrayList<Object>();
for (Future<AtomicReference<List<Object>>> task : tasks) {
try {
AtomicReference<List<Object>> remainingBatch = task.get();
remainingBatches.addAll(remainingBatch.get());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw (RuntimeException)cause;
}
}
// these haven't been flushed yet by the worker threads
if (!remainingBatches.isEmpty()) {
dao.insertBatch(remainingBatches);
}
}
ここでは、4 つのワーカー スレッドを作成してテキストを解析します (これはメソッドのReader in
パラメーターprocess()
です)。各ワーカーは、解析した行をバッチに保存し、満杯になるとバッチをフラッシュします ( dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
)。
テキストの行数はバッチ サイズの倍数ではないため、最後のオブジェクトは、満杯ではないため、フラッシュされないバッチになります。したがって、これらの残りのバッチは、メイン スレッドによって挿入されます。
AtomicReference.getAndSet()
完全なバッチを空のバッチに置き換えるために使用します。このプログラムはスレッドに関して正しいですか?