4

私は Akka 2.2.4 を使用するシステムを持っています。これは、多数のローカル アクターを作成し、それらをブロードキャスト ルーターのルートとして設定します。各ワーカーは、渡されたハッシュ範囲に従って、全作業の一部のセグメントを処理します。それはうまくいきます。

ここで、フェイルオーバーのためにこのアプリケーションをクラスター化する必要があります。ハッシュ範囲ごとに1つのワーカーのみが存在する/クラスターでトリガーされるという要件に基づいて、それぞれをClusterSingletonManagerとして設定することは理にかなっているように思えます..しかし、私はそれを機能させるのに問題があります. アクター システムが起動し、ClusterSingletonManager を作成し、以下に引用されているコードのパスをブロードキャスト ルーターに追加しますが、何らかの理由で実際のワーカー アクターをインスタンス化してメッセージを処理することはありません。私が得るのはログメッセージだけです:「未処理のイベント ${my message} in state Start」。私は何を間違っていますか?この単一インスタンス クラスタを起動するために他に何かする必要がありますか? 間違ったアクターにメッセージを送信していませんか?

これが私のakka構成です(デフォルトの構成をフォールバックとして使用しています):

akka{
    cluster{
        roles=["workerSystem"]
        min-nr-of-members = 1
        role {
        workerSystem.min-nr-of-members = 1
}
    }
    daemonic = true
    remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = ${akkaPort}
        }
    }
    actor{
        provider = akka.cluster.ClusterActorRefProvider
        single-message-bound-mailbox {
              # FQCN of the MailboxType. The Class of the FQCN must have a public
              # constructor with
              # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
              mailbox-type = "akka.dispatch.BoundedMailbox"

              # If the mailbox is bounded then it uses this setting to determine its
              # capacity. The provided value must be positive.
              # NOTICE:
              # Up to version 2.1 the mailbox type was determined based on this setting;
              # this is no longer the case, the type must explicitly be a bounded mailbox.
              mailbox-capacity = 1

              # If the mailbox is bounded then this is the timeout for enqueueing
              # in case the mailbox is full. Negative values signify infinite
              # timeout, which should be avoided as it bears the risk of dead-lock.
              mailbox-push-timeout-time = 1

        }
        worker-dispatcher{
         type = PinnedDispatcher
         executor = "thread-pool-executor"
          # Throughput defines the number of messages that are processed in a batch
          # before the thread is returned to the pool. Set to 1 for as fair as possible.
         throughput = 500
         thread-pool-executor {
            # Keep alive time for threads
            keep-alive-time = 60s

            # Min number of threads to cap factor-based core number to
            core-pool-size-min = ${workerCount}

            # The core pool size factor is used to determine thread pool core size
            # using the following formula: ceil(available processors * factor).
            # Resulting size is then bounded by the core-pool-size-min and
            # core-pool-size-max values.
            core-pool-size-factor = 3.0

            # Max number of threads to cap factor-based number to
            core-pool-size-max = 64

            # Minimum number of threads to cap factor-based max number to
            # (if using a bounded task queue)
            max-pool-size-min = ${workerCount}

            # Max no of threads (if using a bounded task queue) is determined by
            # calculating: ceil(available processors * factor)
            max-pool-size-factor  = 3.0

            # Max number of threads to cap factor-based max number to
            # (if using a  bounded task queue)
            max-pool-size-max = 64

            # Specifies the bounded capacity of the task queue (< 1 == unbounded)
            task-queue-size = -1

            # Specifies which type of task queue will be used, can be "array" or
            # "linked" (default)
            task-queue-type = "linked"

            # Allow core threads to time out
            allow-core-timeout = on
          }
         fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 1

            # The parallelism factor is used to determine thread pool size using the
            # following formula: ceil(available processors * factor). Resulting size
            # is then bounded by the parallelism-min and parallelism-max values.
            parallelism-factor = 3.0

            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 1
          }
        }
    }
}

アクターを作成する場所は次のとおりです(Groovyで記述されています):

            Props clusteredProps = ClusterSingletonManager.defaultProps("worker".toString(), PoisonPill.getInstance(), "workerSystem",
                    new ClusterSingletonPropsFactory(){

                        @Override
                        Props create(Object handOverData) {
                            log.info("called in ClusterSingetonManager")
                            Props.create(WorkerActorCreator.create(applicationContext, it.start, it.end)).withDispatcher("akka.actor.worker-dispatcher").withMailbox("akka.actor.single-message-bound-mailbox")
                        }
                    } )
            ActorRef manager = system.actorOf(clusteredProps, "worker-${it.start}-${it.end}".toString())
            String path = manager.path().child("worker").toString()
            path

実際のワーカー アクターにメッセージを送信しようとすると、上記のパスは解決されますか? 現在はありません。私は何を間違っていますか?また、これらのアクターは Spring アプリケーション内に存在し、ワーカー アクターはいくつかの @Autowired 依存関係でセットアップされます。この Spring 統合はクラスター化されていない環境でうまく機能しましたが、クラスター化された環境で気をつけなければならない問題はありますか?

ありがとうございました

参考までに:これは akka-user google グループにも投稿しました。ここにリンクがあります。

4

1 に答える 1

3

コード内のパスは、「workerSystem」ロールを持つ各ノードで開始する ClusterSingletonManager アクターへのパスです。クラスター内の最も古いノード、つまりクラスター内のシングルトンに「worker-${it.start}-${it.end}」という名前の子アクター (WorkerActor) を作成します。

ClusterSingletonManagerの名前も定義する必要がありますsystem.actorOf(clusteredProps, "workerSingletonManager")

にメッセージを送信することはできませんClusterSingletonManager。それらをアクティブなワーカーのパスに送信する必要があります。つまり、最も古いノードのアドレスを含めます。ConsumerProxyこれは、ドキュメントの で示されています。

これにシングルトンを使用する必要があるかどうかはわかりません。すべてのワーカーは、最も古い同じノードで実行されます。akka-user google グループで、問題の代替ソリューションについて話し合いたいと思います。

于 2014-03-21T08:33:52.600 に答える