時間枠内でアクセスされた上位 n 個の URL を見つけるためのスパーク アプリを作成しています。389451
ただし、このジョブは実行され続け、インスタンスの ES のレコードに数時間かかります。この時間を減らしたい。
私は以下のようにスパークのエラスティック検索から読んでいます
val df = sparkSession.read
.format("org.elasticsearch.spark.sql")
.load(date + "/" + business)
.withColumn("ts_str", date_format($"ts", "yyyy-MM-dd HH:mm:ss")).drop("ts").withColumnRenamed("ts_str", "ts")
.select(selects.head, selects.tail:_*)
.filter($"ts" === ts)
.withColumn("url", split($"uri", "\\?")(0)).drop("uri").withColumnRenamed("url", "uri").cache()
上記の DF では、ElasticSearch から読み取りとフィルタリングを行っています。また、URI からクエリ パラメータを削除しています。
次に、グループ化を行っています
var finalDF = df.groupBy("col1","col2","col3","col4","col5","uri").agg(sum("total_bytes").alias("total_bytes"), sum("total_req").alias("total_req"))
次に、ウィンドウ関数を実行しています
val partitionBy = Seq("col1","col2","col3","col4","col5")
val window = Window.partitionBy(partitionBy.head, partitionBy.tail:_*).orderBy(desc("total_req"))
finalDF = finalDF.withColumn("rank", rank.over(window)).where($"rank" <= 5).drop("rank")
それから私はcassandraにfinalDFを書いています
finalDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "table", "keyspace" -> "keyspace")).mode(SaveMode.Append).save()
ES クラスターに 4 つのデータ ノードがあり、Spark マシンは 16 コア 64GB RAM VM です。問題がどこにあるかを見つけるのを手伝ってください。