1

infinispan キャッシュを使用してカウンター システムを実装しようとしています。複数のスレッドが同じキーにアクセスし、そのキーに対して値をインクリメントしようとしています。

カウンター操作は lock(key) で開始されるため、他のスレッドはそれに書き込むことができません。ロックを取得したスレッドは、get(key) を実行し、値をインクリメントして、同じキーに対して戻します。

ロックの解放を待っていた別のスレッドが get(key) を呼び出します。get(KEY) は、予想される更新されたカウンター値ではなく、以前の値を返します。

各スレッドのキャッシュに対して新しいトランザクションを開始しています。トランザクションは自動コミットされます。

自動コミットを使用して、プット後に明示的にコミットしようとしました。また、実行するたびにファイルの永続性とデータベースの永続性をクリアするようにしました。

これは私の infinispan キャッシュ構成です。

<distributed-cache name="COUNTER_CACHE">
<transaction transaction-manager-lookup="com.suntecgroup.tbms.container.services.cluster.ContainerCacheTxManagerLookup" mode="FULL_XA" locking="PESSIMISTIC"/>
<state-transfer timeout="360000" await-initial-transfer="true" enabled="true"/>
<locking isolation="READ_COMMITTED" acquire-timeout="60000" concurrency-level="100" striping="false"/>
<memory>
<off-heap size="1000" eviction="COUNT"/>
 <!--  Determines the max entries to hold in the cache  -->
</memory>
<persistence passivation="false">
<file-store path="./infinispan_persistance_tmp/" max-entries="1000"/>
<store class="com.suntecgroup.tbms.container.cache.store.ispn.InfinispanCounterCacheStore" fetch-state="false" preload="true" shared="false" purge="false" read-only="false" singleton="false"> </store>
</persistence>
</distributed-cache>

期待される結果は、別のスレッドが同じ値を取得しようとしたときに、スレッドによって行われた値の更新が反映されることです。

私の最善の推測は、スレッドごとに作成されるトランザクションで何かをしなければならないということです。

実際のコードを共有することはできませんが、次のコードはプロセスを概算しています。お役に立てれば

class CounterThread implements Runnable{

    public Integer getCounterVal(String key){
        int counter = cacheDelegate.get(key);
        if(counter==null){
                synchronized (Counter.class) {
                    if(counter = cacheDelegate.beginTransactionIfNoneAndGetObjectWithLock(key)){                
                    counter = 0;
                    cacheDelegate.putAndCommit(key,counter);
                    }
            }
        }   else
        counter = cacheDelegate.beginTransactionIfNoneAndGetObjectWithLock(key);
        return  counter;
    }

    public void putCounterVal(String key,int val){
        val++;
        cacheDelegate.putAndCommit(key,val);
    }

    public void run(){
        int i = 0;
        while(i<100){
            int counter = getCounterVal("KEY");
            putCounterVal("KEY",counter);
            i++;
        }
    }
}

// this method is used in beginTransactionIfNoneAndGetObjectWithLock to create a transaction
public void begin()
            throws ContainerPlatformServicesException {
        try {
            if (getCache("COUNTER_CACHE").getAdvancedCache().getTransactionManager()
                    .getTransaction() == null) {
                getCache("COUNTER_CACHE").getAdvancedCache().getTransactionManager()
                        .setTransactionTimeout(transactionTimeOut);
                getCache("COUNTER_CACHE").getAdvancedCache().getTransactionManager()
                        .begin();
            }
        }catch(TimeoutException e){
            logger.warn("TimeoutException exception caught. Required node is suspected. Thus waiting for suspect completion, to begin transaction.");
        } 
        catch (Exception e) {
            logger.error("Failed to begin transaction. Details:-", e);
        }
    }


// Only two threads are running.
4

0 に答える 0