フラックスで応答する rsocket エンドポイントがあります。
@MessageMapping("responses")
Flux<?> deal(@Payload String message) {
return myService.generateResponses(message);
}
応答は、次のコードを使用して非同期的に生成された 3 種類のオブジェクトのいずれかになります (機能した場合)。
public Flux<?> generateResponses(String request) {
// Setup response sinks
final FluxProcessor publish = EmitterProcessor.create().serialize();
final FluxSink<Response1> sink1 = publish.sink();
final FluxSink<Response2> sink2 = publish.sink();
final FluxSink<Response3> sink3 = publish.sink();
// Get async responses: starts new thread to gather responses and update sinks
new MyResponses(request, sink1, sink2, sink3)
// Return the Flux
Flux<?> output = Flux
.from(publish
.log());
}
問題は、シンクにさまざまなオブジェクトを設定すると、最初のシンクだけが実際にサブスクライバーに公開されることです。
public class MyResponses extends CacheListenerAdapter {
private FluxSink<Response1> sink1;
private FluxSink<Response2> sink2;
private FluxSink<Response3> sink3;
// Constructor is omitted for brevity
@Override
public void afterCreate(EntryEvent event) {
if (event.getNewValue() instanceof Response1) {
Response1 r1 = (Response1)event.getNewValue();
sink1.next(r1);
}
if (event.getNewValue() instanceof Response2) {
Response2 r2 = (Response2)event.getNewValue();
sink2.next(r2);
}
if (event.getNewValue() instanceof Response3) {
Response3 r3 = (Response3)event.getNewValue();
sink3.next(r3);
}
}
}
タイプのシンクを作成すると<?>
、.next
エラーが発生します。
The method next(capture#2-of ?) in the type FluxSink<capture#2-of ?> is not applicable for the arguments (Response1)
この要件に対するより良いアプローチはありますか?