私は akka ストリームを使用してデータを処理しています。要素UUIDで構成される1つのソースがあります。
流れは次のとおりです。
- プロパティを含む完全な Element を返すサードパーティの HTTP サービスから Element を取得しています。
- 次に、その要素から必要なデータを取得し、アプリケーションが理解できるオブジェクトに変換します。
- 次に、そのオブジェクトのデータを DB に書き込みます。
- 最後に、ストリーム内のすべての要素のステータスで DB を更新します。
ここで、このフローに再試行メカニズムを追加して、フローのステージのいずれかが失敗した場合にステージを再試行する回数を 3 回に設定し、その後失敗した場合はストリームの唯一の失敗が発生するようにします。たとえば、HTTP 504 エラーのようなサードパーティ サービスに問題がある場合、ほとんどの場合、この要素を再試行すると成功します。akka でこれを達成する方法はありますか。
現在、失敗したすべての要素 ID を以下のように保存する 1 つのリストを維持しています。
コード :
List<UUID> failedId = new CopyOnWriteArrayList<>();
Source.from(elementIdToProcess).map(f -> {
failedId.add(f);
return f;
}).via(featureFlow()).via(filterNullFeaturesFlow())
.via(mapToFeatureFlow()).via(setFeaturesAdditionalInfo())
.runForeach(features -> {
features.parallelStream().forEach(feature -> {
try {
featureCommitter.save(feature);
failedId.remove(UUID.fromString(feature.getFeatureId()));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}, materializer);