18

必要なユースケースがあります

  • キーが ConcurrentHashMap に存在しない場合は、新しい値を挿入します
  • キーが ConcurrentHashMap に既に存在する場合、古い値を新しい値に置き換えます。新しい値は古い値から派生します (高価な操作ではありません)。

私は提供する次のコードを持っています:

public void insertOrReplace(String key, String value) {
        boolean updated = false;
        do {
            String oldValue = concurrentMap.get(key);
            if (oldValue == null) {
                oldValue = concurrentMap.putIfAbsent(key, value);
                if (oldValue == null) {
                    updated = true;
                }
            }
            if (oldValue != null) {
                final String newValue = recalculateNewValue(oldValue, value);
                updated = concurrentMap.replace(key, oldValue, newValue);
            }
        } while (!updated);
    }

それは正しく、スレッドセーフだと思いますか?

もっと簡単な方法はありますか?

4

4 に答える 4

9

あなたのコードと同等の以下のコードを使用して、少し短くすることができます。同時にアクセスする数千のスレッドで少しストレステストを行いました。期待どおりに動作し、多数の再試行 (ループ) が実行されます (明らかに、並行世界でのテストで正確性を証明することはできません)。

public void insertOrReplace(String key, String value) {
    for (;;) {
        String oldValue = concurrentMap.putIfAbsent(key, value);
        if (oldValue == null)
            return;

        final String newValue = recalculateNewValue(oldValue, value);
        if (concurrentMap.replace(key, oldValue, newValue))
            return;
    }
}
于 2012-04-24T21:56:49.703 に答える
2

Eclipse CollectionsMutableMapIterable.updateValueWith(K key, Function0<? extends V> factory, Function2<? super V,? super P,? extends V> function, P parameter)から使用できます。

マップに何もない場合、factory引数は初期値を作成します。引数は、function追加のパラメーターとともにマップ値に適用され、新しいマップ値が作成されます。それparameterは への最終引数として渡されますupdateValueWith()。キーがマップにない場合でも関数が呼び出されます。したがって、初期値は実際にはとfunctionの出力に適用されます。値を変更してはなりません。新しい値を返す必要があります。あなたの例では、マップ値は不変の文字列であるため、問題ありません。factoryparameterfunction

のような ConcurrentMapsorg.eclipse.collections.impl.map.mutable.ConcurrentHashMapでは、 の実装updateValueWith()もスレッドセーフでアトミックです。functionマップ値を変更しないことが重要です。そうしないと、スレッドセーフではなくなります。代わりに新しい値を返す必要があります。あなたの例では、マップ値は不変の文字列であるため、問題ありません。

メソッドrecalculateNewValue()が文字列の連結のみを行う場合は、次のように使用できますupdateValueWith()

Function0<String> factory = () -> "initial ";
Function2<String, String, String> recalculateNewValue = String::concat;

MutableMap<String, String> map = new ConcurrentHashMap<>();
map.updateValueWith("test", factory, recalculateNewValue, "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.updateValueWith("test", factory, recalculateNewValue, "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));

Java 8 のConcurrentMap.compute(K key, BiFunction remappingFunction)を使用して同じことを達成できますが、いくつかの欠点があります。

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append1 ");
Assert.assertEquals("initial append1 ", map.get("test"));
map.compute("test", (key, oldValue) -> oldValue == null ? "initial append1 " : oldValue + "append2");
Assert.assertEquals("initial append1 append2", map.get("test"));
  • キーが存在しない場合を処理する別のファクトリがないため、ラムダの本体で値と初期値を処理する必要があります。
  • API は、ラムダの再利用に適していません。へのすべての呼び出しupdateValueWith()は同じラムダを共有しますが、へのすべての呼び出しcompute()はヒープに新しいガベージを作成します。

注: 私は Eclipse Collections のコミッターです。

于 2014-07-10T19:14:29.870 に答える
2

