実際、やりたいことを実行できるかどうかさえわかりませんEvent.class
。Spring Integration を使用して Broker (MQTT) からイベント ストリーム ( ) を消費していて、そのストリームを別のマイクロサービスに転送したいと考えています。両方のサービスは、水平方向にスケーラブルであると想定されています。したがって、他のサービスには as body を除く POST エンドポイントがありますFlux<Event>
(Spring Cloud Functions: を使用spring-cloud-starter-function-webflux
)。転送サービスでリボンと を使いたいWebClient
。
私がこれまでに持っているもの(簡略化):
@Configuration
public class EventFlowConfiguration{
@Autowired
private Webclient webClient;
@Bean
public MessageProducerSupport inboundAdapter() {
final MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(MqttAsyncClient.generateClientId(), this.mqttClientFactory, "/topic");
adapter.setConverter(new DefaultPahoMessageConverter());
return adapter;
}
@Bean
Publisher<Message<Event>> eventFlow() {
return IntegrationFlows
.from(inboundAdapter())
.handle((payload, header) -> this.eventHandler((String) payload))
.toReactivePublisher();
}
private Event eventHandler(String payload) {
// parsing, store in database ...
return event;
}
@PostConstruct
public void setTrigger() {
buildTrigger(webClient, "/receiveEventStream", eventFlow(), Event.class);
}
private <T, E extends Event> void buildTrigger(WebClient webClient, String uri, Publisher<Message<T>> publisher, Class<E> eventClass) {
webClient
.post()
.uri(uri)
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body(
Flux.from(publisher)
.retry()
.map(Message::getPayload)
,
eventClass
)
.retrieve()
.bodyToMono(Void.class)
.retry()
.subscribe();
}
}
私の質問:
(1)これを一般的にテストする機会はありますか?どうすればいいですか?テストの内部スプリング ブート アプリケーションである OkHttp、WireMock については、/eventReceiverEndpoint
アサートするだけです。しかし、実際にはどこにも行きませんでした。実際にatmでもエラーは出ません。起動時にメモリーサーバーが存在しないため、4xxを期待していました。
(2) 2 つのretry()
呼び出しは、両側で接続の問題を本当に再試行しますか? 切断された場合、メッセージは失われますか? どうすればそれをテストできますか?
(3) 接続が確立された後 (最終的な消費者の負荷が高くなった場合など)、リボンはこの長寿命の Flux と組み合わせて負荷分散を行うことができますか? Flux を別のインスタンスに自動的に再接続する必要があると思いますか?