7

次のクラスがあります。ConcurrentHashMapを使用しています。マップに書き込むスレッドと、5分ごとにマップにデータを保存するタイマーがあります。マップにエントリを書き込むときにputIfAbsent()を使用することで、スレッドセーフを実現できます。ただし、そこから読み取り、clear()メソッドですべてのエントリを削除する場合、マップの内容を読み取って削除している間は、他のスレッドがマップに書き込みを行わないようにする必要があります。明らかに、私のコードは、synchronized(lock){}を使用してもスレッドセーフではありません。b/ c saveEntries()のロックを所有するスレッドは、log()メソッドでマップに書き込むスレッドと必ずしも同じではありません。同じロックオブジェクトを使用してlog()のコード全体をロックしない限り!

外部ロックによる同期を強制せずにスレッドセーフを実現する他の方法はありますか?どんな助けでも大歓迎です。

public class Logging {

private static Logging instance;    
private static final String vendor1 = "vendor1";
private static final String vendor2 = "vendor2";    
private static long delay = 5 * 60 * 1000;

private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>();
private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>();

private Timer timer;    
private final Object lock = new Object();

private Logging(){
    timer = new Timer();                
    timer.schedule(new TimerTask() {
        public void run() {
            try {
                saveEntries();
            } catch (Throwable t) {
                timer.cancel();
                timer.purge();
            }
        }       
    }, 0, delay);
}

public static synchronized Logging getInstance(){     
    if (instance == null){
        instance = new Logging();
    }
    return instance;
 }

public void log(){      
    ConcurrentMap<String, Event> map;
    String key = "";        

    if (vendor1.equalsIgnoreCase(engine)){
        map = vendor1Calls;
    }else if(vendor2.equalsIgnoreCase(engine)){  
        map = vendor2Calls;
    }else{
        return;
    }       


    key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
    /*Event event = map.get(key);       

    // Map does not contain this service.method, create an Event for the first     time.
    if(event == null){
        event = new Event();            
        map.put(key, event);

        // Map already contains this key, just adjust the numbers.
    }else{
        // Modify the object fields
    }*/
    //}

    // Make it thread-safe using CHM
    Event newEvent = new Event();
    Event existingEvent= map.putIfAbsent(key, newEvent); 

    if(existingEvent!=null && existingEvent!=newEvent){
        // Modify the object fields
}       

private void saveEntries(){

    Map<String, List<Event>> engineCalls = null;
    try {           

        engineCalls = new HashMap<String, List<Event>>();
        List<Event> events = null;

// How can I achieve therad safety here w/o applying any lock?
        //synchronized(lock){
            if(!vendor1Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor1Calls.values());
                engineCalls.put(vendor1, events);
                vendor1Calls.clear();
            }
            if(!vendor2Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor2Calls.values());
                engineCalls.put(vendor2, events);
                vendor2Calls.clear();
            }
        //}

// logICalls() saves the events in the DB.          
        DBHandle.logCalls(engineCalls);
    } catch (Throwable t) {         
    } finally {
        if(engineCalls!=null){
            engineCalls.clear();
        }                       
    }   
}       

}

4

5 に答える 5

3

ただし、そこから読み取り、clear()メソッドですべてのエントリを削除する場合、マップの内容を読み取って削除している間は、他のスレッドがマップに書き込みを行わないようにする必要があります。

あなたが言おうとしているのは、マップを厳密にロックすることを本当に気にしていないということだと思います。代わりに、vender1Calls.values()とvendor1Calls.clear()の間のログエントリの損失だけを本当に気にしますよね?

その場合、私はあなたが置き換えることができると想像することができます

events.addAll(vendor1Calls.values());
vendor1Calls.clear();

saveEntriesでこれを使用して:

for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) {
    Event e = iter.next();
    events.add(e);
    iter.remove();
}

そうすれば、すでにイベントリストに追加したイベントのみを削除できます。saveEntries()の実行中に、vendor1Callsマップに書き込むことはできますが、イテレーターは追加された値をスキップします。

于 2012-08-24T20:04:09.420 に答える
3

外部同期がないと、CHMでこれを実現することはできません。返されるイテレータビューの一貫性は弱く、実際にマップを反復しているときにマップの内容が変更される可能性があります。

Collections.synchronizedMap探している機能を取得するには、を使用する必要があるようです。

私の主張をより明確にするために編集してください:

これを実現するには、synchronizedMap最初synchronizeにマップ上に配置する必要があります。次に、コンテンツを繰り返し処理するか、別のマップにコピーしてからクリアします。

