0

こんにちは、私は Webflux を初めて使用します。リアクティブ マイクロサービスを構築するためのチュートリアルに従っています。私のプロジェクトでは、次の問題に直面しました。

製品サービスの crud API を作成したいのですが、以下は Create メソッドです。

@Override
public Product createProduct(Product product) {

    Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
             productEntity.ifPresent((prod -> {
                  throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
              }));

    ProductEntity entity = mapper.apiToEntity(product);
    Mono<Product> newProduct = repository.save(entity)
        .log()
        .map(mapper::entityToApi);

    return newProduct.block();
}

問題は、郵便配達員からこのメソッドを呼び出すと、 「block()/blockFirst()/blockLast() がブロックされています。これは、スレッド reactor-http-nio-3 ではサポートされていません」というエラーが表示されることですが、StreamListener を使用するとこの呼び出しは正常に機能します。ストリーム リスナーは、rabbit-mq チャネルからイベントを取得します

ストリームリスナー

@EnableBinding(Sink.class)
public class MessageProcessor {

    private final ProductService productService;

    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {

       
        switch (event.getEventType()) {

            case CREATE:
                Product product = event.getData();
                LOG.info("Create product with ID: {}", product.getProductId());
                productService.createProduct(product);
                break;

   
            default:
                String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
                LOG.warn(errorMessage);
                throw new EventProcessingException(errorMessage);
        }
    }
}

2 つの質問があります。

  1. これが単純なリクエストではなく StreamListener で機能するのはなぜですか?
  2. Mono のオブジェクトを返すための webflux の適切な方法はありますか、または常に Mono を返す必要がありますか?
4

1 に答える 1