2

私はHazelcast 2.0.1を使用してデータを頻繁に更新しています(約2分)。これには、最初にDBからデータを削除してからロードすることが含まれます。ただし、どこかで、スレッドの 1 つがキーのロックを保持しているため、削除操作が妨げられ、例外がスローされます ( java.util.ConcurrentModificationException: Another thread holds a lock for the key: abc@gmail.com)。hazelcast のマップが更新されるように助けてください。

以下にコードを示します

DeltaParallelizer

def customerDetails = dataOperations.getDistributedStore(DataStructures.customer_project.name()).keySet()
ExecutorService service = Hazelcast.getExecutorService()

def result
try{
    customerDetails?.each{customerEmail->
        log.info String.format('Creating delta task for customer:%s',customerEmail)
        def dTask = new DistributedTask(new EagerDeltaTask(customerEmail))
        service.submit(dTask);
    }
    customerDetails?.each {customerEmail ->
        log.info String.format('Creating task customer aggregation for %s',customerEmail)
        def task = new DistributedTask(new EagerCustomerAggregationTask(customerEmail))
        service.submit(task)
    }
}
catch(Exception e){
    e.printStackTrace()
}

EagerDeltaTask

class EagerDeltaTask implements Callable,Serializable {
    private final def emailId
    EagerDeltaTask(email){
        emailId = email
    }
    @Override
    public Object call() throws Exception {
        log.info(String.format("Eagerly computing delta for %s",emailId))       
        def dataOperations = new DataOperator()
        def tx = Hazelcast.getTransaction()
        tx.begin()
        try{
            deleteAll(dataOperations)
            loadAll(dataOperations)
            tx.commit()
        }
        catch(Exception e){
            tx.rollback()
            log.error(String.format('Delta computation is screwed while loading data for the project:%s',emailId),e)
        }       
    }

    private void deleteAll(dataOperations){
        log.info String.format('Deleting entries for customer %s',emailId)      
        def projects = dataOperations.getDistributedStore(DataStructures.customer_project.name()).get(emailId)
        projects?.each{project->
            log.info String.format('Deleting entries for project %s',project[DataConstants.PROJECT_NUM.name()])
            def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])?.collect{it[DataConstants.SR_NUM.name()]}
            def activitiesStore = dataOperations.getDistributedStore(DataStructures.sr_activities.name())
            srs?.each{sr ->
                activitiesStore.remove(sr)
            }
            dataOperations.getDistributedStore(DataStructures.project_sr_aggregation.name()).remove(project[DataConstants.PROJECT_NUM.name()])
        }       
        dataOperations.getDistributedStore(DataStructures.customer_project.name()).remove(emailId)
    }

    private void loadAll(dataOperations){
        log.info(String.format('Loading entries for customer %s',emailId))
        def projects = dataOperations.projects(emailId)
        projects?.each{project->
            log.info String.format('Loading entries for project %s',project[DataConstants.PROJECT_NUM.name()])
            def srs = dataOperations.srs(project[DataConstants.PROJECT_NUM.name()])
            srs?.each{sr->
                dataOperations.activities(sr[DataConstants.SR_NUM.name()])
            }
        }       
    }   
}

データ演算子

class DataOperator {
def getDistributedStore(String name){
    Hazelcast.getMap(name)
}
}

deleteAll srs で例外が発生したため、一部のマップ コンテンツが削除され、コンテンツが削除されたマップに対してのみ新しいデータが読み込まれ、残りのマップには古いデータが含まれています。そのため、Hazelcast マップで更新されたデータを取得できません。Hazelcast マップに更新されたデータを取得する方法について、意見をお聞かせください。

また、このHazelcast.getTransactionクライアントはこの目的のために機能しますか?

注: 顧客は複数の project_num を持つことができます。1 つの project_num は複数の顧客で共有することもできます。1 つの project_num は複数の SR_NUM を持つことができます。

4

1 に答える 1

3

I used Hazelcast eviction policy which solved my problem. I used a <time-to-live-seconds>300</time-to-live-seconds> which clears map content every 5 minute and when any request comes from UI for any map it reloads that map content from a loader.

Below is one of the Hazelcast map config

...
<map name="customer_project" >
    <map-store enabled="true">
        <class-name>com.abc.arena.datagrid.loader.CustomerProjectData</class-name>
    </map-store>
    <time-to-live-seconds>300</time-to-live-seconds>
</map>
...

CustomerProjectData loader class simply loads data into map from DB. So now I no longer need DeltaParallelizer or EagerDeltaTask class

Different approaches are also welcome :)

于 2013-09-04T06:21:55.637 に答える