1

databricksスパーク ストリーミングを使用してチェックポイントを使用すると、多くの問題が発生しました。以下のコードにより、クラスターでOOM エラーが発生しました。クラスターのメモリ使用量を調査すると、メモリが時間の経過とともにゆっくりと増加し、メモリ リークが発生していることがわかります (OOM の 10 日前まで、バッチは数分しか持続しません)。チェックポイントを削除して新しいチェックポイントを作成すると、メモリ リークがなくなりエラーがチェックポイントから発生したことが示されました。同様のストリーミング ジョブでは、一部のデータが処理されないという問題もありました (これも、チェックポイントを再作成した後に修正されました)。

免責事項: オンライン ドキュメントは回避的であるため、チェックポイントの詳細な動作を完全には理解していません。したがって、構成が適切かどうかはわかりません。

以下は、問題の最小限の例です。

pyspark 3.0.1、パイソン 3.7

クラスターの json conf には、次の要素があります。

  "spark_conf": {
    "spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "true"
  }

コード:

import pandas as pd
from pyspark.sql import functions as F

def for_each_batch(data, epoch_id):
   pass

spark.readStream.format("delta").load("path/to/delta").filter(
F.col("TIME") > pd.Timestamp.utcnow() - pd.Timedelta(hours=1)
).writeStream.option(
"ignoreChanges", "true"
).option(
"checkpointLocation", "path/to/checkpoint"
).trigger(
processingTime="3 minutes"
).foreachBatch(
for_each_batch
).start()

PS: 関数「for_each_batch」の内容やフィルタリング条件を変更した場合、チェックポイントを再作成する必要がありますか?

4

0 に答える 0