3

ネイティブの Enumerator コンストラクトを使用して Play で websocket を正常にセットアップし、文字列を返すコードを呼び出しました。

def operationStatusFeed = WebSocket.using[String] { implicit request =>
  val in = Iteratee.ignore[String]
  val out = Enumerator.repeatM {
   Promise.timeout(operation, 3 seconds)
  }
  (in, out)
}

Now I want my operation function to return an rx.lang.scala.Observable[String] instead of a String, and I want to output any String as soon as it enters. How can I map this Observable to a play.api.libs.iteratee.Enumerator?

4

2 に答える 2

2

Bryan Gilbert の暗黙的な変換を使用できます。これは問題なく動作しますが、Bryan Gilbert の変換の更新版を使用するように注意してください。Jeroen Kransen からの回答で Unsubscribe が呼び出されることはありません (これは悪いことです!)。

  /*
   * Observable to Enumerator
   */
  implicit def observable2Enumerator[T](obs: Observable[T]): Enumerator[T] = {
    // unicast create a channel where you can push data and returns an Enumerator
    Concurrent.unicast { channel =>
      val subscription = obs.subscribe(new ChannelObserver(channel))
      val onComplete = { () => subscription.unsubscribe }
      val onError = { (_: String, _: Input[T]) => subscription.unsubscribe }
      (onComplete, onError)
    }
  }

  class ChannelObserver[T](channel: Channel[T]) extends rx.lang.scala.Observer[T] {
    override def onNext(elem: T): Unit = channel.push(elem)
    override def onCompleted(): Unit = channel.end()
    override def onError(e: Throwable): Unit = channel.end(e)
  }

完全にするために、 Enumerator から Observable への変換は次のとおりです。

  /*
   * Enumerator to Observable
   */
  implicit def enumerator2Observable[T](enum: Enumerator[T]): Observable[T] = {
    // creating the Observable that we return
    Observable({ observer: Observer[T] =>
      // keeping a way to unsubscribe from the observable
      var cancelled = false

      // enumerator input is tested with this predicate
      // once cancelled is set to true, the enumerator will stop producing data
      val cancellableEnum = enum through Enumeratee.breakE[T](_ => cancelled)

      // applying iteratee on producer, passing data to the observable
      cancellableEnum (
        Iteratee.foreach(observer.onNext(_))
      ).onComplete { // passing completion or error to the observable
        case Success(_) => observer.onCompleted()
        case Failure(e) => observer.onError(e)
      }

      // unsubscription will change the var to stop the enumerator above via the breakE function
      new Subscription { override def unsubscribe() = { cancelled = true } }
    })
  }

Play の WebSocket の Rx

一方、Play で Iteratees と Enumerator を扱うのは、ほとんどの場合、(ここで行っているように) WebSocket を使用するときであることに気付くかもしれません。Iteratees は Observables ほど直観的ではないということには誰もが同意しており、これがおそらく Play プロジェクトで Rx を使用している理由です。

その観察から、まさにこれを行うWidgetManagerと呼ばれるライブラリを構築しました: Rx を Play に統合し、Iteratees 操作を取り除きます。

そのライブラリを使用すると、コードは次のようになります。

def operationStatusFeed = WebSocket.using[String] { implicit request =>

  // you can optionally give a function to process data from the client (processClientData)
  // and a function to execute when connection is closed (onClientClose)
  val w = new WidgetManager()

  w.addObservable("op", operation)

  // subscribe to it and push data in the socket to the client (automatic JS callback called)
  w.subscribePush("op")

  // deals with Iteratees and Enumerators for you and returns what's needed
  w.webSocket
}

ライブラリは GitHub にあります: RxPlay (貢献は大歓迎です)

于 2014-06-28T11:28:16.333 に答える