私は実験的な Akka Persistence Query モジュールを調査しており、アプリケーションにカスタムの読み取りジャーナルを実装することに非常に興味があります。CurrentPersistenceIdsQuery
ドキュメントでは、ジャーナルの現在の状態を返すクエリ (例: ) と、イベントがアプリケーションの書き込み側を介してジャーナルにコミットされるときにイベントを発行するサブスクライブ可能なストリームを返すクエリ (例: AllPersistenceIdsQuery
)の 2 つの主要なクエリについて説明しています。
私の不自然なアプリケーションでは、Postgres と Slick 3.1.1 を使用してこれらのクエリの根幹を動かしています。次のようなことを行うことで、データベース クエリの結果を正常にストリーミングできます。
override def allPersistenceIds = {
val db = Database.forConfig("postgres")
val metadata = TableQuery[Metadata]
val query = for (m <- metadata) yield m.persistenceId
Source.fromPublisher(db.stream(query.result))
}
ただし、基になる Slick DB アクションが完了するとすぐに、ストリームは完了したと通知されます。これは、新しいイベントを発行できる永続的にオープンなストリームの要件を満たしていないようです。
私の質問は次のとおりです。
- Akka Streams DSL を純粋に使用してそれを行う方法はありますか? つまり、クローズできないフローを送信できますか?
- LevelDB の読み取りジャーナルがどのように機能するかを調査しましたが、読み取りジャーナルを書き込みジャーナルにサブスクライブさせることで、新しいイベントを処理しているようです。これは理にかなっているように思えますが、質問する必要があります。一般的に、この要件に対処するための推奨されるアプローチはありますか?
- 私が考えたもう1つのアプローチはポーリングです(たとえば、定期的に読み取りジャーナルにDBを照会させ、新しいイベント/ IDをチェックします)。私よりも経験豊富な人がアドバイスを提供できますか?
ありがとう!