MQTT ブローカーからデータを受信するトポロジがあり、スパウトを次のように動作させたい:
タプルのバッチ (または単一のタプル内の文字列のリスト) を x 秒ごとに発行します。どうすればこれを達成できますか? Storm Trident について少し読みましたが
IBatchSpout
、特定の時間間隔でタプルをバッチで発行できないようです。新しいデータが入ってこない場合、スパウトは何をすべきでしょうか? ストームのメイン スレッドなので、スレッドをブロックすることはできませんよね?
MQTT ブローカーからデータを受信するトポロジがあり、スパウトを次のように動作させたい:
タプルのバッチ (または単一のタプル内の文字列のリスト) を x 秒ごとに発行します。どうすればこれを達成できますか? Storm Trident について少し読みましたがIBatchSpout
、特定の時間間隔でタプルをバッチで発行できないようです。
新しいデータが入ってこない場合、スパウトは何をすべきでしょうか? ストームのメイン スレッドなので、スレッドをブロックすることはできませんよね?
独自の MQTT スパウトを実装できます。例として、MongoSpoutを見てください。
重要な部分はnextTuple
方法です。
このメソッドが呼び出されると、Storm はスパウトがタプルを出力コレクターに発行するように要求します。このメソッドはノンブロッキングである必要があるため、Spout に発行するタプルがない場合、このメソッドは返されます。 nextTuple、ack、fail はすべて、スパウト タスクの単一スレッド内のタイトなループで呼び出されます。発行するタプルがない場合、あまり CPU を浪費しないように、nextTuple を短時間 (1 ミリ秒程度) スリープさせるのが礼儀です。
一度に指定された時間を待ってはいけませんが、時々nextTuple
タプルを発行するように実装できます。
private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;
@Override
public void nextTuple() {
if (lastEmission == null ||
lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
List<Object> tuple = pollMQTT();
if (tuple != null) {
this.collector.emit(tuple);
return;
}
}
Utils.sleep(50);
}
オープンソースのMQTT スパウトを見つけたことに注意してください。本番環境には対応していないように見えますが、出発点として使用できます。
Christian に加えて、Storm の MQTT クライアントのこの実装を見つけました。前述のリンクはまだ開発されていません。