JavaでスレッドセーフなMap[K、Set[V]]実装を作成しようとしています。
- 一意のキーがマップに追加された場合は、新しいセットを作成(および追加)する必要があります
- 一意でないキーがマップに追加された場合は、既存のセットをに追加する必要があります。
- セットから値が削除されてセットが空になった場合は、メモリリークを回避するために、エントリをマップから削除する必要があります。
- 全体を同期させずにこれを解決したい
以下に失敗したテストケースを含めました。解決策がある場合はお知らせください。
package org.deleteme;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import junit.framework.Assert;
import org.junit.Test;
public class ConcurrentSetMapTest {
public static class ConcurrentSetMap<K, V> {
private final ConcurrentMap<K, Set<V>> map = new ConcurrentHashMap<K, Set<V>>();
public void add(K key, V value) {
Set<V> set = map.get(key);
if (set != null) {
set.add(value);
} else {
Set<V> candidateSet = createConcurrentSet(value);
set = map.putIfAbsent(key, candidateSet);
if (set != null) {
// candidate set not accepted, use existing
set.add(value);
}
}
}
public void remove(K key, V value) {
Set<V> set = map.get(key);
if (set != null) {
boolean removed = set.remove(value);
if (removed && set.isEmpty()) {
// this is not thread-safe and causes the test to fail
map.remove(key, set);
}
}
}
public boolean contains(K key, V value) {
Set<V> set = map.get(key);
if (set == null) {
return false;
}
return set.contains(value);
}
protected Set<V> createConcurrentSet(V element) {
Set<V> set = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
set.add(element);
return set;
}
}
@Test
public void testThreadSafe() throws InterruptedException, ExecutionException {
ConcurrentSetMap<String, String> setMap = new ConcurrentSetMap<String, String>();
ExecutorService executors = Executors.newFixedThreadPool(4);
List<Future<?>> futures = new ArrayList<Future<?>>();
futures.add(executors.submit(new TestWorker(setMap, "key1")));
futures.add(executors.submit(new TestWorker(setMap, "key1")));
futures.add(executors.submit(new TestWorker(setMap, "key2")));
futures.add(executors.submit(new TestWorker(setMap, "key2")));
for (Future<?> future : futures) {
future.get();
}
}
public static class TestWorker implements Runnable {
ConcurrentSetMap<String, String> setMap;
String key;
public TestWorker(ConcurrentSetMap<String, String> setMap, String key) {
super();
this.setMap = setMap;
this.key = key;
}
public void run() {
int sampleSize = 100000;
for (int i = 0; i < sampleSize; ++ i) {
// avoid value clashes with other threads
String value = Thread.currentThread().getName() + i;
Assert.assertFalse("Should not exist before add", setMap.contains(key, value));
setMap.add(key, value);
Assert.assertTrue("Should exist after add", setMap.contains(key, value));
setMap.remove(key, value);
Assert.assertFalse("Should not exist after remove", setMap.contains(key, value));
}
}
}
}