リクエストを受け取ってそれに返信する akka アクター (ワーカー) があります。リクエストの処理には 3 ~ 60 分かかる場合があります。発信者(俳優でもある)は現在使用中です!!! ただし、Caller アクターのデザインは必要に応じて変更できます。また、現在 EventDriven ディスパッチャーを使用しています。
ワーカー アクターが解放され、新しいリクエストを受信できる状態に戻るように、リクエスト処理をキャンセル (ユーザーが開始) するにはどうすればよいですか? java.util.concurrent.Future の cancel メソッドに似たメソッドを期待していましたが、Akka 1.1.3 では見つかりませんでした。
編集:
探している動作を取得しようとしましたcompleteWithException
:
object Cancel {
def main(args: Array[String]) {
val actor = Actor.actorOf[CancelActor].start
EventHandler.info(this, "Getting future")
val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
Thread.sleep(500L)
EventHandler.info(this, "Cancelling")
future.completeWithException(new Exception("cancel"))
EventHandler.info(this, "Future is " + future.get)
}
}
class CancelActor extends Actor {
def receive = {
case "request" =>
EventHandler.info(this, "start")
(1 to 5).foreach(x => {
EventHandler.info(this, "I am a long running process")
Thread.sleep(200L)
})
self reply "response"
EventHandler.info(this, "stop")
}
}
しかし、それは長期にわたるプロセスを停止しませんでした.
[INFO] [9/16/11 1:46 PM] [main] [Cancel$] Getting future
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
[ERROR] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture]
java.lang.Exception: cancel
at kozo.experimental.Cancel$.main(Cancel.scala:15)
...
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
[INFO] [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop
対照的に、java.util.concurrent.Future の動作を考えてみましょう。
object Cancel2 {
def main(args: Array[String]) {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
EventHandler.info(this, "Getting future")
val future = executor.submit(new Runnable {
def run() {
EventHandler.info(this, "start")
(1 to 5).foreach(x => {
EventHandler.info(this, "I am a long running process")
Thread.sleep(200L)
})
}
})
Thread.sleep(500L)
EventHandler.info(this, "Cancelling")
future.cancel(true)
EventHandler.info(this, "Future is " + future.get)
}
}
長時間実行されているプロセスを停止するもの
[INFO] [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
[INFO] [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
Exception in thread "main" java.util.concurrent.CancellationException
...
[INFO] [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling