1

他の誰かが patchPanel を使用して複数の列挙子を組み合わせ、WebSocket を介してクライアントに到達することを願っています。私が直面している問題は、patchPanel が最後に追加された列挙子からのデータ フィードのみを送信していることです。

私は次の例に従いました。http://lambdaz.blogspot.ca/2012/12/play-21-multiplexing-enumerators-into.html patchPanel に関して私が見つけた唯一のリファレンスです。

バージョン; 遊ぶ!2.1.1 (Java 1.7.0_11 および Scala 2.10.0 を使用)

Web ソケット方式。

def monitorStream = WebSocket.async[JsValue] { request =>
  val promiseIn = promise[Iteratee[JsValue, Unit]]
  val out = Concurrent.patchPanel[JsValue] { patcher =>
    val in = Iteratee.foreach[JsValue] { json =>
      val event:Option[String] = (json \ "event").asOpt[String]
      val systemId = (json \ "systemId").as[Long]
      event.getOrElse("") match {
        case "join" => 
          val physicalSystem = SystemIdHandler.getById(systemId)
          val monitorOut = (MonitorStreamActor.joinMonitor(physicalSystem)) 
          monitorOut map { enum =>
            val success = patcher.patchIn(enum)
        }
      }
    }.mapDone { _ => Logger.info("Disconnected") }
    promiseIn.success(in)
  }
  future(Iteratee.flatten(promiseIn.future),out)
}

MonitorStreamActor 呼び出し。

  def joinMonitor(physicalSystem: PhysicalSystem):
    scala.concurrent.Future[Enumerator[JsValue]]
     = {
    val monitorActor = ActorBase.akkaSystem.actorFor("/user/system-" + physicalSystem.name +"/stream")
    (monitorActor ? MonitorJoin()).map {
      case MonitorConnected(enumerator) =>
        enumerator
      }

  }

列挙子は正常に返され、そこに供給されるデータはアクターを呼び出すタイマーからのものです。アクター定義、タイマーは UpdatedTranStates ケースにヒットします。

class MonitorStreamActor() extends Actor {
  val (monitorEnumerator, monitorChannel) = Concurrent.broadcast[JsValue]
  import play.api.Play.current
  def receive = {
    case MonitorJoin() => {
      Logger.debug ("Actor monitor join")
      sender ! MonitorConnected(monitorEnumerator)
    }
    case UpdatedTranStates(systemName,tranStates) => {
      //println("Got updated Tran States")
      val json = Json.toJson(tranStates.map(m => Map("State" -> m._1, "Count" -> m._2) ))
      //println("Pushing updates to monitorChannel")
      sendUpdateToClients(systemName, "states", json)
    }
  def sendUpdateToClients(systemName:String, updateType:String, json:JsValue) {
    monitorChannel.push(Json.toJson(
      Map(
        "dataType"->Json.toJson(updateType),
        "systemName" -> Json.toJson(systemName),
        "data"->json)))
  }
}
}

これについてしばらく調べてみましたが、 patchPanel に追加された最後の列挙子だけにデータが送信される理由が見つかりませんでした。APIドキュメントはあまり役に立ちません.patchInを呼び出すだけで、すべての列挙子を反復子に結合する必要があるように思えますが、そうではないようです.

4

1 に答える 1

3

PatchPanel は設計上、現在の列挙子を patchIn メソッドによって提供される新しい列挙子に置き換えます。

複数の列挙子を結合するには、interleave または andThen メソッドを使用して列挙子を結合する必要があります。この場合、1 つを空にしてから次のイベントに移動するのではなく (andThen 演算子のように)、使用可能な各列挙子からイベントを取得するため、インターリーブが推奨されます。

つまり、monitorStream 内。

val monitorOut = (MonitorStreamActor.joinMonitor(physicalSystem)) 
monitorOut map { enum =>
  mappedEnums += ((physicalSystem.name, enum))
  patcher.patchIn(Enumerator.interleave[JsValue]( mappedEnums.values.toSeq))
}

patcher はパッチ パネルで、mappedEnums は HashMap[String,Enumerator[JsValue]] です - Enumerators が変更 (追加または削除) されるたびにパッチャーを再追加します - 動作しますが、それが最善の方法かどうかはわかりませんが、'とりあえずやります:)

于 2013-07-03T14:43:54.090 に答える