1

クライアントサーバーアプリケーションがあり、rxjava を使用してクライアントからのサーバーリクエストを実行しています。クライアントは一度に 1 つの要求のみを実行する必要があるため、トランポリン スケジューラに似たスレッド キュー スケジューラを使用するつもりです。

ここで、サーバー上の変更を監視するメカニズムを実装しようとしています。そのため、サーバーに何らかの変更が加えられて結果が返されるまでブロックする長期要求を送信します (ロング プル)。

この長いプル リクエストは、ジョブ キューがアイドル状態のときにのみ実行する必要があります。通常のリクエストがスケジュールされているときに監視リクエストを自動的に停止し、キューが空になったときに再び開始する方法を探しています。この動作を得るためにトランポリン スケジューラを変更することを考えましたが、これは一般的な問題であり、より簡単な解決策があるのではないかと感じています。

4

1 に答える 1

1

ロング ポーリング タスクをスケジュールすることによって返されたサブスクリプションを保持し、キューが空でなくなった場合はサブスクリプションを解除し、キューが空になった場合は再スケジュールすることができます。

編集:これは基本的なExecutorSchedulerの例です:

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;


public class IdleScheduling {

    static final class TaskQueue {
        final ExecutorService executor;
        final AtomicReference<Future<?>> idleFuture;
        final Runnable idleRunnable;
        final AtomicInteger wip;
        public TaskQueue(Runnable idleRunnable) {
            this.executor = Executors.newFixedThreadPool(1);
            this.idleRunnable = idleRunnable;
            this.idleFuture = new AtomicReference<>();
            this.wip = new AtomicInteger();
            this.idleFuture.set(executor.submit(idleRunnable));
        }
        public void shutdownNow() {
            executor.shutdownNow();
        }
        public Future<?> enqueue(Runnable task) {
            if (wip.getAndIncrement() == 0) {
                idleFuture.get().cancel(true);
            }
            return executor.submit(() -> {
                task.run();
                if (wip.decrementAndGet() == 0) {
                    startIdle();
                }
            });
        }
        void startIdle() {
            idleFuture.set(executor.submit(idleRunnable));
        }
    }

    public static void main(String[] args) throws Exception {
        TaskQueue tq = new TaskQueue(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ex) {
                    System.out.println("Idle interrupted...");
                    return;
                }
                System.out.println("Idle...");
            }
        });
        try {
            Thread.sleep(1500);
            tq.enqueue(() -> System.out.println("Work 1"));
            Thread.sleep(500);
            tq.enqueue(() -> {
                System.out.println("Work 2");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException ex) {

                }
            });
            tq.enqueue(() -> System.out.println("Work 3"));
            Thread.sleep(1500);
        } finally {
            tq.shutdownNow();
        }
    }
}
于 2015-03-10T21:55:32.680 に答える