25

akka がメッセージ配信を保証しない方法と理由について、いくつかの記事を読みました。ドキュメンテーション、この議論およびグループに関する他の議論は、それをよく説明しています。

私はかなり akka を使い始めたばかりで、ケースの適切なデザインを知りたいと思っています。たとえば、3 人の異なるアクターがすべて異なるマシンにいるとします。1 人は料理本を担当し、もう 1 人は歴史を担当し、最後はテクノロジー本を担当しています。

別のマシンに主役がいます。利用可能な本があるかどうかを検索するための主役へのクエリがあるとします。メイン アクターは 3 つのリモート アクターにリクエストを送信し、結果を期待します。だから私はこれを行います:

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees=someRoutees, within = 10 seconds)), "router")
  implicit val timeout = Timeout(10 seconds)
  val futureResult = scatter ?  Text("Concurrency in Practice")
  //      What should I do here?.
  //val result = Await.result(futureResult, timeout.duration) line(a)

要するに、私は 3 つのリモート アクターすべてにリクエストを送信し、10 秒で結果を期待しています。

アクションはどうあるべきですか?

  1. 10 秒以内に結果が得られなかったとします。それらすべてに新しいリクエストを再度送信する必要がありますか?
  2. within上記の時間が時期尚早である場合はどうなりますか。しかし、どれくらいの時間がかかるかは事前にわかりません。
  3. within時間が十分だったのに、メッセージがドロップされたとしたらどうでしょう。

時間内に応答が得られない場合withinは、要求を再送信します。このようなもので、非同期のままです。

futureResult onComplete{
  case Success(i) => println("Result "+i)
  case Failure(e) => //send again
}

しかし、クエリが多すぎると、呼び出しのスレッドが多すぎてかさばりませんか? コメントを外すline(a)と、同期になり、負荷がかかるとパフォーマンスが低下する可能性があります。

10 秒以内に応答がないとします。時間が時期尚早だった場合within、無駄な計算が再び発生します。メッセージがドロップされた場合、10数秒の貴重な時間が無駄になります。場合によっては、メッセージが配信されたことを知っていたとしたら、懐疑的ではなく、おそらくより長い時間待つでしょう.

人々はそのような問題をどのように解決しますか?ACK? しかし、すべてのクエリのアクターに状態を保存する必要があります。それはよくあることで、正しいデザインを探しています。

4

1 に答える 1

25

私はあなたのためにこれらの質問のいくつかに答えようとします. すべてに具体的な答えを出すことはできませんが、正しい方向に導くことができれば幸いです。

手始めに、本の検索を行う 3 つのアクターにリクエストを伝える方法を変更する必要があります。ここで a を使用することScatterGatherFirstCompletedRouterは、おそらく正しいアプローチではありません。このルーターは、ルートの 1 つ (最初に応答したルート) からの応答のみを待機するため、他の 2 つのルートからの結果が含まれないため、結果のセットは不完全になります。もありBroadcastRouterますが、それは を処理するだけtell (!)ask (?). やりたいことを行うための 1 つのオプションは、各受信者に要求を送信し、応答を取得してから、を使用しFuturesてそれらを集約に結合することです。簡単な例は次のようになります。FutureFuture.sequence

case class SearchBooks(title:String)
case class Book(id:Long, title:String)

class BookSearcher extends Actor{

  def receive = {
    case req:SearchBooks =>
      val routees:List[ActorRef] = ...//Lookup routees here
      implicit val timeout = Timeout(10 seconds)
      implicit val ec = context.system.dispatcher

      val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
      val fut = Future.sequence(futures)

      val caller = sender //Important to not close over sender
      fut onComplete{
        case Success(books) => caller ! books.flatten

        case Failure(ex) => caller ! Status.Failure(ex)
      }
  }
}

これは最終的なコードではありませんが、サンプルが実行しようとしていたことの近似値です。この例では、ダウンストリーム ルートのいずれかが失敗/タイムアウトした場合、Failureブロックにヒットし、呼び出し元も失敗します。それらがすべて成功した場合、呼び出し元はBook代わりにオブジェクトの集計リストを取得します。

