4

適度な数の長期実行アクターがあり、完了した最初のアクターを返す同期関数を書きたいと考えています。先物のスピン待機でそれを行うことができます(例:

while (! fs.exists(f => f.isSet) ) {
  Thread.sleep(100)
}
val completeds = fs.filter(f => f.isSet)
completeds.head()

)、しかしそれは非常に「俳優らしくない」ようです

このscala.actors.Futuresクラスには 2 つのメソッドが awaitAll()ありawaitEither() 、非常に近いように見えます。もしあれば、awaitAny()私はそれに飛びつきます。これを行う簡単な方法がありませんか、それとも適用可能な一般的なパターンはありますか?

4

2 に答える 2

2

完了を待つより「アクター的な」方法は、完了した結果の処理を担当するアクターを作成することです (それを と呼びましょうResultHandler)

ワーカーは、応答する代わりにResultHandler、ファイア アンド フォーゲット方式で応答を送信します。後者は、他のワーカーがジョブを完了している間、結果の処理を続けます。

于 2011-11-03T08:23:14.610 に答える
0

Actor.react{ }私にとっての鍵は、すべての (?) Scala オブジェクトが暗黙的に Actor であるため、ブロックに使用できるという発見でした。ここに私のソースコードがあります:

    import scala.actors._
    import scala.actors.Actor._

    //Top-level class that wants to return the first-completed result from some long-running actors
    class ConcurrentQuerier() {
        //Synchronous function; perhaps fulfilling some legacy interface
        def synchronousQuery : String = {
            //Instantiate and start the monitoring Actor
            val progressReporter = new ProgressReporter(self) //All (?) objects are Actors 
            progressReporter.start()

            //Instantiate the long-running Actors, giving each a handle to the monitor
            val lrfs = List ( 
                new LongRunningFunction(0, 2000, progressReporter), new LongRunningFunction(1, 2500, progressReporter), new LongRunningFunction(3, 1500, progressReporter), 
                new LongRunningFunction(4, 1495, progressReporter), new LongRunningFunction(5, 1500, progressReporter), new LongRunningFunction(6, 5000, progressReporter) ) 

            //Start 'em
            lrfs.map{ lrf => 
                lrf.start()
            }
            println("All actors started...")

            val start = System.currentTimeMillis()
            /* 
            This blocks until it receives a String in the Inbox.
            Who sends the string? A: the progressReporter, which is monitoring the LongRunningFunctions
            */ 
            val s = receive {
                  case s:String => s
            }
            println("Received " + s + " after " + (System.currentTimeMillis() - start) + " ms")
            s
        }
    }

    /* 
    An Actor that reacts to a message that is a tuple ("COMPLETED", someResult) and sends the
    result to this Actor's owner. Not strictly necessary (the LongRunningFunctions could post
    directly to the owner's mailbox), but I like the idea that monitoring is important enough
    to deserve its own object
*/
    class ProgressReporter(val owner : Actor) extends Actor {
        def act() = {
            println("progressReporter awaiting news...")
            react {
                case ("COMPLETED", s) => 
                    println("progressReporter received a completed signal " + s);
                    owner ! s
                case s => 
                    println("Unexpected message: " + s ); act()
            }
        }
    }

/*
    Some long running function
*/

    class LongRunningFunction(val id : Int, val timeout : Int, val supervisor : Actor) extends Actor {
        def act() = {
            //Do the long-running query
            val s = longRunningQuery()
            println(id.toString + " finished, sending results")
            //Send the results back to the monitoring Actor (the progressReporter)
            supervisor ! ("COMPLETED", s)
        }

        def longRunningQuery() : String = { 
            println("Starting Agent " + id + " with timeout " + timeout)
            Thread.sleep(timeout)
            "Query result from agent " + id
        }
    }


    val cq = new ConcurrentQuerier()
    //I don't think the Actor semantics guarantee that the result is absolutely, positively the first to have posted the "COMPLETED" message
    println("Among the first to finish was : " + cq.synchronousQuery)

典型的な結果は次のようになります。

scala ActorsNoSpin.scala 
progressReporter awaiting news...
All actors started...
Starting Agent 1 with timeout 2500
Starting Agent 5 with timeout 1500
Starting Agent 3 with timeout 1500
Starting Agent 4 with timeout 1495
Starting Agent 6 with timeout 5000
Starting Agent 0 with timeout 2000
4 finished, sending results
progressReporter received a completed signal Query result from agent 4
Received Query result from agent 4 after 1499 ms
Among the first to finish was : Query result from agent 4
5 finished, sending results
3 finished, sending results 
0 finished, sending results
1 finished, sending results
6 finished, sending results
于 2011-11-03T19:11:12.280 に答える