私は英語のネイティブ スピーカーではありませんが、質問をできるだけ明確に表現するように努めています。この問題に遭遇したため、2 日間混乱しましたが、まだ解決策が見つかりません。
Hadoop YARN の Spring Could Data Flow で実行されるストリームを構築しました。
ストリームは、HTTP ソース、プロセッサ、およびファイル シンクで構成されます。
1.Http ソース
HTTP ソース コンポーネントには、application.properties で定義された dest1 と dest2 という 2 つの異なる宛先にバインドする 2 つの出力チャネルがあります。
spring.cloud.stream.bindings.output.destination=dest1 spring.cloud.stream.bindings.output2.destination=dest2
以下は、参考用の HTTP ソースのコード スニペットです。
@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
##output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
##output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
2. プロセッサ
プロセッサには、2 つの複数入力チャネルと、異なる宛先にバインドする 2 つの出力チャネルがあります。宛先バインディングは、プロセッサ コンポーネント プロジェクトの application.properties で定義されます。
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
以下は、プロセッサのコード スニペットです。
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message) {
logger.info("enter ...transform...");
return "processed by transform1";;
}
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message) {
logger.info("enter ... transform2...");
return "processed by transform2";
}
3. ファイル シンク コンポーネント。
Spring の公式の fil sink コンポーネントを使用します。maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
そして、applicaiton.properties ファイルに宛先バインディングを追加するだけです。spring.cloud.stream.bindings.input.destination=fileSink
4.発見:
私が期待したデータフローは次のようになります。
Source.handleRequest() -->Processor.handleRequest()
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
"processed by transform2" という文字列のみがファイルに保存されます。
しかし、私のテストの後、データフローは実際には次のようになります。
Source.handleRequest() -->Processor.handleRequest() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() -->Processor.handleRequest2() --> Sink.fileWritingMessageHandler();
"processed by transform1" と "processed by transform2" の両方の文字列がファイルに保存されます。
5.質問:
Processor.handleRequest() の出力チャネルの宛先は、fileSink ではなく hdfsSink にバインドされますが、データは引き続きファイル Sink に流れます。私はこれを理解できず、これは私が望むものではありません。Processor.handleRequest2() からのデータのみが、両方ではなくファイル シンクに流れます。私が正しく行わない場合、誰かがそれを行う方法と解決策を教えてもらえますか? 2日間私を混乱させました。
ご親切にありがとうございました。
アレックス