0

このコードを実行すると

    public class Test {
      public static void main(String[] args) {
        Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(new EventFactoryImpl<MyEvent>(),
    Executors.newFixedThreadPool(2), new MultiThreadedClaimStrategy(32), new                    BusySpinWaitStrategy());

        MyEventHandler myEventHandler1 = new MyEventHandler("1");
        MyEventHandler myEventHandler2 = new MyEventHandler("2");

        disruptor.handleEventsWith(myEventHandler1, myEventHandler2);
        RingBuffer<MyEvent> ringBuffer = disruptor.start();

        ByteBuffer bb = ByteBuffer.allocate(8);

        for (long l = 0; l < 2; l++) {
          bb.putLong(0, l);
          long sequence = ringBuffer.next();

          try {
             MyEvent event = ringBuffer.get(sequence);
             event.set(bb.getLong(0));
          }
          finally {
            ringBuffer.publish(sequence);
          }
        }
      }
    }
    public class MyEvent {
      private long value;

      public void set(long value) {
        this.value = value;
      }

      public long get() {
        return value;
      }
    }

    public class MyEventHandler implements EventHandler<MyEvent> {
       private String id;

       public MyEventHandler(String id) {
          this.id = id;
       }

       public void onEvent(MyEvent event, long sequence, boolean endOfBatch) {
           System.out.println("id: " + id + ", event: " + event.get() + ", sequence: " + sequence +                  "," + Thread.currentThread().getName());
       }
    }

    public class EventFactoryImpl<T> implements EventFactory<T> {
       @SuppressWarnings("unchecked")
       public T newInstance() {
         return (T) new MyEvent();
       }
    }

私はこの出力を得ています

ID: 1、イベント: 0、シーケンス: 0、プール 1 スレッド 1
ID: 1、イベント: 1、シーケンス: 1、プール-1-スレッド-1
ID: 2、イベント: 0、シーケンス: 0、プール-1-スレッド-2
ID: 2、イベント: 1、シーケンス: 1、プール-1-スレッド-2

しかし、私は各イベントが別々のスレッドによって一度だけ処理されることを期待していました。どうすれば実現できますか?

4

2 に答える 2

1

With the Disruptor, each EventHandler subscribed to the ring buffer will read each message once.

If you want to have multiple threads processing messages as they come out of the ring buffer, there are a couple of options. The first and best option is to set up a separate Disruptor for each reader thread, and have the writer alternate between buffers in a round robin fashion. If you must use a single ring buffer (perhaps to sequence the events), then you could set the thread ID which should process each event onto the events themselves (again in an alternating fashion), and have the threads which do not match that ID discard the event.

于 2014-03-26T22:58:41.450 に答える