1

まず、私は Storm/Trident を初めて使用し、すでに何時間も問題に取り組んでいます。

私が持っているのは、1 つのパーティションを持つ 1 つの Kafka トピックです。プロデューサーは x ミリ秒ごとにこのトピックにタプルを送信します。TransactionalTridentKafkaSpout がこのトピックから読み取り、一部の Trident オペレーターがそれらを処理します。トポロジ全体がローカル モードで実行されています (リモート モードは今のところテストされていません)。

トポロジのメイン コードは次のとおりです。

TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);
TridentTopology topology = new TridentTopology();
Stream inStream = topology.newStream("kafka-spout", spout).parallelismHint(4);

TridentState state1=inStream
    .groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg1(), new Fields(COMB_AGG_1_FIELD))
    .parallelismHint(4);

state1.newValuesStream().groupBy(new Fields(ID_FIELD)).
    persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomCombinerAgg2(), new Fields(COMB_AGG_2_FIELD))
    .parallelismHint(4);

state1.newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_1_FIELD), new CustomBaseFilter1());

inStream.groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2), new CustomCombinerAgg3(), new Fields(COMB_AGG_3_FIELD));

inStream.groupBy(new Fields(ID_FIELD))
    .persistentAggregate(new MemoryMapState.Factory(), new Fields(ID_FIELD, FIELD1, FIELD2, FIELD3), new CustomCombinerAgg4(), new Fields(COMB_AGG_4_FIELD))
    .newValuesStream().filter(new Fields(ID_FIELD, COMB_AGG_4_FIELD), new CustomBaseFilter2());

今私が抱えている問題は、プロデューサーのメッセージ間隔が短いほど、実行されるオペレーターが少なくなることです。
たとえば、プロデューサがそれぞれ 100 ミリ秒の間隔で 200 のタプルを送信する場合、すべてのオペレーターは 200 のタプルすべてを正しく処理しますが、間隔が 20 ミリ秒に設定されている場合、たとえば、オペレーターは次の数のタプルのみを処理/実行します:
CustomCombinerAgg1: 200
CustomCombinerAgg2: 50
CustomBaseFilter1: 50
CustomCombinerAgg3: 150
CustomCombinerAgg4: 180
CustomBaseFilter2: 60

私が理解している限り、(トランザクション) Trident は正確に 1 回の処理を保証し、タプルの新しいバッチは、前のバッチが完全に処理された後にスパウトから取得する必要があります。これはここでは当てはまらないようで、むしろ最初のオペレーターである CustomCombinerAgg1 が速度を指示し、その後のオペレーターは指定された時間内にすべてのタプルを処理できないように見えますか?

私が期待するのは、すべてのタプルに対してすべてのオペレーターが適切に実行され、タプル/バッチがすべてのオペレーターによって処理されると、次のオペレーターがフェッチされることです。これは Trident を使用した場合に当てはまりませんか? 私は何か間違ったことをしていますか?どうすればこの動作を達成できますか?
Trident は、タプルが完全に処理されたことをどのようにして知るのでしょうか? 私の知る限り、Storm ではタプルを ack() する必要がありますが、Trident オペレーターには OutputCollector がないため、ack() を呼び出すことはできませんか? 私の問題は何とかこれに結びついていますか?

ありがとう。

4

0 に答える 0