2

JavaでスレッドセーフなMap[K、Set[V]]実装を作成しようとしています。

  1. 一意のキーがマップに追加された場合は、新しいセットを作成(および追加)する必要があります
  2. 一意でないキーがマップに追加された場合は、既存のセットをに追加する必要があります。
  3. セットから値が削除されてセットが空になった場合は、メモリリークを回避するために、エントリをマップから削除する必要があります。
  4. 全体を同期させずにこれを解決したい

以下に失敗したテストケースを含めました。解決策がある場合はお知らせください。

package org.deleteme;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import junit.framework.Assert;

import org.junit.Test;

public class ConcurrentSetMapTest {
    public static class ConcurrentSetMap<K, V> {
        private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();

        public void add(K key, V value) {
            Set<V> set = map.get(key);
            if (set != null) {
                set.add(value);
            } else {
                Set<V> candidateSet = createConcurrentSet(value);
                set = map.putIfAbsent(key, candidateSet);
                if (set != null) {
                    // candidate set not accepted, use existing
                    set.add(value);
                }
            }
        }

        public void remove(K key, V value) {
            Set<V> set = map.get(key);
            if (set != null) {
                boolean removed = set.remove(value);
                if (removed && set.isEmpty()) {
                    // this is not thread-safe and causes the test to fail
                    map.remove(key, set);
                }
            }
        }

        public boolean contains(K key, V value) {
            Set<V> set = map.get(key);
            if (set == null) {
                return false;
            }
            return set.contains(value);
        }

        protected Set<V> createConcurrentSet(V element) {
            Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
            set.add(element);
            return set;
        }
    }

    @Test
    public void testThreadSafe() throws InterruptedException, ExecutionException {
        ConcurrentSetMap<String, String> setMap = new ConcurrentSetMap<String, String>();
        ExecutorService executors = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<Future<?>>();

        futures.add(executors.submit(new TestWorker(setMap, "key1")));
        futures.add(executors.submit(new TestWorker(setMap, "key1")));
        futures.add(executors.submit(new TestWorker(setMap, "key2")));
        futures.add(executors.submit(new TestWorker(setMap, "key2")));

        for (Future<?> future : futures) {
            future.get();
        }
    }

    public static class TestWorker implements Runnable {
        ConcurrentSetMap<String, String> setMap;
        String key;

        public TestWorker(ConcurrentSetMap<String, String> setMap, String key) {
            super();
            this.setMap = setMap;
            this.key = key;
        }

        public void run() {
            int sampleSize = 100000;
            for (int i = 0; i < sampleSize; ++ i) {
                // avoid value clashes with other threads
                String value = Thread.currentThread().getName() + i;

                Assert.assertFalse("Should not exist before add", setMap.contains(key, value));
                setMap.add(key, value);
                Assert.assertTrue("Should exist after add", setMap.contains(key, value));
                setMap.remove(key, value);
                Assert.assertFalse("Should not exist after remove", setMap.contains(key, value));
            }
        }
    }
}
4

5 に答える 5

4

そのようなマップを作成しないでください。他の誰かのものを使用してください。HashMultimapなどの Guava のSetMultimap実装の 1 つを使用し、 Multimaps.synchronizedSetMultimapを使用して同期します。

于 2012-08-30T11:36:08.333 に答える
2

私はなんとか私の問題を解決することができました:)

私は最初の投稿で、コレクションからの高速読み取りが必要であり、書き込み速度についてあまり心配していないことを言及しませんでした。このため、書き込みアクセスを同期するが、同期読み取りアクセスを必要としないソリューションを考え出しました。以下のコードは私のテストケースに合格しています。

皆様のご提案に感謝いたします。

public static class ConcurrentSetMap<K, V> {
    private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();

    public synchronized void add(K key, V value) {
        Set<V> set = map.get(key);
        if (set != null) {
            set.add(value);
        } else {
            map.put(key, createConcurrentSet(value));
        }
    }

    public synchronized void remove(K key, V value) {
        Set<V> set = map.get(key);
        if (set != null) {
            set.remove(value);
            if (set.isEmpty()) {
                map.remove(key);
            }
        }
    }

    public boolean contains(K key, V value) {
        return get(key).contains(value);
    }

    public Set<V> get(K key) {
        Set<V> set = map.get(key);
        return set == null ? Collections.<V> emptySet() : set;
    }

    protected Set<V> createConcurrentSet(V value) {
        Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
        set.add(value);
        return set;
    }
} 
于 2012-08-31T09:47:37.120 に答える
2

完全に並行してスレッドセーフな実装を次に示します。

public class ConcurrentSetMap<K,V> {

