1

私はのようなデータ構造を持っていますMap<Key, Set<Value>>。次のシナリオを実装しようとしています。

  1. いくつかのプロデューサーがこのマップを更新して、既存のキーまたは新しいキーのいずれかに新しい値を追加します(この場合、新しいマップエントリが作成されます)。
  2. コンシューマーは、マップから限られた数のエントリを定期的にポーリングし、それらをプロセッサに渡します。

これが私の見解です:

private static final MAX_UPDATES_PER_PASS = 100;

private final ConcurrentHashMap<Key, Set<Value>> updates = new ConcurrentHashMap<Key, Set<Value>>();

@Override
public void updatesReceived(Key key, Set<Value> values) {
    Set<Value> valuesSet = updates.get(key);
    if (valuesSet == null){
        valuesSet = Collections.newSetFromMap(new ConcurrentHashMap<Value, Boolean>());
        Set<Value> previousValues = updates.putIfAbsent(key, valuesSet);
        if (previousValues != null){
            valuesSet = previousValues;
        }
    }
    valuesSet.addAll(values);
}

private class UpdatesProcessor implements Runnable {

    @Override
    public void run() {
        int updatesProcessed = 0;
        Map<Key, Set<Value>> valuesToProcess = new HashMap<Key, Set<Value>>();
        Iterator<Map.Entry<Key, Set<Value>>> iterator = updates.entrySet().iterator();
        while(iterator.hasNext() && updatesProcessed < MAX_UPDATES_PER_PASS) {
            Map.Entry<Key, Set<Value>> next = iterator.next();
            iterator.remove(); // <-- here 
            Key key = next.getKey();
            Set<Value> values = valuesToProcess.get(key);
            if (values == null){
                values = new HashSet<Value>();
                valuesToProcess.put(key, values);
            }
            values.addAll(next.getValue());
            updatesProcessed++;
        }
        if (!valuesToProcess.isEmpty()){
            process(valuesToProcess);
        }
    }
}

このメソッドupdatesRecevied()は、任意のスレッドからの値のプロデューサーによって呼び出されます。はUpdatesProcessorを介して定期的に実行されるようにスケジュールされてScheduledExecutorServiceいるため、任意のスレッドから呼び出すこともできます。

すべての値は1回だけ処理する必要があります。これ以上でもそれ以下でもありません。値が遅かれ早かれ処理されるかどうかは気にしませんが、最終的には処理されるはずです。

ワイルドスピードにしたいので、synchronizeすべてをやりたくありません。

イテレータを含むこの不器用なコードは、のUpdatesProcessorようなものがあれば簡単に達成できる1つの目標を果たしますConcurrentHashMap.poll()。しかし、ありません。

だから、質問に。まず、これは機能することが保証されていますか?呼び出しiterator.remove()た後、エントリはマップから削除され、すべての追加の値は新しいエントリのセットに移動しますよね?

そして第二に、私は物事を複雑にしていますか?この種のシナリオ(のデータ構造)に対する一般的なアプローチはありますか?

4

0 に答える 0