8

私自身の質問に対する私自身の答えのように、私はキューに到着する多数のイベントを処理している状況にあります。各イベントはまったく同じ方法で処理され、他のすべてのイベントとは独立して処理することもできます。

私のプログラムはScala並行性フレームワークを利用しており、関連するプロセスの多くはActorsとしてモデル化されています。はメッセージを順番に処理するため、このActor特定の問題にはあまり適していません(他のアクター順番にアクションを実行している場合でも)。Scalaにすべてのスレッド作成を「制御」させたいので(そもそも並行性システムを持っているという点だと思います)、2つの選択肢があるようです。

  1. 私が管理しているイベントプロセッサのプールにイベントを送信します
  2. Actor他のメカニズムでそれらを同時に処理するように指示します

#1は、アクターサブシステムを使用するポイントを否定すると思います。プロセッサーアクターをいくつ作成する必要がありますか?1つの明白な質問です。これらのことはおそらく私から隠されており、サブシステムによって解決されています。

私の答えは次のことをすることでした:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

より良いアプローチはありますか?これは間違っていますか?

編集:おそらくより良いアプローチは次のとおりです:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
4

5 に答える 5

8

これは別の質問の複製のようです。だから私は私の答えを複製します

アクターは一度に 1 つのメッセージを処理します。複数のメッセージを処理するための古典的なパターンは、コンシューマー アクターのプールに対して 1 つのコーディネーター アクター フロントを配置することです。反応を使用する場合、コンシューマー プールは大きくなる可能性がありますが、それでも少数の JVM スレッドしか使用しません。以下は、10 人のコンシューマーと 1 人のコーディネーターのプールを作成して、その前に置く例です。

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

このコードは、どのコンシューマが利用可能かをテストし、そのコンシューマにリクエストを送信します。代わりに、消費者にランダムに割り当てるか、ラウンド ロビン スケジューラを使用することもできます。

何をしているかによっては、Scala の Futures を使用したほうがよい場合があります。たとえば、アクターが本当に必要ない場合、上記の機構はすべて次のように記述できます。

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
于 2009-06-18T00:56:23.467 に答える
3

すべてのイベントを個別に処理できる場合、なぜそれらはキューに入れられるのでしょうか? デザインについて他に何も知らないので、これは不必要なステップのように思えます。これらのイベントを発生させているもので関数を構成できればprocess、潜在的にキューを不要にすることができます。

アクターは基本的に、キューを備えた並行効果です。複数のメッセージを同時に処理したい場合、実際にはアクターは必要ありません。関数 (Any => ()) を都合の良い時間に実行するようにスケジュールするだけです。

そうは言っても、アクター ライブラリ内にとどまりたい場合や、イベント キューが制御できない場合は、このアプローチは合理的です。

Scalazは、Actor と並行効果を区別します。Actor非常に軽量ですが、scalaz.concurrent.Effectさらに軽量です。Scalaz ライブラリに大まかに翻訳したコードは次のとおりです。

val eventProcessor = effect (x => process x)

未発売の最新トランクヘッド付きです。

于 2009-06-17T14:56:37.240 に答える
1

アクター (そのうちの 1 つ) の目的は、一度に 1 つのスレッドだけがアクター内の状態にアクセスできるようにすることです。メッセージの処理がアクター内の変更可能な状態に依存しない場合は、タスクをスケジューラまたはスレッド プールに送信して処理する方がおそらく適切です。アクターが提供する追加の抽象化は、実際には邪魔になっています。

これには scala.actors.Scheduler に便利なメソッドがあります。または、java.util.concurrent の Executor を使用することもできます。

于 2009-06-17T17:22:53.430 に答える
1

アクターはスレッドよりもはるかに軽量であるため、別のオプションの 1 つは、スレッド プールへの送信に慣れている Runnable オブジェクトなどのアクター オブジェクトを使用することです。主な違いは、ThreadPool について心配する必要がないことです。スレッド プールはアクター フレームワークによって管理され、主に構成の問題です。

def submit(e: MyEvent) = actor {
  // no loop - the actor exits immediately after processing the first message
  react {
    case MyEvent(x) =>
      process(x)
  }
} ! e // immediately send the new actor a message

メッセージを送信するには、次のように言います。

submit(new MyEvent(x))

に対応します。

eventProcessor ! new MyEvent(x)

あなたの質問から。

このパターンは、クアッドコア i7 ラップトップで約 10 秒で送受信された 100 万件のメッセージで正常にテストされました。

お役に立てれば。

于 2011-05-20T14:56:47.703 に答える