0

データ取り込み用のプログラムを書いています。Kafka から DStream への読み取りは、Dstrem を 3 つのストリームに分割し、それぞれでアクションを実行します。

val stream = createSparkStream(Globals.configs, ssc)
val s1 = stream.filter(<predicat1>)
val s2 = stream.filter(<predicat2>)
val s3 = stream.filter(<predicat3>)

//I'm looking for something like:
s1.forEachRddAsync(...
s2.forEachRddAsync(...
s3.forEachRddAsync(... 

RDDではなくDStream全体で非同期送信をトリガーできる場合。

4

1 に答える 1

0

DStreamアクション メソッドは、実際にはブロックしていますが、データを処理しません。これらDStreamは出力ストリームとしてのみ登録されます。

StreamingContext開始されると、利用可能なリソースに応じて処理がスケジュールされ、これらが許可されている場合は、相互に制限されることなく処理されます。

于 2018-10-14T09:33:16.030 に答える