1

akka-persistence の PersistenceQuery を使用して、初期状態を管理するアクターにロードしています。起動時に一度だけ再生したいのですが、ログに送信し続けます。

14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]

これは私がそれを達成しようとして書いたプログラムです。

implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
      LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
  case x: RoomCreated => process(x)
  case x: RoomDeleted => process(x)
  case x => logger.error(s"Could not spawn $x")
})
4

1 に答える 1

1

eventsByPersistenceId予想される動作と実際に見たものとの違いは、それが「ライブ」ストリームであることだと思います。つまり、指定したオフセット範囲内で開始するイベントを返すだけでなく (0 から開始し、Long.MaxValue に移動するため、すべて)、新しいイベントが発生すると、新しいイベントを送信し続けます。ライブ ストリームが必要な場合は、currentEventsByPersistenceId代わりに呼び出しを変更します。これには、その時点 (リクエストを行っている時点) までのもののみが含まれ、ライブ ストリームではありません。それはあなたが探しているものでなければなりません。

于 2016-07-25T16:26:36.790 に答える