4

MQTT ブローカーからデータを受信するトポロジがあり、スパウトを次のように動作させたい:

  1. タプルのバッチ (または単一のタプル内の文字列のリスト) を x 秒ごとに発行します。どうすればこれを達成できますか? Storm Trident について少し読みましたがIBatchSpout、特定の時間間隔でタプルをバッチで発行できないようです。

  2. 新しいデータが入ってこない場合、スパウトは何をすべきでしょうか? ストームのメイン スレッドなので、スレッドをブロックすることはできませんよね?

4

2 に答える 2

2

独自の MQTT スパウトを実装できます。例として、MongoSpoutを見てください。

重要な部分はnextTuple方法です。

このメソッドが呼び出されると、Storm はスパウトがタプルを出力コレクターに発行するように要求します。このメソッドはノンブロッキングである必要があるため、Spout に発行するタプルがない場合、このメソッドは返されます。 nextTuple、ack、fail はすべて、スパウト タスクの単一スレッド内のタイトなループで呼び出されます。発行するタプルがない場合、あまり CPU を浪費しないように、nextTuple を短時間 (1 ミリ秒程度) スリープさせるのが礼儀です。

一度に指定された時間を待ってはいけませんが、時々nextTupleタプルを発行するように実装できます。

private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;

@Override
public void nextTuple() {
    if (lastEmission == null ||
            lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
        List<Object> tuple = pollMQTT();
        if (tuple != null) {
            this.collector.emit(tuple);
            return;
        }
    }
    Utils.sleep(50);
}

オープンソースのMQTT スパウトを見つけたことに注意してください。本番環境には対応していないように見えますが、出発点として使用できます。

于 2014-10-28T21:47:12.857 に答える
1

Christian に加えて、Storm の MQTT クライアントのこの実装を見つけました。前述のリンクはまだ開発されていません。

于 2015-01-20T22:23:21.793 に答える