14

私の Hazelcast ベースのプログラムは、サブミッターとワーカーの 2 つのモードで動作します。

サブミッターは、いくつかの POJO をいくつかのキーによって分散マップに配置します。たとえば、次のようになります。hazelcastInstance.getMap(MAP_NAME).put(key, value);

ワーカーには、マップからエンティティを処理する必要がある無限ループ (Thread.sleep(1000L);内部でタイムアウト) があります。今のところ、このループでマップ サイズを出力しているだけです。

ここで問題です。ワーカーアプリを起動します。次に、4 つのサブミッターを同時に開始します (それぞれがマップにエントリを追加し、その作業を終了します)。しかし、すべてのサブミッター アプリが完了すると、ワーカー アプリは任意のサイズを出力します。追加されたエントリが 1 つだけ検出される場合もあれば、2 つ、場合によっては 3 つ検出されることもあります (実際には、4 つのエントリすべてが表示されることはありません)。

この単純な流れの問題点は何ですか? メソッドが同期であることをHazelcastのドキュメントで読んだput()ので、戻った後、エントリが分散マップに配置され、複製されることが保証されます。しかし、私の実験ではそうではないようです。

UPD (コード)

送信者:

public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}

ワーカー:

public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());

        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

「処理」部分自体をコメントアウトしました。これは、マップの一貫した状態を取得しようとしているだけだからです。上記のコードは、毎回異なる結果を出力します。たとえば、「4, 3, 1, 1, 1, 1, 1...」 (したがって、4 つの送信されたタスクを一瞬見ることさえできますが、その後、それらは... 消えます)。 .

UPD (ログ)

ワーカー:

...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...

提出者 1:

Before: tasksMap.size() = 0
After: tasksMap.size() = 1

提出者 2:

Before: tasksMap.size() = 1
After: tasksMap.size() = 4

提出者 3:

Before: tasksMap.size() = 1
After: tasksMap.size() = 2

提出者 4:

Before: tasksMap.size() = 3
After: tasksMap.size() = 4
4

1 に答える 1

7

さて、私は問題を理解したと思います。私が理解している限り、IMapによって返された分散は、データhazelcastInstance.getMapがクラスター内の既存のすべてのノードに複製されることを保証しません。そのため、私の例では、送信されたタスクの一部が (永続的に動作する) ワーカー ノードではなく、送信後に実行を終了する他のサブミッターにレプリケートされました。そのため、そのようなエントリはサブミッターの終了時に失われました。

に置き換えるhazelcastInstance.getMapことでこの問題を解決しましたhazelcastInstance.getReplicatedMap。このメソッドは を返しますReplicatedMap。これは、AFAIK に配置されたエントリがクラスターのすべてのノードに複製されることを保証します。これで、私のシステムではすべてが正常に機能します。

于 2016-04-30T13:17:46.750 に答える