3

私は実験的な Akka Persistence Query モジュールを調査しており、アプリケーションにカスタムの読み取りジャーナルを実装することに非常に興味があります。CurrentPersistenceIdsQueryドキュメントでは、ジャーナルの現在の状態を返すクエリ (例: ) と、イベントがアプリケーションの書き込み側を介してジャーナルにコミットされるときにイベントを発行するサブスクライブ可能なストリームを返すクエリ (例: AllPersistenceIdsQuery)の 2 つの主要なクエリについて説明しています。

私の不自然なアプリケーションでは、Postgres と Slick 3.1.1 を使用してこれらのクエリの根幹を動かしています。次のようなことを行うことで、データベース クエリの結果を正常にストリーミングできます。

override def allPersistenceIds = {
  val db = Database.forConfig("postgres")
  val metadata = TableQuery[Metadata]

  val query = for (m <- metadata) yield m.persistenceId
  Source.fromPublisher(db.stream(query.result))
}

ただし、基になる Slick DB アクションが完了するとすぐに、ストリームは完了したと通知されます。これは、新しいイベントを発行できる永続的にオープンなストリームの要件を満たしていないようです。

私の質問は次のとおりです。

  • Akka Streams DSL を純粋に使用してそれを行う方法はありますか? つまり、クローズできないフローを送信できますか?
  • LevelDB の読み取りジャーナルがどのように機能するかを調査しましたが、読み取りジャーナルを書き込みジャーナルにサブスクライブさせることで、新しいイベントを処理しているようです。これは理にかなっているように思えますが、質問する必要があります。一般的に、この要件に対処するための推奨されるアプローチはありますか?
  • 私が考えたもう1つのアプローチはポーリングです(たとえば、定期的に読み取りジャーナルにDBを照会させ、新しいイベント/ IDをチェックします)。私よりも経験豊富な人がアドバイスを提供できますか?

ありがとう!

4

2 に答える 2

4

この1行のコードほど簡単ではありませんが、あなたはすでに正しい道を歩んでいます。

「無限」ストリームを実装するには、複数回クエリを実行する必要があります。つまり、基礎となるデータベースが無限クエリを許可しない限り、ポーリングを実装します (ここでは AFAICS ではありません)。

ポーリングは「オフセット」を追跡する必要があるため、何らかのタグでクエリを実行していて、別のポーリングを発行する場合は、先頭ではなく「最後に発行された要素」からその (2 番目の) クエリを開始する必要があります。再びテーブルの。そのため、このオフセットを保持する場所 (おそらくアクタ) が必要です。

Query Side LevelDB プラグインは、基になるジャーナルとその仕組みについて多くのことを想定しているため、他の実装にとって最適なロール モデルではありません。また、LevelDB は Akka Persistence を使用した運用向けではありません。これは、(Cassandra などを起動することなく) すぐに使用できる永続的なジャーナルを作成するために出荷されるジャーナルです。

インスピレーションを探しているなら、MongoDB プラグインは、SQL ストアと非常によく似た制限があるため、実際にはそのためのかなり良いソースになるはずです。現在クエリ側を実装している SQL ジャーナルがあるかどうかはわかりません。

于 2016-01-07T01:06:10.317 に答える
0

Postgres レプリケーション APIを使用して、データベース イベントの「無限」ストリームを取得できます。バージョン 42.0.0 以降の Postgres JDBC ドライバーでサポートされています。関連するプル リクエストを参照してください。ただし、これは実際のストリームではなく、データベース WAL からバッファリングされた同期リーダーです。

PGReplicationStream stream =
    pgConnection
        .replicationStream()
        .logical()
        .withSlotName("test_decoding")
        .withSlotOption("include-xids", false)
        .withSlotOption("skip-empty-xacts", true)
        .start();
while (true) {
  ByteBuffer buffer = stream.read();
  //process logical changes
}

このリーダー用にalpakka プロジェクトにAkka Streams アダプター (ソース) があると便利です。

于 2017-03-03T22:39:10.427 に答える