6

Disruptorフレームワークをいじっていて、イベント ハンドラーが呼び出されていないことがわかりました。

ここに私のセットアップコードがあります:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

その他、イベントを掲載しています。次の2つのアプローチをそれぞれ試しました。

イベント公開アプローチ A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

このシナリオでは、最初のものEventHandlerが呼び出されますが、それ以上のものは呼び出されません。

イベント公開アプローチ B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

このシナリオでは、どのイベント ハンドラーもまったく呼び出されないことがわかりました。

私は何を間違っていますか?

アップデート

これが私の EventHandler 全体です。処理が完了したことをどのように通知すればよいですか?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
4

3 に答える 3

7

各イベント ハンドラーは、ディスラプターをシャットダウンするまで終了しない独自のスレッドで実行する必要があります。単一のスレッド化されたエグゼキュータを使用しているため、たまたま実行された最初のイベント ハンドラのみが実行されます。(Disruptor クラスは各ハンドラーをハッシュマップに格納するため、どのハンドラーが最終的に実行されるかは異なります)

cachedThreadPool に切り替えると、すべてが実行を開始するはずです。Disruptor クラスがセットアップして管理する EventProcessor によってすべて処理されるため、シーケンス番号の管理を行う必要はありません。取得した各イベントを処理するだけで十分です。

于 2011-11-29T18:21:57.090 に答える