Streamparse を使用して Python で単純な Storm トポロジを記述しようとしています。私が書いた単純な Kafka スパウトを除いて、すべてが機能しています。「next_tuple」を継続的に呼び出しているようです。私のボルトはかなり遅いので、システムはメモリ内で非常に急速に爆発するようです.
トポロジを起動し、トポロジに多くのメッセージが追加されないように、topology.max.spout.pending を 1 に設定しようとしました。
lein run -m streamparse.commands.run/-main topologies/.clj -t 100 --option 'topology.max.spout.pending=1' --option 'topology.workers=1' --option 'topology.acker.executors=1'
ただし、ボルトがはるかに遅いにもかかわらず、結果は依然として次のとおりです。
24790 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24942 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24944 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24946 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25143 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25144 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25350 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
......
私の単純なカフカスパウト:
class MetadataSpout(Spout):
def initialize(self, stormconf, context):
self.log('----CONFIG: %s----' % stormconf)
k = KafkaClient(os.getenv('KAFKA'))
self.consumer = SimpleConsumer(k, 'vacuum', 'metadata')
def next_tuple(self):
self.log('----NEXT TUPLE----')
messages = self.consumer.get_messages(count=os.getenv('BATCH_COUNT', 20))
self.emit([json.dumps([m.message.value for m in messages])])
私のボルトにはデフォルトの構成しかありませんが、process() メソッドを完了するのにかなりの時間がかかります。それらがどのように問題になるかはわかりませんが、関連性がある場合は投稿できます。