あなたの方法はスレッドセーフのようです。ConcurrentHashMap のパフォーマンス上の利点が必要ない場合は、代わりに通常の HashMap を使用し、それへのすべてのアクセスを同期することを検討してください。あなたのメソッドは AtomicInteger.getAndSet(int) に似ているので、問題ないはずです。あなたのために作業を行うためのライブラリ呼び出しを探していない限り、これを行うためのより簡単な方法があるとは思えません。

于 2012-04-24T20:19:48.317 に答える
2

それは正しくないと思います。私が理解しているように、merge() メソッドはその仕事に適したツールです。私は現在同じ問題を抱えており、結果を確認するためにちょっとしたテストを書きました。

このテストでは、100 個のワーカーを開始します。それらのそれぞれは、マップ内の値を 100 回インクリメントしています。したがって、期待される結果は 10000 になります。

労働者には2つのタイプがあります。置換アルゴリズムを使用するものと、マージを使用するもの。テストは、異なる実装で 2 回実行されます。

import java.util.concurrent.ArrayBlockingQueue;                                                                     
import java.util.concurrent.ConcurrentHashMap;                                                                      
import java.util.concurrent.ConcurrentMap;                                                                          
import java.util.concurrent.ExecutorService;                                                                        
import java.util.concurrent.ThreadPoolExecutor;                                                                     
import java.util.concurrent.TimeUnit;                                                                               

public class ConcurrentMapTest                                                                                      
{                                                                                                                   

   private static ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();                                   

   private final class ReplaceWorker implements Runnable                                                            
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            Integer putIfAbsent = map.putIfAbsent("key", Integer.valueOf(1));                                       
            if(putIfAbsent == null)                                                                                 
               return;                                                                                              
            map.replace("key", putIfAbsent + 1);                                                                    
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   private final class MergeWorker implements Runnable                                                              
   {                                                                                                                
      public void run()                                                                                             
      {                                                                                                             
         for(int i = 0; i<100; i++)                                                                                 
         {                                                                                                          
            map.merge("key", Integer.valueOf(1), (ov, nv) -> {                                                      
               return ov + 1;                                                                                       
            });                                                                                                     
         }                                                                                                          
      }                                                                                                             
   }                                                                                                                

   public MergeWorker newMergeWorker()                                                                              
   {                                                                                                                
      return new MergeWorker();                                                                                     
   }                                                                                                                

   public ReplaceWorker newReplaceWorker()                                                                          
   {                                                                                                                
      return new ReplaceWorker();                                                                                   
   }                                                                                                                

   public static void main(String[] args)                                                                           
   {                                                                                                                
      map.put("key", 1);                                                                                            
      ConcurrentMapTest test = new ConcurrentMapTest();                                                             
      ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQu
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newMergeWorker());                                                                  
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      

      map.put("key", 1);                                                                                            
      threadPool = new ThreadPoolExecutor(10, 10, 100, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000));      
      for(int i = 0; i<100; i++)                                                                                    
      {                                                                                                             
         threadPool.submit(test.newReplaceWorker());                                                                
      }                                                                                                             
      awaitTermination(threadPool);                                                                                 
      System.out.println(test.map.get("key"));                                                                      
   }                                                                                                                

   private static void awaitTermination(ExecutorService threadPool)                                                 
   {                                                                                                                
      try                                                                                                           
      {                                                                                                             
         threadPool.shutdown();                                                                                     
         boolean awaitTermination = threadPool.awaitTermination(1, TimeUnit.SECONDS);                               
         System.out.println("terminted successfull: " + awaitTermination);                                          
      }                                                                                                             
      catch (InterruptedException e)                                                                                
      {                                                                                                             
         // TODO Auto-generated catch block                                                                         
         e.printStackTrace();                                                                                       
      }                                                                                                             
   }                                                                                                                
}                                                                                          
結果:
正常終了: true
10000
正常終了: true
1743年

問題は、あなたのケースでは get と put の間にギャップがあるため、マップへの同時アクセスで結果が上書きされることです。ドキュメントには何も記載されていませんが、マージではアトミック操作です。

于 2014-06-30T08:57:56.587 に答える