1

Reactor を使用してパブリッシャー/サブスクライバーのシナリオを適切に実装する方法がわかりません。私は実用的な解決策を持っていますが、実装は私には正しくないようです:

私の問題は、サブスクライバーを登録してイベントを渡すためにパブリッシャーを手動で実装する必要があることです。

public void publishQuotes(String ticker) throws InterruptedException {

// [..] Here I generate some "lines" to be publisher

for (Subscriber<? super String> subscriber : subscribers) {
    lineList.forEach(line -> subscriber.onNext(line));
}

}

@Override
public void subscribe(Subscriber<? super String> subscriber) {
    subscribers.add(subscriber);

}

次に、WorkQueue Processor (コンシューマーである必要があります) があります。

WorkQueueProcessor<String> sink = WorkQueueProcessor.create();

// Here I subscribe to my publiser
publisher.subscribe(sink);

// Creates a Reactive Stream from the processor (having converted the lines to Quotations)
Flux<StockQuotation> mappedRS = sink.map(quotationConverter::convertHistoricalCSVToStockQuotation);

// Here I perform a number of stream transformations 

// Each call to consume will be executed in a separated Thread
filteredRS.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));

それは正常に動作しますが、地獄のように醜いです。Spring ガイドから抜粋したこの例では、EventBus を使用してパブリッシャーからコンシューマーにイベントをルーティングしますが、それを自分のプロセッサにリンクしようとすると、次のコンパイラ エラーが発生します。

eventBus.on($("quotes"),sink);

The method on(Selector, Consumer<T>) in the type EventBus is not applicable for the arguments (Selector<String>, WorkQueueProcessor<String>)

パブリッシャーとプロセッサーをリンクさせる最善の方法は何ですか? EventBus の使用をお勧めしますか? もしそうなら、適切な呼び出しは何ですか?

ありがとう!

4

1 に答える 1

1

EventBus を使用する場合は、次の方法で行を公開します。

 eventBus.notify("quotes", Event.wrap(line);

そして、経由で購読する

eventBus.on($("quotes"), e -> System.out.println(Thread.currentThread() + " data=" + e);

ここで、「e」は Event<String> 型です。

于 2016-03-14T13:38:22.337 に答える