8

アイデアは、後で使用するためにチャネルを開いたままにしておくことです。playframework 2.5.x では、ドキュメントには akka ストリームを使用する必要があると記載されていますが、この例を実現する方法については何も記載されていません。誰かが私を助けることができますか?

import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext

def socket =  WebSocket.using[String] { request =>

  // Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
  val (out, channel) = Concurrent.broadcast[String]

  // log the message to stdout and send response back to client
  val in = Iteratee.foreach[String] {
    msg => println(msg)
      // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
      // receive the pushed messages
      channel push("I received your message: " + msg)
  }
  (in,out)
}
4

3 に答える 3

1

私は最終的にアクターを使用して解決策を見つけました。私はこれを見つけました:

def conect = WebSocket.accept[JsValue, JsValue] {request => 
  ActorFlow.actorRef(out => UserWebSocket.props(out, users))
}

次に、ActorFlow.actorRef のソース コードを確認しました: https://github.com/playframework/playframework/blob/2.5.0/framework/src/play-streams/src/main/scala/play/api/libs/ストリーム/ActorFlow.scala

そして、この解決策を思いつきました:

import javax.inject._
import play.api.Configuration
import play.api.mvc._
import scala.concurrent._

import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.actor._

class UserActor(out: ActorRef) extends Actor {
  def receive = {
    // receives messages from client browser here
    // out is actor that will send messages back to client(s)
    case msg: String => out ! "Received message "+msg
  }
}
object UserActor {
  def props(out: ActorRef) = Props(new UserActor(out))
}

@Singleton
class NotificationController @Inject()(val config:Configuration)
                          (implicit ec: ExecutionContext, actorSystem:ActorSystem, materializer: Materializer) extends Controller {

  // outActor can be used to send messages to client(s)
  // Sink.asPublisher(true) makes this a broadcast channel (multiple clients can connect to this channel, and messages sent to outActor are broadcast to all of them).  Use Sink.asPublisher(false) to create a unicast channel.
  val (outActor, publisher) = Source.actorRef[String](99, OverflowStrategy.dropNew)
        .toMat(Sink.asPublisher(true))(Keep.both).run()


  def flowsocket = WebSocket.accept[String, String] {request =>
    val aflow:Flow[String, String, _] = {

        val sink = Sink.actorRef( actorSystem.actorOf(UserActor.props(outActor)), akka.actor.Status.Success(()) )

        val source = Source.fromPublisher(publisher)

        Flow.fromSinkAndSource(
            sink, source
        )
    }
    aflow
  }

}

その後、Actor モデルをより完全に受け入れるようにソリューションを修正しました。これで、他のすべての「UserActor」が接続し、それを介して通信できるシングルトン アクターである「UsersBroadcastActor」ができました。

lazy val broadcastActorRef = actorSystem.actorOf(Props[UsersBroadcastActor])

def flowsocket = WebSocket.accept[JsValue, JsValue] { request =>
    ActorFlow.actorRef(out => UserActor.props(out, broadcastActorRef))
}

UserActor がインスタンス化されると、その preStart() メソッドで、サブスクリプション メッセージが broadcastActorRef に送信されます。これにより、「サブスクライブ」するすべての UserActors への参照が保存されます。メッセージを BroadcastActorRef に送信すると、それが各 UserActors に転送されます。このソリューションの完全なコード サンプルもご希望の場合はお知らせください。

于 2016-03-24T20:34:27.890 に答える