共有 DelayQueue を使用して、非同期タスクを実行する専用のスレッドのプールがあります。
基本的に、すべてがうまく機能しますが、1 つのことを除いて、既にスケジュールされているタスクの実行を延期できるようにしたいと考えています。たとえば、時刻 t=0 に 30 秒で実行するタスクを送信したとします。10 秒後 (t=10)、そのタスクは t=30 では実行されず、t=50 で実行されると判断しました。したがって、20秒後に延期しました。
その目的のために、タスクに設定された時間を変更して getDelay の戻り値を変更する、延期メソッドがあります。コードはこの記事の最後にあります。
残念ながら、それは機能しません。実際には、システムを壊して期限切れの要素を通常よりもずっと長くキューに残すことは非常に簡単です。具体的には、次の望ましくない動作を観察しました。
- 時間 t=0 で、t=30 で実行する最初のタスクを送信します
- t=10 で、t=20 で実行する 2 番目のタスクを送信します
- t=15 で、2 番目のタスクを t=20 から t=100 に延期します。
- t=30 到着しますが、最初のタスクは実行されず、キューに残ります。その getDelay メソッドが負の値を返し始めるようになりました。
- t=40: 最初のタスクはすでに 10 秒遅れていますが、まだ何も起こりません。最初のタスクの getDelay は、時間の経過とともにますます小さな値を返します。DelayQueue は間違いなく混同されているようです。
- t=90: q.poll 呼び出しで設定された最大ポーリング時間が 30 秒であるため、おそらく希望です。実際、いいえ、null を取得し、次のタスクを待ちます。私の最初のタスクはまだ負の遅延でキューに残っています。
- t=100: ワーラ!両方のタスクが次々と実行されます... 2 番目のタスクは時間どおりですが、最初のタスクは最終的に 70 秒遅れて到着しました。これは受け入れがたい !
また、タスクがキューに入っている間に先頭にならなかった場合、他のタスクを邪魔することなく、安全に延期できることにも気付きました。
だから、私の質問は次のとおりです。
- なぜそうなのか?コードで何か間違ったことをしていますか?
- 延期をシミュレートするには、必ずタスクを削除してから再度送信する必要がありますか? または、安全に行う別の方法はありますか? オブジェクトを削除してから、まったく同じオブジェクトを再度追加することは本当に許可されていますか?それとも、考えられるすべての混乱を避けるために、別のオブジェクトを送信することをお勧めしますか?
- おまけの質問: remove 操作の複雑さはどのくらいですか? キューが一種の優先ヒープとして実装されていると仮定すると、おそらく O(n) です (優先ヒープでバイナリ検索を行うことは不可能です)。
回答ありがとうございます。
これがコードです。関係のない部分は可能な限り削除しました。特にすべての例外処理を削除しました。
public abstract class AbstractTaskExecutor<R extends Runnable> implements Runnable {
private final BlockingQueue<R> q;
...
public boolean submit (R dr) { return q.add(dr); }
public void run () {
while (!Thread.currentThread().isInterrupted()) {
Runnable r = q.poll(30, TimeUnit.SECONDS);
if (r!=null) r.run();
}}
}
public abstract class DelayedRunnable implements Runnable, Delayed {
private long time;
public DelayedRunnable (long l) {
time = System.currentTimeMillis() +l;
}
public final int compareTo (Delayed d) {
return (int)( Math.min(Math.max(Integer.MIN_VALUE, time - ((DelayedRunnable)d).time), Integer.MAX_VALUE) );
}
public final long getDelay (TimeUnit t) {
return t.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public final long getTime () { return time; }
public void postpone (long l) { time+=l; }
}
public class DelayedTaskExecutor extends AbstractTaskExecutor<DelayedRunnable> {
...
}