私のストリームは、「生」として 60 秒の集計でデータベースに保存したいキーと値のペアです。もともと私はこれをやっていました:
->foreach
/
kStreamBuilder.stream->aggregateBy->process
しかし、それから私はそれを知りました
を。.aggregateby()
一致するペアのみを返します(一致するかどうかにかかわらず、すべてが必要です)
b. フェーズでHashMapを使用して、同じ集計効果を実現できました。.process()
次に.punctuate()
が呼び出されると、すべての k/v ペアをデータベースに書き込みます。
したがって、結果のトポロジは次のようになります。
kStreamBuilder.stream->foreach
kStreamBuilder.stream->process
質問:
- これは、一致するすべての kv ペアを書き込む結果を達成するための「合理的な」方法ですか? ( foreachおよび任意のペアを介したすべての値+ processを介した残り)
- 元のストリームを送信する前に(どういうわけか)分割する必要がありますか
.foreach()
、.process()
または上記を実行するだけで十分ですか?