私の Hazelcast ベースのプログラムは、サブミッターとワーカーの 2 つのモードで動作します。
サブミッターは、いくつかの POJO をいくつかのキーによって分散マップに配置します。たとえば、次のようになります。hazelcastInstance.getMap(MAP_NAME).put(key, value);
ワーカーには、マップからエンティティを処理する必要がある無限ループ (Thread.sleep(1000L);
内部でタイムアウト) があります。今のところ、このループでマップ サイズを出力しているだけです。
ここで問題です。ワーカーアプリを起動します。次に、4 つのサブミッターを同時に開始します (それぞれがマップにエントリを追加し、その作業を終了します)。しかし、すべてのサブミッター アプリが完了すると、ワーカー アプリは任意のサイズを出力します。追加されたエントリが 1 つだけ検出される場合もあれば、2 つ、場合によっては 3 つ検出されることもあります (実際には、4 つのエントリすべてが表示されることはありません)。
この単純な流れの問題点は何ですか? メソッドが同期であることをHazelcastのドキュメントで読んだput()
ので、戻った後、エントリが分散マップに配置され、複製されることが保証されます。しかし、私の実験ではそうではないようです。
UPD (コード)
送信者:
public void submit(String key) {
Object mySerializableObject = ...
IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}
ワーカー:
public void process() {
while (true) {
IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
System.out.println(map.size());
// Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
// objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess));
try {
Thread.sleep(PAUSE);
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
}
}
}
「処理」部分自体をコメントアウトしました。これは、マップの一貫した状態を取得しようとしているだけだからです。上記のコードは、毎回異なる結果を出力します。たとえば、「4, 3, 1, 1, 1, 1, 1...」 (したがって、4 つの送信されたタスクを一瞬見ることさえできますが、その後、それらは... 消えます)。 .
UPD (ログ)
ワーカー:
...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...
提出者 1:
Before: tasksMap.size() = 0
After: tasksMap.size() = 1
提出者 2:
Before: tasksMap.size() = 1
After: tasksMap.size() = 4
提出者 3:
Before: tasksMap.size() = 1
After: tasksMap.size() = 2
提出者 4:
Before: tasksMap.size() = 3
After: tasksMap.size() = 4