1

「プロデューサーとコンシューマー」の問題があるとします。プロデューサーはメッセージをコンシューマーに送信し、コンシューマーは Scala を使用してメッセージを非同期に処理しFuturesますfuture { /* do the processing */ }

プロデューサーが 1 秒あたり 100 メッセージを生成するとします。ただし、コンシューマは 1 秒あたり 10 メッセージしか処理しません。何が起こるか ?メモリリークが発生すると思います。多くのFutureオブジェクトが存在し、スレッド プールの内部メッセージ キューも大きくなります。それは理にかなっていますか?

それを処理する最良の方法は何ですか?

4

3 に答える 3

2

akka では、実行コンテキストが使用されていますが、メールボックスがないようです。ソースを読む価値はありますが、実験によってあなたの質問に答えることができます。

Future には「メールボックス」がなく、Akka が内部で何を行っているか、または実行コンテキストに実際に何が含まれているかについては 100% 正確にはわかりませんが、future を直接使用すると akka がメモリ不足になることがわかります。

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global                  ^

scala> while(1==1) Future(Thread.sleep(100))
java.lang.OutOfMemoryError: Java heap space

メッセージについて話している場合、アクター メッセージ キューの動作を説明するメールボックスがあります (一度に 1 つのメッセージしか処理されないため、いっぱいになります)。これについては以下で説明します。

境界のあるメールボックス (サイズ制限のあるメールボックスなど) を想定すると、メッセージはどうなりますか。答えは、メールボックスによって異なります。まず、制限付きメールボックスには、サイズ制限などのいくつかの設定があります。

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}

その制限に達すると、akka は、メールボックスの構成方法に応じて、古いメッセージまたは新しいメッセージのいずれかを破棄します。たとえば、この設定で

# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on

明らかに、メモリ不足などの他のリソースの問題がある場合、アプリケーションがクラッシュする可能性があります。つまり、メモリに保存されているメッセージが失われる可能性があります。制限のないメールボックスは、エラー状態が発生するまでメッセージを積み重ね続けます。

エラー状態でのメッセージの損失が望ましくない場合は、別のオプションがあります。ファイルなど、より永続的な場所にメッセージを保存する耐久性のあるメールボックスを使用できます。耐久性の高いメッセージ ストレージにファイルを使用するメールボックス構成の例を次に示します。

akka {
  actor {
    mailbox {
      file-based {
        # directory below which this queue resides
        directory-path = "./_mb"

        # attempting to add an item after the queue reaches this size (in items)
        # will fail.
        max-items = 2147483647

        # attempting to add an item after the queue reaches this size (in bytes)
        # will fail.
        max-size = 2147483647 bytes

        # attempting to add an item larger than this size (in bytes) will fail.
        max-item-size = 2147483647 bytes

        # maximum expiration time for this queue (seconds).
        max-age = 0s

        # maximum journal size before the journal should be rotated.
        max-journal-size = 16 MiB

        # maximum size of a queue before it drops into read-behind mode.
        max-memory-size = 128 MiB

        # maximum overflow (multiplier) of a journal file before we re-create it.
        max-journal-overflow = 10

        # absolute maximum size of a journal file until we rebuild it,
        # no matter what.
        max-journal-size-absolute = 9223372036854775807 bytes

        # whether to drop older items (instead of newer) when the queue is full
        discard-old-when-full = on

        # whether to keep a journal file at all
        keep-journal = on

        # whether to sync the journal after each transaction
        sync-journal = off

        # circuit breaker configuration
        circuit-breaker {
          # maximum number of failures before opening breaker
          max-failures = 3

          # duration of time beyond which a call is assumed to be timed out and
          # considered a failure
          call-timeout = 3 seconds

          # duration of time to wait until attempting to reset the breaker during
          # which all calls fail-fast
          reset-timeout = 30 seconds
        }
      }
    }
  }
}
于 2013-08-06T13:41:43.347 に答える
0

複数のコンシューマーを持つ - アクター プールを使用します。プールの負荷に応じて、サイズを動的に変更できます。http://doc.akka.io/docs/akka/snapshot/scala/routing.htmlを参照してください。

于 2013-08-06T12:54:21.993 に答える