次のような外部 (つまり、変更できない) Java API があります。
public interface Sender {
void send(Event e);
}
Sender
各イベントを受け入れ、それを JSON オブジェクトに変換し、いくつかのイベントを 1 つのバンドルに収集し、HTTP 経由でエンドポイントに送信するを実装する必要があります。send()
これはすべて、呼び出しスレッドをブロックせずに、固定サイズのバッファーを使用して、バッファーがいっぱいになった場合に新しいイベントをドロップすることなく、非同期で実行する必要があります。
akka-streams を使用すると、これは非常に簡単です: ステージのグラフ (akka-http を使用して HTTP リクエストを送信する) を作成し、それを具体化し、具体化されたものを使用してActorRef
新しいイベントをストリームにプッシュします。
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def send(e: Event): Unit = {
eventsActor ! e
}
これは、ライブラリが提供するものに非常に似ていますが、特定のニーズに合わせて調整されCustomBuffer
たカスタムです。おそらく、この特定の質問には関係ありません。GraphStage
Buffer
ご覧のとおり、ストリーム以外のコードからのストリームとのやり取りは非常に簡単です。トレイトの!
メソッドActorRef
は非同期であり、追加の機械を呼び出す必要はありません。アクターに送信される各イベントは、リアクティブ パイプライン全体を通じて処理されます。さらに、akka-http がどのように実装されているかにより、接続プールも無料で取得できるため、サーバーに対して複数の接続が開かれることはありません。
しかし、FS2 で同じことを適切に行う方法が見つかりません。バッファリング (おそらく、必要なPipe
追加処理を行うカスタム実装を作成する必要があるでしょう) と HTTP 接続プーリングの問題を捨てても、もっと基本的なこと、つまり、データを「外部から」のリアクティブ ストリーム。
私が見つけることができるすべてのチュートリアルとドキュメントは、プログラム全体が何らかの効果コンテキスト内で発生することを前提としていますIO
。これは私の場合ではありません。send()
メソッドは、指定されていない時間に Java ライブラリによって呼び出されます。したがって、すべてを 1 つのIO
アクション内に保持することはできません。必ずメソッド内で「プッシュ」アクションを終了しsend()
、リアクティブ ストリームを別のエンティティとして持つ必要があります。これは、イベントを集約し、できれば HTTP 接続をプールしたいからです (これは私が信じていることです)。は自然に反応ストリームに結び付けられます)。
のような追加のデータ構造が必要だと思いますQueue
。fs2 には確かに何らかの種類がありfs2.concurrent.Queue
ますが、すべてのドキュメントは単一のIO
コンテキスト内でそれを使用する方法を示しているため、次のようなことを行うと想定しています
val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()
次にqueue
、ストリーム定義内で使用し、send()
メソッド内で別のunsafeRun
呼び出しを使用します。
val eventPipeline = queue.dequeue
.through(customBuffer(bufferSize))
.groupWithin(batchSize, flushDuration)
.map(toBundle)
.mapAsyncUnordered(1)(sendRequest)
.evalTap(response => ...)
.compile
.drain
eventPipeline.unsafeRunAsync(...) // or something
override def send(e: Event) {
queue.enqueue(e).unsafeRunSync()
}
は正しい方法ではなく、おそらくうまくいかないでしょう。
それで、私の質問は、fs2を適切に使用して問題を解決するにはどうすればよいですか?