3

イベントを処理する並行コードを書きたいと思っています。この処理には時間がかかる場合があります。

そのイベントが処理されている間、受信イベントを記録し、再び自由に実行できるようになったときに最後の受信イベントを処理する必要があります。(他のイベントは破棄できます)。これは FILO キューに少し似ていますが、キューに 1 つの要素を格納するだけで済みます。

理想的には、新しい Executor を以下に示すイベント処理アーキテクチャにプラグインしたいと考えています。

public class AsyncNode<I, O> extends AbstractNode<I, O>  {
    private static final Logger log = LoggerFactory.getLogger(AsyncNode.class);
    private Executor executor;

    public AsyncNode(EventHandler<I, O> handler, Executor executor) {
        super(handler);
        this.executor = executor;
    }

    @Override
    public void emit(O output) {
        if (output != null) {
            for (EventListener<O> node : children) {
                node.handle(output);
            }
        }
    }

    @Override
    public void handle(final I input) {

        executor.execute(new Runnable() {

            @Override
            public void run() {
                try{
                emit(handler.process(input));
                }catch (Exception e){
                    log.error("Exception occured whilst processing input." ,e);
                    throw e;
                }

            }
        });

    }

}

4

5 に答える 5

3

私もしません。処理したいイベントへのAtomicReferenceを用意し、破壊的な方法で処理するタスクを追加します。

final AtomicReference<Event> eventRef =

public void processEvent(Event event) {
   eventRef.set(event);
   executor.submit(new Runnable() {
       public vodi run() {
           Event e = eventRef.getAndSet(null);
           if (e == null) return;
           // process event
       }
   }
}

これは、エグゼキュータまたはキュー(他の目的に使用できる)をカスタマイズせずに、エグゼキュータが空いているときにのみ次のイベントを処理します。

これは、キー付きイベントの発生にも対応します。つまり、キーの最後のイベントを処理する必要があります。

于 2012-07-03T07:40:43.443 に答える
0
public class LatestTaskExecutor implements Executor {
    private final AtomicReference<Runnable> lastTask =new AtomicReference<>();
    private final Executor executor;

    public LatestTaskExecutor(Executor executor) {
        super();
        this.executor = executor;
    }

    @Override
    public void execute(Runnable command) {
        lastTask.set(command);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                Runnable task=lastTask.getAndSet(null);
                if(task!=null){
                    task.run();
                }
            }
        });

    }
}

@RunWith( MockitoJUnitRunner.class )
public class LatestTaskExecutorTest {

    @Mock private Executor executor;
    private LatestTaskExecutor latestExecutor;
    @Before
    public void setup(){
        latestExecutor=new LatestTaskExecutor(executor);
    }
    @Test
    public void testRunSingleTask() {
        Runnable run=mock(Runnable.class);
        latestExecutor.execute(run);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor).execute(captor.capture());
        captor.getValue().run();
        verify(run).run();
    }

    @Test
    public void discardsIntermediateUpdates(){
        Runnable run=mock(Runnable.class);
        Runnable run2=mock(Runnable.class);
        latestExecutor.execute(run);
        latestExecutor.execute(run2);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor,times(2)).execute(captor.capture());
        for (Runnable runnable:captor.getAllValues()){
            runnable.run();
        }
        verify(run2).run();
        verifyNoMoreInteractions(run);
    }
}
于 2012-07-06T18:54:18.790 に答える
0

この回答は、余分なタスクの提出を最小限に抑える DD の回答の修正版です。

アトミック参照は、最新のイベントを追跡するために使用されます。カスタム タスクは、潜在的にイベントを処理するためにキューに送信されます。最新のイベントを読み取ることができるタスクのみが実際に先に進み、null へのアトミック参照をクリアする前に有用な作業を行います。他のタスクが実行される機会があり、処理できるイベントがないことがわかった場合、それらは何もせずに静かに終了します。キュー内の使用可能なタスクの数を追跡することで、余分なタスクの送信を回避します。キューに保留中のタスクが少なくとも 1 つある場合は、既にキューに入れられたタスクがデキューされるときにイベントが処理されるため、タスクの送信を回避できます。

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class EventExecutorService implements Executor {

    private final Executor executor;
    // the field which keeps track of the latest available event to process
    private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>();
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);

    public EventExecutorService(final Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(final Runnable eventTask) {
        // update the latest event
        latestEventReference.set(eventTask);
        // read count _after_ updating event
        final int activeTasks = activeTaskCount.get();

        if (activeTasks == 0) {
            // there is definitely no other task to process this event, create a new task
            final Runnable customTask = new Runnable() {
                @Override
                public void run() {
                    // decrement the count for available tasks _before_ reading event
                    activeTaskCount.decrementAndGet();
                    // find the latest available event to process
                    final Runnable currentTask = latestEventReference.getAndSet(null);
                    if (currentTask != null) {
                        // if such an event exists, process it
                        currentTask.run();
                    } else {
                        // somebody stole away the latest event. Do nothing.
                    }
                }
            };
            // increment tasks count _before_ submitting task
            activeTaskCount.incrementAndGet();
            // submit the new task to the queue for processing
            executor.execute(customTask);
        }
    }
}
于 2012-07-07T03:58:04.963 に答える