5

私は akka ストリームを使用してデータを処理しています。要素UUIDで構成される1つのソースがあります。

流れは次のとおりです。

  1. プロパティを含む完全な Element を返すサードパーティの HTTP サービスから Element を取得しています。
  2. 次に、その要素から必要なデータを取得し、アプリケーションが理解できるオブジェクトに変換します。
  3. 次に、そのオブジェクトのデータを DB に書き込みます。
  4. 最後に、ストリーム内のすべての要素のステータスで DB を更新します。

ここで、このフローに再試行メカニズムを追加して、フローのステージのいずれかが失敗した場合にステージを再試行する回数を 3 回に設定し、その後失敗した場合はストリームの唯一の失敗が発生するようにします。たとえば、HTTP 504 エラーのようなサードパーティ サービスに問題がある場合、ほとんどの場合、この要素を再試行すると成功します。akka でこれを達成する方法はありますか。

現在、失敗したすべての要素 ID を以下のように保存する 1 つのリストを維持しています。

コード :

List<UUID> failedId = new CopyOnWriteArrayList<>();
Source.from(elementIdToProcess).map(f -> {
            failedId.add(f);
            return f;
        }).via(featureFlow()).via(filterNullFeaturesFlow())
            .via(mapToFeatureFlow()).via(setFeaturesAdditionalInfo())
            .runForeach(features -> {
                features.parallelStream().forEach(feature -> {
                    try {
                        featureCommitter.save(feature);
                        failedId.remove(UUID.fromString(feature.getFeatureId()));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }, materializer);
4

2 に答える 2