6

Slick を使用すると、テーブルから結果のストリームを生成するために次のことができます。

val q = for (e <- events) yield e.name
val p: DatabasePublisher[String] = db.stream(q.result)

p.foreach { s => println(s"Event: $s") }

これにより、テーブル内のすべてのイベントが出力されevents、最後の行の後に終了します。

events新しい行がテーブルに入力されたときに何らかの方法で通知できると仮定すると、イベントが挿入されたときにイベントを継続的に出力するストリームを作成することは可能ですか? tail -fDB テーブルの一種。

Slick はこれをネイティブにサポートしないと思いますが、Akka ストリーミングを使用して支援できるはずです。したがって、空になるまで Slick Source から何かを取得し、イベントがテーブル内の追加データを示すのを待ってから、新しいデータをストリーミングすることができた場合。おそらく、を使用しActorPublisherてこのロジックをバインドしますか?

誰かがこの分野での経験やアドバイスを持っているかどうか疑問に思っていますか?

4

2 に答える 2

5

あなたは正しかったです:)これは、PostgreSQL、非同期DBドライバー、およびLISTEN / NOTIFYメカニズムActorPublisherを使用した簡単な例です。

俳優:

class PostgresListener extends ActorPublisher[String] {

  override def receive = {
    case _ ⇒
      val configuration = URLParser.parse(s"jdbc://postgresql://$host:$port/$db?user=$user&password=$password")
      val connection = new PostgreSQLConnection(configuration)
      Await.result(connection.connect, 5.seconds)

      connection.sendQuery(s"LISTEN $channel")
      connection.registerNotifyListener { message ⇒ onNext(message.payload) }
  }
}

サービス:

def stream: Source[ServerSentEvent, Unit] = {
  val dataPublisherRef = Props[PostgresListener]
  val dataPublisher = ActorPublisher[String](dataPublisherRef)

  dataPublisherRef ! "go"

  Source(dataPublisher)
    .map(ServerSentEvent(_))
    .via(WithHeartbeats(10.second))
}

build.sbtlibraryDependencies:

"com.github.mauricio"  %% "postgresql-async"         % "0.2.18"

Postgreトリガーは呼び出す必要がありますselect pg_notify('foo', 'payload')

私の知る限り、Slick は をサポートしていませんLISTEN

于 2015-12-15T22:15:37.860 に答える