2

Streamparse を使用して Python で単純な Storm トポロジを記述しようとしています。私が書いた単純な Kafka スパウトを除いて、すべてが機能しています。「next_tuple」を継続的に呼び出しているようです。私のボルトはかなり遅いので、システムはメモリ内で非常に急速に爆発するようです.

トポロジを起動し、トポロジに多くのメッセージが追加されないように、topology.max.spout.pending を 1 に設定しようとしました。

lein run -m streamparse.commands.run/-main topologies/.clj -t 100 --option 'topology.max.spout.pending=1' --option 'topology.workers=1' --option 'topology.acker.executors=1' 

ただし、ボルトがはるかに遅いにもかかわらず、結果は依然として次のとおりです。

24790 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24942 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24944 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24946 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25143 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25144 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25350 [Thread-16-metadata-spout] INFO  backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
......

私の単純なカフカスパウト:

class MetadataSpout(Spout):

    def initialize(self, stormconf, context):
        self.log('----CONFIG: %s----' % stormconf)
        k = KafkaClient(os.getenv('KAFKA'))
        self.consumer = SimpleConsumer(k, 'vacuum', 'metadata')

    def next_tuple(self):
        self.log('----NEXT TUPLE----')
        messages = self.consumer.get_messages(count=os.getenv('BATCH_COUNT', 20))
        self.emit([json.dumps([m.message.value for m in messages])])

私のボルトにはデフォルトの構成しかありませんが、process() メソッドを完了するのにかなりの時間がかかります。それらがどのように問題になるかはわかりませんが、関連性がある場合は投稿できます。

4

1 に答える 1