現在、mongoDb から Elasticsearch にデータをストリーミングするソリューションを構築しています。私の目標は、elasticsearch に送信されたすべての成功したアイテムを追跡することです。私はakka-streamsとelastic4sを使用しています。現在、es へのストリーミングは次のようになっています。
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = { () => elasticFinishPromise.success(()); ()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)
そして、私の情報源からは次のようになります。
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()
これですべてが正常に動作し、現在、2 番目のシンクで項目数などを記録しています。しかし、実際にelasticsearchに送信されたアイテムをログに記録したいと思います。Elastic4s サブスクライバーはlistener: ResponseListener
withonAck(): Unit
を提供しておりonFailure(): Unit
、この情報をこのようにストリームに戻したいと思っています
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink
どうすればそれを実装できますか?onAck
との要素をバッファリングするカスタム ステージが必要onFailure
ですか? それとももっと簡単な方法がありますか?
助けてくれてありがとう。