3

私は英語のネイティブ スピーカーではありませんが、質問をできるだけ明確に表現するように努めています。この問題に遭遇したため、2 日間混乱しましたが、まだ解決策が見つかりません。

Hadoop YARN の Spring Could Data Flow で実行されるストリームを構築しました。

ストリームは、HTTP ソース、プロセッサ、およびファイル シンクで構成されます。

1.Http ソース
HTTP ソース コンポーネントには、application.properties で定義された dest1 と dest2 という 2 つの異なる宛先にバインドする 2 つの出力チャネルがあります。

spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2

以下は、参考用の HTTP ソースのコード スニペットです。

@Autowired
    private EssSource channels; //EssSource is the interface for multiple output channels

##output channel 1:
    @RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        logger.info("enter ... handleRequest1...");
        channels.output().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

##output channel 2:
    @RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        logger.info("enter ... handleRequest2...");
        channels.output2().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

2. プロセッサ
プロセッサには、2 つの複数入力チャネルと、異なる宛先にバインドする 2 つの出力チャネルがあります。宛先バインディングは、プロセッサ コンポーネント プロジェクトの application.properties で定義されます。

//input channel binding  
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2

//output channel binding  
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink

以下は、プロセッサのコード スニペットです。

@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
    public Object transform(Message<?> message) {
        logger.info("enter ...transform...");

        return "processed by transform1";;
    }


    @Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
    public Object transform2(Message<?> message) {
        logger.info("enter ... transform2...");
        return "processed by transform2";
    }

3. ファイル シンク コンポーネント。

Spring の公式の fil sink コンポーネントを使用します。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT

そして、applicaiton.properties ファイルに宛先バインディングを追加するだけです。spring.cloud.stream.bindings.input.destination=fileSink

4.発見:

私が期待したデータフローは次のようになります。

Source.handleRequest() -->Processor.handleRequest()

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

"processed by transform2" という文字列のみがファイルに保存されます。

しかし、私のテストの後、データフローは実際には次のようになります。

Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();

Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();

"processed by transform1" と "processed by transform2" の両方の文字列がファイルに保存されます。

5.質問:

Processor.handleRequest() の出力チャネルの宛先は、fileSink ではなく hdfsSink にバインドされますが、データは引き続きファイル Sink に流れます。私はこれを理解できず、これは私が望むものではありません。Processor.handleRequest2() からのデータのみが、両方ではなくファイル シンクに流れます。私が正しく行わない場合、誰かがそれを行う方法と解決策を教えてもらえますか? 2日間私を混乱させました。

ご親切にありがとうございました。

アレックス

4

1 に答える 1

2

ストリーム定義は次のようなものですか (「-2」バージョンは複数のチャネルを持つものです) ?

http-source-2 | processor-2 | file-sink

Spring Cloud Data Flow は で定義された宛先をオーバーライドすることに注意してください。これは、プロセッサが に設定されているapplications.properties場合でも、実際には の入力と一致するためです。spring.cloud.stream.bindings.output.destinationhdfs-sinkfile-sink

ストリーム定義から宛先を構成する方法は、ここで説明されています (タップのコンテキストで): http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow -ストリーム-タップ-dsl

できることは、チャネル 1 と 2 の意味を単純に入れ替えることです。hdfs にはサイド チャネルを使用します。ただし、これは少し脆弱です - ストリームのinput/outputチャネルは自動的に構成され、他のチャネルは経由で構成されますapplication.properties- この場合、ストリーム定義または展開時にサイドチャネルの宛先を構成する方が良いかもしれません - httpを参照してください://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties .

これらは、通常のコンポーネントを使用して、別々のエンドポイントをリッスンする 2 つのストリームである可能性があるように思えます-データが並んで流れると想定されている場合。

于 2016-09-06T19:19:34.070 に答える