3

リクエストを受け取ってそれに返信する akka アクター (ワーカー) があります。リクエストの処理には 3 ~ 60 分かかる場合があります。発信者(俳優でもある)は現在使用中です!!! ただし、Caller アクターのデザインは必要に応じて変更できます。また、現在 EventDriven ディスパッチャーを使用しています。

ワーカー アクターが解放され、新しいリクエストを受信できる状態に戻るように、リクエスト処理をキャンセル (ユーザーが開始) するにはどうすればよいですか? java.util.concurrent.Future の cancel メソッドに似たメソッドを期待していましたが、Akka 1.1.3 では見つかりませんでした。

編集:

探している動作を取得しようとしましたcompleteWithException:

object Cancel {
  def main(args: Array[String]) {
    val actor = Actor.actorOf[CancelActor].start
    EventHandler.info(this, "Getting future")
    val future = (actor ? "request").onComplete(x => EventHandler.info(this, "Completed!! " + x.get))
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.completeWithException(new Exception("cancel"))
    EventHandler.info(this, "Future is " + future.get)
  }
}

class CancelActor extends Actor {
  def receive = {
    case "request" =>
      EventHandler.info(this, "start")
      (1 to 5).foreach(x => {
        EventHandler.info(this, "I am a long running process")
        Thread.sleep(200L)
      })
      self reply "response"
      EventHandler.info(this, "stop")
  }
}

しかし、それは長期にわたるプロセスを停止しませんでした.

    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Getting future
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] start
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [main] [Cancel$] Cancelling
    [ERROR]   [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-7] [ActorCompletableFuture] 
    java.lang.Exception: cancel
        at kozo.experimental.Cancel$.main(Cancel.scala:15)
...

    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] I am a long running process
    [INFO]    [9/16/11 1:46 PM] [akka:event-driven:dispatcher:global-2] [CancelActor] stop

対照的に、java.util.concurrent.Future の動作を考えてみましょう。

object Cancel2 {
  def main(args: Array[String]) {
    val executor: ExecutorService = Executors.newSingleThreadExecutor()
    EventHandler.info(this, "Getting future")
    val future = executor.submit(new Runnable {
      def run() {
        EventHandler.info(this, "start")
        (1 to 5).foreach(x => {
          EventHandler.info(this, "I am a long running process")
          Thread.sleep(200L)
        })
      }
    })
    Thread.sleep(500L)
    EventHandler.info(this, "Cancelling")
    future.cancel(true)
    EventHandler.info(this, "Future is " + future.get)
  }
}

長時間実行されているプロセスを停止するもの

    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Getting future
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] start
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    [INFO]    [9/16/11 1:48 PM] [pool-1-thread-1] [anon$1] I am a long running process
    Exception in thread "main" java.util.concurrent.CancellationException
...
    [INFO]    [9/16/11 1:48 PM] [main] [Cancel2$] Cancelling
4

2 に答える 2

2

アクターでフューチャーのステータスを確認することもできます。

class MyActor extends Actor {
  def receive = {
    case msg =>
      while(!self.senderFuture.get.isCompleted) {
        performWork(msg)
      }
      self reply result
  }
  ...
}

これには、メッセージを「?」で送信する必要があります。または「尋ねる」。それが役に立てば幸い。

于 2011-09-17T12:55:49.510 に答える
1

VM 内にいるだけの場合は、AtomicBoolean を Job メッセージと共に渡し、アクターで断続的にチェックして、中止する必要があるかどうかを確認できます。

actor ! Job(..., someAtomicBoolean)

class MyActor extends Actor {
  def receive = {
    case Job(..., cancelPlease) =>
      while(cancelPlease.get == false) {
        performWork
      }
      self reply result
  }
}
于 2011-09-15T09:53:26.810 に答える