1 つの名前ノードと 2 つのデータ ノードを持つ YARN ec2 クラスターにデプロイされた Spark ストリーミング アプリがあります。それぞれ 1 つのコアと 588 MB の RAM を備えた 11 のエグゼキューターでアプリを提出します。アプリは、常に書き込まれている S3 のディレクトリからストリーミングします。これは、それを実現するコード行です。
val ssc = new StreamingContext(sparkConf, Seconds(10))
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](Settings.S3RequestsHost , (f:Path)=> true, true )
//some maps and other logic here
ssc.start()
ssc.awaitTermination()
textFileStream の代わりに fileStream を使用する目的は、プロセスの開始時に Spark が既存のファイルを処理する方法をカスタマイズすることです。プロセスの起動後に追加された新しいファイルのみを処理し、既存のファイルを省略したいと考えています。10 秒のバッチ期間を構成しました。
s3 に少数のファイル (4 つまたは 5 つとしましょう) を追加している間、プロセスはうまくいきます。ストリーミング UI で、処理されるファイルごとに 1 つずつ、エグゼキューターでステージが正常に実行される様子を確認できます。しかし、より多くのファイルを追加しようとすると、奇妙な動作に直面することがあります。アプリケーションは、既にストリーミングされたファイルのストリーミングを開始します。
たとえば、s3 に 20 個のファイルを追加します。ファイルは 3 つのバッチで処理されます。最初のバッチは 7 個のファイルを処理し、2 回目は 8 個、3 回目は 5 個のファイルを処理します。この時点で S3 に追加されるファイルはありませんが、Spark は同じファイルを使用してこれらのフェーズを際限なく繰り返します。 これを引き起こしている可能性のある考えはありますか?
この問題の Jira チケットを投稿しました: https://issues.apache.org/jira/browse/SPARK-3553