トライデント ストームでストリームに関数を適用するには、新しく作成したインスタンスをeach
、次のようにストリームで呼び出されるメソッドに渡します。
stream.each(inputFields, new SomeFunc(), outputFields)
はSomeFunc
BaseFunc の子孫です。
に状態変数が必要だとしますSomeFunc
。
class SomeFunc extends BaseFunction {
var someState: String = _
override def execute(tuple: TridentTuple, collector: TridentCollector) = ???
}
SomeFunc コンポーネントの並列処理ヒントを 1 より大きい値に設定すると、SomeFunc
?の複数のインスタンスが作成されます。SomeFunc の someState へのアクセス/更新はスレッドセーフ操作ですか? SomeClass をクラスとして定義する代わりにオブジェクトとして定義すると、smth は変更されますか?
編集 OK、彼の回答へのコメントでユーザー @Shaw の助けを借りて、ストームがエグゼキューターごとにストーム コンポーネント (ストーム/ボルト/関数/アグリゲーターなど) のインスタンスを 1 つ作成することを知りました。問題は、これをどのように行うかです。この行動のメカニズムを知りたい