2 つの異なるストリームに 2 つの異なるスパウトを持つストーム トライデント トポロジを実行しています。私のスパウトは JMS スパウトであり、HDFS State を使用してタプルを永続化します。
スパウトを 1 つだけ実行すると、正常に動作します。HDFS の JMS キューに投稿されたすべてのレコードを取得しています。
2 つの異なるキューに接続する 2 つのスパウトを使用してトポロジを実行しているときに、QUEUE に投稿したものよりも少ないレコードを取得しています。ここで何か間違ったことをしていますか?私がこれを行っている方法に問題がある場合はお知らせください。
TridentTopology topology = new TridentTopology();
topology.newStream("QueueSpout", TridentSpout).partitionPersist(tradeStateFactory,hdfsFields, new HdfsUpdater());
Stream TopicStream = topology.newStream("TopicSpout", TridentTopicSpout);
TopicStream.each(hdfsFields, new CashFilter()).partitionPersist(cashStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new JournalFilter()).partitionPersist(journalStateFactory, hdfsFields, new HdfsUpdater());
TopicStream.each(hdfsFields, new RcvdlvrFilter()).partitionPersist(rcvdlvrStateFactory, hdfsFields, new HdfsUpdater());