Reactor を使用してパブリッシャー/サブスクライバーのシナリオを適切に実装する方法がわかりません。私は実用的な解決策を持っていますが、実装は私には正しくないようです:
私の問題は、サブスクライバーを登録してイベントを渡すためにパブリッシャーを手動で実装する必要があることです。
public void publishQuotes(String ticker) throws InterruptedException {
// [..] Here I generate some "lines" to be publisher
for (Subscriber<? super String> subscriber : subscribers) {
lineList.forEach(line -> subscriber.onNext(line));
}
}
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscribers.add(subscriber);
}
次に、WorkQueue Processor (コンシューマーである必要があります) があります。
WorkQueueProcessor<String> sink = WorkQueueProcessor.create();
// Here I subscribe to my publiser
publisher.subscribe(sink);
// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);
// Here I perform a number of stream transformations
// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
それは正常に動作しますが、地獄のように醜いです。Spring ガイドから抜粋したこの例では、EventBus を使用してパブリッシャーからコンシューマーにイベントをルーティングしますが、それを自分のプロセッサにリンクしようとすると、次のコンパイラ エラーが発生します。
eventBus.on($("quotes"),sink);
The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)
パブリッシャーとプロセッサーをリンクさせる最善の方法は何ですか? EventBus の使用をお勧めしますか? もしそうなら、適切な呼び出しは何ですか?
ありがとう!