Storm (こちらを参照) をプロジェクトに統合しようとしています。トポロジ、スパウト、ボルトの概念を理解しました。しかし今、私はいくつかのことの実際の実装を理解しようとしています。
A) Java と Clojure を使用した多言語環境があります。私の Java コードは、ストリーミング データを起動するメソッドを持つコールバック クラスです。これらのメソッドにプッシュされたイベント データは、スパウトとして使用したいものです。
したがって、最初の質問は、これらのメソッドに入ってくるデータをスパウトに接続する方法です。i) backtype.storm.topology.IRichSpoutを渡し、次にii) backtype.storm.spout.SpoutOutputCollector (こちらを参照) をそのスパウトのopen関数 (こちらを参照) に渡そうとしています。しかし、実際にマップやリストを渡す方法がわかりません。
B)私のプロジェクトの残りの部分はすべて Clojure です。これらの方法を介して大量のデータが送信されます。各イベントには 1 ~ 100 の ID があります。Clojure では、スパウトからのデータを別の実行スレッドに分割したいと考えています。それらがボルトになると思います。
スパウトからイベント データを取得し、着信イベントの ID に基づいてスレッドを分割するように Clojure ボルトを設定するにはどうすればよいですか?
前もって感謝します ティム
[編集1]
私は実際にこの問題を乗り越えました。結局、1)独自の IRichSpout を実装しました。次に、2)そのスパウトの内部タプルを Java コールバック クラスの着信ストリーム データに接続しました。これが慣用句かどうかはわかりません。しかし、エラーなしでコンパイルおよび実行されます。ただし、3) printstuffボルトを介して着信するストリーム データが (確実に) 表示されません。
イベント データが確実に伝播されるようにするために、スパウトまたはボルトの実装またはトポロジ定義で何か特別な作業を行う必要がありますか? ありがとう。
;; 作成したスパウトに Java コールバックを結び付ける
(.setSpout Java コールバック ibspout)
(storm/defbolt printstuff ["単語"] [タプルコレクター]
(println (str "printstuff --> tuple["tuple"] > コレクター["collector"]"))
)
(ストーム/トポロジー
{ "1" (ストーム/スパウト仕様の ibspout)
}
{ "3" (嵐/ボルト仕様 { "1" :シャッフル }
印刷物
)
}))
[編集2]
SO メンバーの Ankur のアドバイスを受けて、トポロジーを再調整しています。Java コールバックを作成したら、そのタプルを以下の IBSpout に渡します(.setTuple ibspout (.getTuple java-callback))。NotSerializable エラーが発生するため、Java コールバック オブジェクト全体を渡しません。すべてがエラーなしでコンパイルおよび実行されます。しかし、繰り返しますが、プリントスタッフのボルトにデータが来ていません。うーん。
public class IBSpout は IRichSpout を実装します {
/**
*ストームスパウトのもの
*/
プライベート SpoutOutputCollector _collector;
プライベート リスト _tuple = 新しい ArrayList();
public void setTuple(List tuple) { _tuple = タプル; }
public List getTuple() { return _tuple; }
/**
* Storm ISpout インターフェイス関数
*/
public void open(Map conf、TopologyContext コンテキスト、SpoutOutputCollector コレクター) {
_collector = コレクター;
}
public void close() {}
public void activate() {}
public void activate() {}
public void nextTuple() {
_collector.emit(_tuple);
}
public void ack(Object msgId) {}
public void fail(Object msgId) {}
public void declareOutputFields(OutputFieldsDeclarer 宣言者) {}
public java.util.Map getComponentConfiguration() { return new HashMap(); }
}