私はのようなデータ構造を持っていますMap<Key, Set<Value>>
。次のシナリオを実装しようとしています。
- いくつかのプロデューサーがこのマップを更新して、既存のキーまたは新しいキーのいずれかに新しい値を追加します(この場合、新しいマップエントリが作成されます)。
- コンシューマーは、マップから限られた数のエントリを定期的にポーリングし、それらをプロセッサに渡します。
これが私の見解です:
private static final MAX_UPDATES_PER_PASS = 100;
private final ConcurrentHashMap<Key, Set<Value>> updates = new ConcurrentHashMap<Key, Set<Value>>();
@Override
public void updatesReceived(Key key, Set<Value> values) {
Set<Value> valuesSet = updates.get(key);
if (valuesSet == null){
valuesSet = Collections.newSetFromMap(new ConcurrentHashMap<Value, Boolean>());
Set<Value> previousValues = updates.putIfAbsent(key, valuesSet);
if (previousValues != null){
valuesSet = previousValues;
}
}
valuesSet.addAll(values);
}
private class UpdatesProcessor implements Runnable {
@Override
public void run() {
int updatesProcessed = 0;
Map<Key, Set<Value>> valuesToProcess = new HashMap<Key, Set<Value>>();
Iterator<Map.Entry<Key, Set<Value>>> iterator = updates.entrySet().iterator();
while(iterator.hasNext() && updatesProcessed < MAX_UPDATES_PER_PASS) {
Map.Entry<Key, Set<Value>> next = iterator.next();
iterator.remove(); // <-- here
Key key = next.getKey();
Set<Value> values = valuesToProcess.get(key);
if (values == null){
values = new HashSet<Value>();
valuesToProcess.put(key, values);
}
values.addAll(next.getValue());
updatesProcessed++;
}
if (!valuesToProcess.isEmpty()){
process(valuesToProcess);
}
}
}
このメソッドupdatesRecevied()
は、任意のスレッドからの値のプロデューサーによって呼び出されます。はUpdatesProcessor
を介して定期的に実行されるようにスケジュールされてScheduledExecutorService
いるため、任意のスレッドから呼び出すこともできます。
すべての値は1回だけ処理する必要があります。これ以上でもそれ以下でもありません。値が遅かれ早かれ処理されるかどうかは気にしませんが、最終的には処理されるはずです。
ワイルドスピードにしたいので、synchronize
すべてをやりたくありません。
イテレータを含むこの不器用なコードは、のUpdatesProcessor
ようなものがあれば簡単に達成できる1つの目標を果たしますConcurrentHashMap.poll()
。しかし、ありません。
だから、質問に。まず、これは機能することが保証されていますか?呼び出しiterator.remove()
た後、エントリはマップから削除され、すべての追加の値は新しいエントリのセットに移動しますよね?
そして第二に、私は物事を複雑にしていますか?この種のシナリオ(のデータ構造)に対する一般的なアプローチはありますか?