  private final ConcurrentMap<K, Set<V>> _map = new ConcurrentHashMap<K, Set<V>>();

  public void add(K key, V value) {
    Set<V> curSet = _map.get(key);
    while(true) {

      if((curSet != null) && curSet.contains(value)) {
        break;
      }

      Set<V> newSet = new HashSet<V>();
      newSet.add(value);

      if(curSet == null) {

        curSet = _map.putIfAbsent(key, newSet);
        if(curSet != null) {
          continue;
        }

      } else {

        newSet.addAll(curSet);
        if(!_map.replace(key, curSet, newSet)) {
          curSet = _map.get(key);
          continue;
        }
      }

      break;
    }
  }

  public void remove(K key, V value) {
    Set<V> curSet = _map.get(key);

    while(true) {
      if((curSet == null) || !curSet.contains(value))  {
        break;
      }

      if(curSet.size() == 1) {

        if(!_map.remove(key, curSet)) {
          curSet = _map.get(key);
          continue;
        }

      } else {

        Set<V> newSet = new HashSet<V>();
        newSet.addAll(curSet);
        newSet.remove(value);
        if(!_map.replace(key, curSet, newSet)) {
          curSet = _map.get(key);
          continue;
        }
      }

      break;
    }
  }

  public boolean contains(K key, V value) {
    Set<V> set = _map.get(key);
    return set != null && set.contains(value);
  }
}

タイミングを@PeterLawreyの回答(私のボックス)と比較すると、彼の所要時間は2.9秒、これには1.4秒かかります。

于 2012-08-30T15:40:21.730 に答える
1

まとめてアトミックにする必要がある複数の操作を実行しているため、ロックを使用する必要があります。

public class SynchronousMultiMap<K, V> {
    private final Map<K, Set<V>> map = new LinkedHashMap<K, Set<V>>();

    public synchronized void add(K key, V value) {
        Set<V> set = map.get(key);
        if (set == null)
            map.put(key, set = new LinkedHashSet<V>());
        set.add(value);
    }

    public synchronized void remove(K key, V value) {
        Set<V> set = map.get(key);
        if (set == null) return;
        set.remove(value);
        if (set.isEmpty()) map.remove(key);
    }

    public synchronized boolean contains(K key, V value) {
        Set<V> set = map.get(key);
        return set != null && set.contains(value);
    }

    @Test
    public void testThreadSafe() throws ExecutionException, InterruptedException {
        ExecutorService executors = Executors.newFixedThreadPool(3);
        List<Future<?>> futures = new ArrayList<Future<?>>();
        SynchronousMultiMap<String, Integer> setMap = new SynchronousMultiMap<String, Integer>();
        int sampleSize = 1000000;

        String[] keys = "key1,key2,key3,key4".split(",");
        for (int i = 0; i < 3; i++)
            futures.add(executors.submit(new TestWorker(setMap, keys, sampleSize, i)));

        executors.shutdown();
        for (Future<?> future : futures) {
            future.get();
        }
    }

    static class TestWorker implements Runnable {
        final SynchronousMultiMap<String, Integer> setMap;
        final String[] keys;
        final int sampleSize;
        final int value;

        public TestWorker(SynchronousMultiMap<String, Integer> setMap, String[] keys, int sampleSize, int value) {
            super();
            this.setMap = setMap;
            this.keys = keys;
            this.sampleSize = sampleSize;
            this.value = value;
        }

        public void run() {
            for (int i = 0; i < sampleSize; i += keys.length) {
                for (String key : keys) {
                    boolean contains = setMap.contains(key, value);
                    if (contains)
                        Assert.assertFalse("Should not exist before add", contains);
                    setMap.add(key, value);
                    boolean contains2 = setMap.contains(key, value);
                    if (!contains2)
                        Assert.assertTrue("Should exist after add", contains2);
                    setMap.remove(key, value);
                    boolean contains3 = setMap.contains(key, value);
                    if (contains3)
                        Assert.assertFalse("Should not exist after remove", contains3);
                }
            }
        }
    }
}

実行には 0.35 秒かかります。を使用するsampleSize=1000000と、8 秒未満かかります。

于 2012-08-30T11:34:25.607 に答える
1

ConcurrentMap使用ConcurrentSkipListSet

ConcurrentMap.putIfAbsent()存在しない場合、キーのセットを追加するために呼び出します。

を呼び出し ConcurrentMap.remove( key, emptySet )ます。ここemptySetで、 は空のConcurrentSkipListSetです。

に対応する現在の値keyが空の場合、削除されます。

于 2014-11-03T22:21:02.227 に答える