13

説明

Scala に Spark Streaming 1.5.2 アプリケーションがあり、Kinesis Stream から JSON イベントを読み取り、いくつかの変換/集約を行い、結果をさまざまな S3 プレフィックスに書き込みます。現在のバッチ間隔は 60 秒です。1 秒あたり 3000 ~ 7000 のイベントがあります。集計が失われないように、チェックポイントを使用しています。

しばらくの間、例外やクラスターの再起動から回復しており、うまく機能しています。最近、Spark Streaming 1.6.0 のコードを再コンパイルし、build.sbtファイルのライブラリ依存関係のみを変更しました。コードを Spark 1.6.0 クラスターで数時間実行した後、次のことに気付きました。

  1. 1.6.0 では、「入力レート」と「処理時間」のボラティリティが大幅に増加しました (下のスクリーンショットを参照)。
  2. 数時間ごとに、「レコードの書き込み中に例外がスローされます: BlockAdditionEvent ... が WriteAheadLog に記録されます。java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds] 例外 (以下の完全なスタック トレースを参照) は、特定のバッチ (分) の 0 イベント/秒への低下と一致しています。

掘り下げた後、2番目の問題はこのプルリクエストに関連しているように見えます。PR の最初の目標: 「S3 を WAL のディレクトリとして使用すると、書き込みに時間がかかりすぎます。複数のレシーバーが AddBlock イベントを ReceiverTracker に送信すると、ドライバーは非常に簡単にボトルネックになります。この PR は、レシーバーが長時間ドライバーによってブロックされないように、ReceivedBlockTracker にイベントのバッチ処理を追加します。」</p>

Spark 1.5.2 の S3 でチェックポイントを行っており、パフォーマンスや信頼性の問題はありません。S3 およびローカル NAS で Spark 1.6.0 のチェックポイントをテストしましたが、どちらの場合もこの例外が発生しています。バッチのチェックポイントに 5 秒以上かかると、この例外が発生し、そのバッチのイベントが永久に失われることを確認しました。

質問

  • Spark Streaming 1.6.0 で予想される「入力レート」と「処理時間」のボラティリティの増加はありますか? また、それを改善する既知の方法はありますか?

  • これら2以外の回避策を知っていますか?:

    1) チェックポインティング シンクがすべてのファイルを書き込むのに 5 秒未満かかることを保証するため。私の経験では、小さなバッチであっても、S3 でそれを保証することはできません。ローカル NAS の場合は、インフラストラクチャの担当者によって異なります (クラウド プロバイダーでは難しい)。

    2) spark.streaming.driver.writeAheadLog.batchingTimeout プロパティの値を増やします。

  • 説明されているシナリオでイベントが失われると思いますか? バッチ チェックポイントが失敗した場合、シャード/レシーバーのシーケンス番号は増加せず、後で再試行されると思います。

Spark 1.5.2 統計 - スクリーンショット

ここに画像の説明を入力

Spark 1.6.0 統計 - スクリーンショット

ここに画像の説明を入力

完全なスタック トレース

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

ソースコードの抜粋

...
     // Function to create a new StreamingContext and set it up
  def setupContext(): StreamingContext = {
    ...
    // Create a StreamingContext
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

    // Create a Kinesis DStream
    val data = KinesisUtils.createStream(ssc,
      kinesisAppName, kinesisStreamName,
      kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),
      InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
      StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)
...
    ssc.checkpoint(checkpointDir)

    ssc
  }


  // Get or create a streaming context.
  val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext)

  ssc.start()
  ssc.awaitTermination()
4

1 に答える 1

5

私のコメントを回答として投稿することに関するzero323の提案に従います。

spark.streaming.driver.writeAheadLog.batchingTimeout を増やすと、チェックポイントのタイムアウトの問題が解決しました。余裕があることを確認してから行いました。私たちはしばらくの間それをテストしてきました。したがって、慎重に検討してから増やすことをお勧めします。

詳細

$SPARK_HOME/conf/spark-defaults.conf で次の 2 つの設定を使用しました。

spark.streaming.driver.writeAheadLog.allowBatching true spark.streaming.driver.writeAheadLog.batchingTimeout 15000

最初は、spark.streaming.driver.writeAheadLog.allowBatching を true に設定するだけでした。

変更前に、質問に記載されている問題 (「...ReceivedBlockTracker: レコードの書き込み中に例外がスローされました...」) をテスト環境で再現しました。数時間おきに発生しました。変更後、問題はなくなりました。本番環境に移行する前に、数日間実行しました。

次に示すように、 WriteAheadLogUtilsクラスのgetBatchingTimeout()メソッドのデフォルト値が 5000ms であることがわかりました。

def getBatchingTimeout(conf: SparkConf): Long = {
    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
}
于 2016-07-29T11:18:56.277 に答える