5

私はイベントの無限の流れを持っています:

(timestamp, session_uid, traffic)

すなわち

...
(1448089943, session-1, 10)
(1448089944, session-1, 20)
(1448089945, session-2, 50)
(1448089946, session-1, 30)
(1448089947, session-2, 10)
(1448089948, session-3, 10)
...

これらのイベントを session_uid でグループ化し、各セッションのトラフィックの合計を計算します。

akka-streams私は有限ストリームの使用で問題なく動作するフローを書きました(クックブックのこのgroupByに基づいた私のコード ベース)。ただし、無限ストリームでは機能しません。関数はすべての受信ストリームを処理し、その後でのみ結果を返す準備ができるためです。groupBy

タイムアウト付きのグループ化を実装する必要があると思います。つまり、指定された stream_uid のイベントを最後から 5 分以上受信しない場合は、この session_uid のグループ化されたイベントを返す必要があります。しかし、それを実装する方法akka-streamsは?

4

3 に答える 3

2

これは次のユースケースのようですSource.groupedWithin

def groupedWithin(n: Int, d: FiniteDuration): Source[List[Out], Mat]

「このストリームを、時間枠内に受信した要素のグループに分割するか、指定された数の要素によって制限されます。最初に何が起こるかは関係ありません。」

ここにドキュメントへのリンクがあります

于 2015-12-07T17:42:37.123 に答える
0

おそらく、アクターによって簡単に実装できます

case class SessionCount(name: String)

class Hello private() extends Actor {
  var sessionMap = Map[String, Int]()

  override def receive: Receive = {
    case (_, session: String, _) =>
      sessionMap = sessionMap + (session -> (sessionMap.getOrElse(session, 0) + 1))

    case SessionCount(name: String) => sender() ! sessionMap.get(name).getOrElse(0)
  }
}


object Hello {
  private val actor = ActorSystem.apply().actorOf(Props(new Hello))
  private implicit val timeOver = Timeout(10, TimeUnit.SECONDS)
  type Value = (String, String, String)

  def add(value: Value) = actor ! value

  def count(name:String) = (actor ? SessionCount(name )).mapTo[Int]
}
于 2015-11-21T11:03:18.750 に答える