Map map = Collections.synchronizedMap(new HashMap());

public void work(){
  Map local = new HashMap();
  synchronized(map){
     local.putAll(map);
     map.clear();
  }
  //do work on local instance 
}

インスタンスの代わりに、local私が述べたように、 iterate + remove@KevinJinの答えに似たものにすることができます。

于 2012-08-24T21:04:42.760 に答える
0

この例のアトミックバージョンは、このスレッドに示されています(ConcurrentMapの機能のみを使用)。

于 2012-08-25T12:35:47.007 に答える
0

私の最善の提案はReadWriteLockを使用することですが、ロックを使用したくないと具体的に述べているので(ところでConcurrentHashMap、おそらく内部で使用します)、次のことを試すことができます。

各マップにAtomicReferenceを使用し、それらのコンテンツをログに記録するときが来たら、getAndSetを使用して、古いマップを新しい空のマップに置き換えます。

これで、必要なだけ繰り返してクリアできる専用のマップができました。残念ながら、1つの小さな問題があります(ロックを使用すると削除されます)。それは、空のスレッドと交換するときに別のスレッドがマップに追加されている場合です。他のスレッドが実行を終了するのに十分な時間待機することを期待して、この時点で遅延を追加することもできます。ConcurrentHashMapおそらく、すべての人がそれを使い終えるまで待つために使用できるいくつかの内部機能があります。

于 2012-08-25T22:42:48.947 に答える
0

次のコードは、機能的なJavaプロジェクトの永続マップを使用しています。より多くのメモリを使用しますが、(AFAIK:)複数のスレッドで安全に使用できます。の唯一の変更可能な値であり、 compare-and-setで更新されます。マップとイベントは不変であるため、スレッドセーフです。また、マップをクリアする代わりに、マップへの参照を置き換えます。AtomicReference

import fj.F;
import fj.Ord;
import fj.data.TreeMap;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class Logging
{
    // Event is immutable
    private static class Event
    {
        // updates are done by creating new values
        Event update(String key)
        {
            return new Event();
        }
    }

    // Refactored code pertaining to one vendor into a separate class.
    private static class EngineLogger
    {
        public final String vendor;
        private final AtomicReference<TreeMap<String, Event>> mapRef =
                new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd));

        private EngineLogger(String vendor)
        {
            this.vendor = vendor;
        }

        public void log(String service, String method)
        {
            final String key = service + "." + method;
            boolean updated = false;
            while (! updated)
            {
                // get the current value of the map
                TreeMap<String, Event> currMap = mapRef.get();

                // create an updated value of the map, which is the current map plus info about the new key
                TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>()
                {
                    @Override
                    public Event f(Event event)
                    {
                        // Modify the object fields of event, if the map contains the key
                        return event.update(key);
                    }
                    // create a new event if the map does not contain the key
                }, new Event());

                // compare-and-set the new value in .. repeat until this succeeds
                updated = mapRef.compareAndSet(currMap, newMap);
            }
        }

        public List<Event> reset()
        {
            /* replace the reference with a new map */
            TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd));

            /* use the old map to generate the list */
            return new ArrayList<Event>(oldMap.toMutableMap().values());
        }
    }

    private static Logging instance;
    private static long delay = 5 * 60 * 1000;
    private final Timer timer;

    private final EngineLogger vendor1 = new EngineLogger("vendor1");
    private final EngineLogger vendor2 = new EngineLogger("vendor2");

    private Logging()
    {
        timer = new Timer();
        timer.schedule(new TimerTask()
        {
            public void run()
            {
                try
                {
                    saveEntries();
                }
                catch (Throwable t)
                {
                    timer.cancel();
                    timer.purge();
                }
            }
        }, 0, delay);
    }

    public static synchronized Logging getInstance()
    {
        if (instance == null)
        {
            instance = new Logging();
        }
        return instance;
    }

    public void log(String engine, String service, String method)
    {
        if (vendor1.vendor.equals(engine))
        {
            vendor1.log(service, method);
        }
        else if (vendor2.vendor.equals(engine))
        {
            vendor2.log(service, method);
        }
    }

    private void saveEntries()
    {
        Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>();
        engineCalls.put(vendor1.vendor, vendor1.reset());
        engineCalls.put(vendor2.vendor, vendor2.reset());
        DBHandle.logCalls(engineCalls);
    }
}
于 2012-08-28T09:30:47.107 に答える