0

現在、mongoDb から Elasticsearch にデータをストリーミングするソリューションを構築しています。私の目標は、elasticsearch に送信されたすべての成功したアイテムを追跡することです。私はakka-streamsとelastic4sを使用しています。現在、es へのストリーミングは次のようになっています。

val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize,
    completionFn = { () => elasticFinishPromise.success(()); ()},
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
    concurrentRequests = concurrentRequests
    )
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)

そして、私の情報源からは次のようになります。

val a: [NotUsed] = mongoSrc
  .via(some operations..)
  .to(esSink)
  .run()

これですべてが正常に動作し、現在、2 番目のシンクで項目数などを記録しています。しかし、実際にelasticsearchに送信されたアイテムをログに記録したいと思います。Elastic4s サブスクライバーはlistener: ResponseListenerwithonAck(): Unitを提供しておりonFailure(): Unit、この情報をこのようにストリームに戻したいと思っています

val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item

mongoSrc ~> doStuff ~> esSink ~> logSink

どうすればそれを実装できますか?onAckとの要素をバッファリングするカスタム ステージが必要onFailureですか? それとももっと簡単な方法がありますか?

助けてくれてありがとう。

4

1 に答える 1

1

Subscriber[T]を活用して、シンクを「フロー化」できますFlow.fromSinkAndSourcedocsの「複合フロー (シンクとソースから)」の図を確認してください。

この場合、カスタムのactorPublisherをソースとしてアタッチし、からメッセージを送信しますonAck()

あなたがより簡単な方法を求めたので:

val doStuff = Flow[DocToIndex]
                .grouped(batchSize)
                .mapAsync(concurrentRequests)(bulkopFuture)

簡単に言うと、すべての有用な抽象化は別として、elastic4s サブスクライバーは単なる一括更新リクエストです。

于 2016-07-30T17:38:32.847 に答える