0

トライデント ストームでストリームに関数を適用するには、新しく作成したインスタンスをeach、次のようにストリームで呼び出されるメソッドに渡します。

stream.each(inputFields, new SomeFunc(), outputFields)

SomeFuncBaseFunc の子孫です。

に状態変数が必要だとしますSomeFunc

class SomeFunc extends BaseFunction {

  var someState: String = _

  override def execute(tuple: TridentTuple, collector: TridentCollector) = ???
}

SomeFunc コンポーネントの並列処理ヒントを 1 より大きい値に設定すると、SomeFunc?の複数のインスタンスが作成されます。SomeFunc の someState へのアクセス/更新はスレッドセーフ操作ですか? SomeClass をクラスとして定義する代わりにオブジェクトとして定義すると、smth は変更されますか?

編集 OK、彼の回答へのコメントでユーザー @Shaw の助けを借りて、ストームがエグゼキューターごとにストーム コンポーネント (ストーム/ボルト/関数/アグリゲーターなど) のインスタンスを 1 つ作成することを知りました。問題は、これをどのように行うかです。この行動のメカニズムを知りたい

4

1 に答える 1

1

Trident が正確にどのように機能するかはわかりませんが、Storm で並列処理のヒント > 1 を定義すると、ワーカー プロセスによって生成されたスレッドであるそのコンポーネント の複数のエグゼキュータが作成されます。

そのエグゼキューターは X (タスクの数、デフォルトでは 1) の SomeFunc の「インスタンス」を作成し、変数 someState をそれらの間で共有しません。ストームが機能するため、 someStateタプルがコンポーネントに到着するときに「独自のスレッドで順次」実行されるため、スレッドセーフです。

単純にストームに対するマイクロバッチの抽象化であるため、トライデントでも同じであるとほぼ確信しています。

読んだことは確かですが、そうでない場合は、Storm の並列処理に関するこの素晴らしい記事を読むことを強くお勧めします。

于 2014-09-20T09:31:10.387 に答える