解決策の概要:
現在出回っているほとんどの RSocket の例では、SpringBoot 関連のチュートリアルであっても、サーバー側のアクセプターは単純に新しいオブジェクト (以下の new MqttMessageService() など) として構築されています。アクセプター クラスでサンプル コンテンツを生成する場合は問題ありませんが、アクセプターがコンテナー内の他の Bean に依存している場合、以下の依存性注入に関連する混乱が生じる可能性があります。
元の質問:
Rsocket の Java サーバー経由で Spring Data Reactive Mongodb リポジトリを使用してデータベース エントリをストリーミングしようとすると、NullPointerException が発生します。
問題は、デバッグ中にすべてのコンポーネントが個別に動作することです。同じ Mongodb リポジトリを介して要求されたデータを取得でき、Rsocket を使用して同じサーバーとクライアント間でランダムに生成されたデータをストリーミングすることもできます。
したがって、本当に基本的なものが欠けているか、Reactive Mongodb と Rsocket を一緒に使用すると問題が発生する可能性があります。
元のサーバー側の Rsocket 構成は次のとおりです。
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
そして、これが適切なDIを備えたサーバー側のRsocket構成です。
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
これは、return service.findAll() で NullPointerException がスローされるサーバー側の AbstractRSocket 実装です。
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
リアクティブ リポジトリと関連サービスは次のとおりです。サービスは、サーバーの AbstractRSocket 実装に注入されると null を返しますが、他のクラスに注入されると正常に動作します。
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
テストの内容と完全に連携するクライアント側のコードは次のとおりです。
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
私はここで私の知識レベルを少し超えている可能性があり、このトピックに関するリソースは非常に限られているため、解決策へのヒントをいただければ幸いです:)