5

http 呼び出しの処理を担当する Akka アクターがいます。scala ディスパッチを使用して、API 経由で複数の HTTP リクエストを送信します。

urls.foreach { u
  val service = url(u)
  val promise = Http(service OK as.String).either
  for(p <- promise)
  {
     p match
     {
       case Left(error) =>
         faultHandler(error)
       case Right(result) =>
         resultHandler(result)
     }
  }

このresultHandler関数では、インスタンス変数をインクリメントし、nbOfResults行った呼び出しの数と比較します。

def resultHandler(result:String)
{
  this.nbOfResults++
  ...
  if(nbOfResults == nbOfCalls)
    // Do something
}

安全ですか?2 つの呼び出しが同時に結果を返す場合、nbOfResults変数に同時にアクセスできますか?

現時点では、アクターは多かれ少なかれスレッドと同等であると考えているため、コールバック関数は同時に実行されません。それが正しいか ?

4

4 に答える 4

3

これは、dispatch のみを使用した Alexey Romanov 応答の変形です。

//Promises will be of type Array[Promise[Either[Throwable, String]]]
val promises = urls.map { u =>
    val service = url(u)

    Http(service OK as.String).either
}

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]]
//So listPromise is now of type Promise[Array[Either[Throwable, String]]]
val listPromise = Http.promise.all(promises)

for (results <- listPromise) {
    //Here results is of type Array[Either[Throwable, String]]

    results foreach { result =>
        result match {
            Left(error) => //Handle error
            Right(response) => //Handle response
        }
    }
}
于 2012-12-03T19:21:16.290 に答える
2

はるかに良い方法があります:

val promises = urls.map {u =>
  val service = url(u)
  val promise = Http(service OK as.String).either
}

val listPromise = Future.sequence(promises)

listPromise.onComplete { whatever }
于 2012-12-03T17:35:07.603 に答える
2

Alexey Romanov の回答に同意します。どのような方法で http リクエストを同期する場合でも、promise の完了を処理する方法に注意してください。同時アクセスがアクターの状態に現れる可能性があるという点で、あなたの直感は正しいです。これを処理するより良い方法は、次のようにすることです。

def resultHandler(result: String) {
    //on completion we are sending the result to the actor who triggered the call
    //as a message
    self ! HttpComplete(result)
}

そして、アクターの受信関数で:

def receive = {
    //PROCESS OTHER MESSAGES HERE
    case HttpComplete(result) => //do something with the result
}

このようにして、http の結果の処理が外部からのアクターの状態に違反しないことを確認しますが、それを行う適切な方法であるアクターの受信ループから

于 2012-12-03T17:44:06.050 に答える
1
val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls)

// After particular call was ended    
if (nbOfResults.decrementAndGet <= 0) {
  // Do something
}

[編集] AtomicReference CAS の古い回答を削除 - while(true)、compareAndSet など

于 2012-12-03T17:57:06.570 に答える