3

解決策の概要:

現在出回っているほとんどの RSocket の例では、SpringBoot 関連のチュートリアルであっても、サーバー側のアクセプターは単純に新しいオブジェクト (以下の new MqttMessageService() など) として構築されています。アクセプター クラスでサンプル コンテンツを生成する場合は問題ありませんが、アクセプターがコンテナー内の他の Bean に依存している場合、以下の依存性注入に関連する混乱が生じる可能性があります。

元の質問:

Rsocket の Java サーバー経由で Spring Data Reactive Mongodb リポジトリを使用してデータベース エントリをストリーミングしようとすると、NullPointerException が発生します。

問題は、デバッグ中にすべてのコンポーネントが個別に動作することです。同じ Mongodb リポジトリを介して要求されたデータを取得でき、Rsocket を使用して同じサーバーとクライアント間でランダムに生成されたデータをストリーミングすることもできます。

したがって、本当に基本的なものが欠けているか、Reactive Mongodb と Rsocket を一緒に使用すると問題が発生する可能性があります。

元のサーバー側の Rsocket 構成は次のとおりです。

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

そして、これが適切なDIを備えたサーバー側のRsocket構成です。

@Configuration
public class RsocketConfig {

    @Autowired
    MqttMessageService messageService;

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(messageService))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

これは、return service.findAll() で NullPointerException がスローされるサーバー側の AbstractRSocket 実装です。

@Service
public class MqttMessageService extends AbstractRSocket {



    @Autowired 
    private MqttMessageEntityService service;

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return service.findAll()
            .map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));

    }
}

リアクティブ リポジトリと関連サービスは次のとおりです。サービスは、サーバーの AbstractRSocket 実装に注入されると null を返しますが、他のクラスに注入されると正常に動作します。

@Service
public class MqttMessageEntityService {

    @Autowired
    private MqttMessageEntityRepository repository;

    public Flux<MqttMessageEntity> findAll() {
        return repository.findAll();
    }

}

public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {

}

テストの内容と完全に連携するクライアント側のコードは次のとおりです。

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void testRsocket() {

        RSocket rSocketClient = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(8802))
                .start()
                .block();

        rSocketClient
                .requestStream(DefaultPayload.create(""))
                .blockLast();
    }        
}

私はここで私の知識レベルを少し超えている可能性があり、このトピックに関するリソースは非常に限られているため、解決策へのヒントをいただければ幸いです:)

4

1 に答える 1