私がやろうとしているのは、ストリームを 2 つのフィールド ( "remote-client-ip", "request-params"
) でグループ化し、各グループ内のタプルの数を数えることです。そして、それらを組み合わせて地図にします。これが私のトポロジです:
topology.newStream("kafka-spout-stream-1", repeatSpout)
.each(new Fields("str"), new URLParser(), new Fields(fieldNames))
.each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
.groupBy(new Fields("remote-client-ip", "query-string"))
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
.groupBy(new Fields("remote-client-ip"))
.persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));
しかし、デバッグ後、最初はデータ ストリームがブロックされていることがわかりましgroupBy()
た。これは複数フィールドのグループ化です。Count()
後続の集計ステートメントでは何も実行されませんでした。
そのため、複数フィールドのグループ化と集計の間の相互作用に関するいくつかの概念を誤解していると思います。
私の憶測が正しいか間違っているか教えてください。ありがとうございました!