0

メッセージの内容に基づいて S3 からファイルをダウンロードする必要があります。つまり、ダウンロードするファイルは以前は不明であり、実行時に検索して見つける必要がありました。S3StreamingMessageSourceは、次の理由で適切ではないようです。

  1. メッセージを待つ必要がある場合は、ポーリングに依存しています。
  2. S3StreamingMessageSourceフローの途中で動的に作成する方法が見つかりません。gateway(IntegrationFlow)面白そうに見えますが、必要なのはgateway(Function<Message<?>, IntegrationFlow>)存在しない です。

別の候補はS3MessageHandler ですが、目的のファイルを見つけるために必要なファイルの一覧表示はサポートされていません。

AWS API を直接使用して独自のメッセージ ハンドラーを実装できますが、これは珍しい要件ではないように思われるため、何か不足しているのではないかと考えています。結局のところ、すべてのアプリがそこに座って新しいファイルを求めて S3 をポーリングし続けるわけではありません。

4

2 に答える 2

0

この質問に出くわした人にとって、これは私がやったことです。トリックは次のとおりです。

  1. 構築時ではなく、後でフィルターを設定します。addFiltersorgetFiltersメソッドがないため、フィルターは一度しか設定できず、後で追加できないことに注意してください。@artem-bilan、これは不便です。
  2. 手動で呼び出しS3StreamingMessageSource.receiveます。

    .handle(String.class, (fileName, h) -> {
    if (messageSource instanceof S3StreamingMessageSource) {
        S3StreamingMessageSource s3StreamingMessageSource = (S3StreamingMessageSource) messageSource;
    
        ChainFileListFilter<S3ObjectSummary> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilters(
                new S3SimplePatternFileListFilter("**/*/*.json.gz"),
                new S3PersistentAcceptOnceFileListFilter(metadataStore, ""),
                new S3FileListFilter(fileName)
        );
        s3StreamingMessageSource.setFilter(chainFileListFilter);
    
        return s3StreamingMessageSource.receive();
    }
    log.warn("Expected: {} but got: {}.",
            S3StreamingMessageSource.class.getName(), messageSource.getClass().getName());
    return messageSource.receive();
    }, spec -> spec
        .requiresReply(false) // in case all messages got filtered out
    )
    
于 2017-12-31T08:20:27.303 に答える