こんにちは、私は Webflux を初めて使用します。リアクティブ マイクロサービスを構築するためのチュートリアルに従っています。私のプロジェクトでは、次の問題に直面しました。
製品サービスの crud API を作成したいのですが、以下は Create メソッドです。
@Override
public Product createProduct(Product product) {
Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
productEntity.ifPresent((prod -> {
throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
}));
ProductEntity entity = mapper.apiToEntity(product);
Mono<Product> newProduct = repository.save(entity)
.log()
.map(mapper::entityToApi);
return newProduct.block();
}
問題は、郵便配達員からこのメソッドを呼び出すと、 「block()/blockFirst()/blockLast() がブロックされています。これは、スレッド reactor-http-nio-3 ではサポートされていません」というエラーが表示されることですが、StreamListener を使用するとこの呼び出しは正常に機能します。ストリーム リスナーは、rabbit-mq チャネルからイベントを取得します
ストリームリスナー
@EnableBinding(Sink.class)
public class MessageProcessor {
private final ProductService productService;
public MessageProcessor(ProductService productService) {
this.productService = productService;
}
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Product> event) {
switch (event.getEventType()) {
case CREATE:
Product product = event.getData();
LOG.info("Create product with ID: {}", product.getProductId());
productService.createProduct(product);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
LOG.warn(errorMessage);
throw new EventProcessingException(errorMessage);
}
}
}
2 つの質問があります。
- これが単純なリクエストではなく StreamListener で機能するのはなぜですか?
- Mono のオブジェクトを返すための webflux の適切な方法はありますか、または常に Mono を返す必要がありますか?