2

Zeppelin NB と Spark をスタンドアロン モードで使用して、MacBook (i5、2.6GHz、8GB RAM) でいくつかの実験を行います。spark.executor/driver.memory は両方とも 2g になります。spark-defaults.conf にも設定spark.serializer org.apache.spark.serializer.KryoSerializerしましたが、zeppelin では無視されているようです


ALS モデル

〜400k(暗黙的)評価でALSモデルをトレーニングしましたが、推奨事項を取得したいと考えていますval allRecommendations = model.recommendProductsForUsers(1)

サンプルセット

次に、サンプルを使って遊んでみます

val sampledRecommendations = allRecommendations.sample(false, 0.05, 1234567).cache

これには 3600 件の推奨事項が含まれています。

ユーザーが所有するおすすめ商品を削除する

次に、特定のユーザーが既に所有している製品のすべての評価を削除します。このリストは、(user_id, Set[product_ids]) の形式の RDD で保持しています。RDD[(Long, scala.collection.mutable.HashSet[Int])]

val productRecommendations = (sampledRecommendations
// add user portfolio to the list, but convert the key from Long to Int first
.join(usersProductsFlat.map( up => (up._1.toInt, up._2) ))
.mapValues(
    // (user, (ratings: Array[Rating], usersOwnedProducts: HashSet[Long]))
    r => (r._1
        .filter( rating => !r._2.contains(rating.product))
        .filter( rating => rating.rating > 0.5)
        .toList
    )
  )
  // In case there is no recommendation (left), remove the entry
  .filter(rating => !rating._2.isEmpty)
).cache

質問 1キャッシュされたサンプル セットで これ ( ) を呼び出すと、10,000 のタスク、263.6 MB の入力データ、および 196.0 MB のシャッフル書き込みproductRecommendations.countを含むステージが生成されます。代わりに小さなキャッシュされたRDDを使用すべきではありませんか?ここで(wr)on(g)は何ですか? カウントの実行には約 5 分かかります。flatMap at MatrixFactorizationModel.scala:278

質問 2usersProductsFlat.countアプリケーション UI の「ストレージ」ビューに従って完全にキャッシュされる 呼び出しには、毎回最大 60 秒かかります。サイズは 23Mb です。これよりずっと高速ではないでしょうか。

読み取り可能な形式にマップする

次に、ID をブロードキャストされたルックアップ マップの名前に置き換えて、DF/テーブルに配置する読み取り可能な形式にします。

val readableRatings = (productRecommendations
    .flatMapValues(x=>x)
    .map( r => (r._1, userIdToMailBC.value(r._1), r._2.product.toInt, productIdToNameBC.value(r._2.product), r._2.rating))
).cache
val readableRatingsDF = readableRatings.toDF("user","email", "product_id", "product", "rating").cache
readableRatingsDF.registerTempTable("recommendations")

辛抱強く選択してください

非常識な部分はここから始まります。SELECT を実行するには数時間かかります(完了するまで待つことはできませんでした)。

%sql
SELECT COUNT(user) AS usr_cnt, product, AVG(rating) AS avg_rating
FROM recommendations
GROUP BY product

クエリは事実上永遠にかかります


ここでボトルネックを見つけるためにどこを探すべきかわかりません。明らかに、ここで大きな混乱が起こっています! どこから探し始めることができますか?

4

1 に答える 1

1

パーティションの数が多すぎる可能性があります。ローカル モードで実行する場合は、10000 ではなく 200 程度を使用する必要があると思います。パーティションの数はさまざまな方法で設定できます。Spark 構成ファイルの spark.default.parallelism フラグを編集することをお勧めします。

于 2015-11-16T17:27:49.220 に答える