それでは、質問に移ります。最初に、タイムアウト内にいずれかのルートから応答が得られない場合、すべてのアクターに再度リクエストを送信する必要があるかどうかを尋ねます。この質問に対する答えは、本当にあなた次第です。相手側のユーザーが部分的な結果 (つまり、3 つのアクターのうちの 2 つの結果) を表示できるようにしますか? それとも、常に結果の完全なセットである必要がありますか? 答えが「はい」の場合、ルートに送信するコードを次のように微調整できます。

val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
  case ex =>
    //probably log something here
    List()
})

このコードを使用すると、いずれかのルートが何らかの理由でタイムアウトまたは失敗した場合、失敗の代わりに 'Book` の空のリストが応答に置き換えられます。ここで、部分的な結果に耐えられない場合は、リクエスト全体を再送信できますが、おそらく相手側に本の結果を待っている人がいて、彼らは永遠に待ちたくないということを覚えておく必要があります.

2 番目の質問では、タイムアウトが早すぎるとどうなるかを尋ねます。選択するタイムアウト値は完全にあなた次第ですが、ほとんどの場合、2 つの要因に基づいている必要があります。最初の要因は、検索の呼び出し時間をテストすることです。平均してどれくらいの時間がかかるかを調べ、それに基づいて安全のために少し余裕を持って値を選択してください。2 番目の要素は、相手が結果をどれだけ待つかということです。タイムアウトを非常に控えめにして、安全のために 60 秒のようにすることもできますが、実際に相手側に結果を待っている人がいる場合、彼らはどれくらい待つつもりですか? 永遠に待つのではなく、再試行する必要があることを示す失敗応答を受け取りたいです。この2つの要素を考慮すると、

質問 3 では、メッセージが破棄された場合にどうなるかを尋ねます。この場合、受信者のアクターが応答するメッセージを受信しないため、応答が得られないため、そのメッセージを受信した人の将来はタイムアウトになると推測しています。Akka は JMS ではありません。受信者がメッセージを受信して​​確認しない場合にメッセージを何度も再送信できる確認モードはありません。

また、私の例からわかるように、をFuture使用して集計をブロックしないことに同意しAwaitます。非ブロッキング コールバックを使用することを好みます。受信関数でのブロックは、Actorブロック操作が完了するまでインスタンスがメールボックスの処理を停止するため、理想的ではありません。ノンブロッキング コールバックを使用することで、そのインスタンスを解放してメールボックスの処理に戻り、結果のExecutionContext処理を、メールボックスを処理するアクターから切り離された で実行される単なる別のジョブにすることができます。

ネットワークが信頼できないときに通信を無駄にしたくない場合は、Akka 2.2 で利用できる Reliable Proxyを調べることができます。このルートに行きたくない場合は、pingタイプメッセージをルートに定期的に送信することで、自分でロールすることができます。時間内に応答しない場合は、ダウンとしてマークし、信頼できる状態になるまで (非常に短い時間で) メッセージを送信しません。pingそれから、ルートごとのFSMのようなものです。この動作が絶対に必要な場合は、どちらでも機能しますが、これらのソリューションは複雑になるため、この動作が絶対に必要な場合にのみ使用する必要があることに注意してください。銀行のソフトウェアを開発していて、保証された配信セマンティクスが絶対に必要な場合は、財務上の悪影響が生じるため、必ずこの種のアプローチを使用してください。このようなものが必要かどうかは慎重に判断してください。90% の確率で必要ないからです。あなたのモデルでは、成功しないことがすでにわかっていることを待つことによって影響を受ける可能性のある唯一の人は、相手側の発信者です。アクターでノンブロッキング コールバックを使用することにより、時間がかかる可能性があるという事実によって中断されることはありません。それ' s はすでに次のメッセージに移動しています。また、失敗して再提出することにした場合も注意が必要です。受信アクターのメールボックスをあふれさせたくありません。再送する場合は、一定回数で上限を設定してください。

これらの保証された種類のセマンティクスが必要な場合に考えられるもう 1 つのアプローチは、Akka のClustering Modelを調べることです。ダウンストリーム ルートをクラスター化し、サーバーの 1 つに障害が発生した場合、すべてのトラフィックは、他のノードが回復するまで稼働していたノードにルーティングされます。

于 2013-05-29T12:25:03.067 に答える