これらの構成を持つ2台のマシンでSparkをスタンドアロンモードで実行しています
- 500GB メモリ、4 コア、7.5 RAM
- 250GB メモリ、8 コア、15 RAM
8 コア マシンでマスターとスレーブを作成し、ワーカーに 7 コアを与えました。3 つのワーカー コアを備えた 4 コア マシンで別のスレーブを作成しました。UI には、8 コアと 4 コアでそれぞれ 13.7 G と 6.5 G の使用可能な RAM が表示されます。
ここで、15 日間にわたるユーザー評価の集計を処理する必要があります。Pyspark を使用してこれを実行しようとしています。このデータは、s3 バケットの日単位のディレクトリにある時間単位のファイルに保存されます。たとえば、すべてのファイルは約 100MB である必要があります。
s3://some_bucket/2015-04/2015-04-09/data_files_hour1
私はこのようなファイルを読んでいます
a = sc.textFile(files, 15).coalesce(7*sc.defaultParallelism) #to restrict partitions
files は、「s3://some_bucket/2015-04/2015-04-09/*,s3://some_bucket/2015-04/2015-04-09/*」という形式の文字列です。
次に、一連のマップとフィルターを実行し、結果を保持します
a.persist(StorageLevel.MEMORY_ONLY_SER)
次に、reduceByKey を実行して、数日間の集計スコアを取得する必要があります。
b = a.reduceByKey(lambda x, y: x+y).map(aggregate)
b.persist(StorageLevel.MEMORY_ONLY_SER)
次に、ユーザーが評価したアイテムの実際の用語を redis 呼び出しする必要があるため、このように mapPartitions を呼び出します。
final_scores = b.mapPartitions(get_tags)
get_tags関数は、呼び出しのたびに redis 接続を作成し、redis を呼び出して(ユーザー、アイテム、レート)タプルを生成します (redis ハッシュは 4core に保存されます)。
SparkConf の設定を微調整しました
conf = (SparkConf().setAppName(APP_NAME).setMaster(master)
.set("spark.executor.memory", "5g")
.set("spark.akka.timeout", "10000")
.set("spark.akka.frameSize", "1000")
.set("spark.task.cpus", "5")
.set("spark.cores.max", "10")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryoserializer.buffer.max.mb", "10")
.set("spark.shuffle.consolidateFiles", "True")
.set("spark.files.fetchTimeout", "500")
.set("spark.task.maxFailures", "5"))
ここではクラスター モードがサポートされていないようなので、クライアント モードで 2g のドライバー メモリを使用してジョブを実行します。上記のプロセスは、2 日間のデータ (約 2.5 時間) に長い時間がかかり、14 日間で完全に断念されます。
ここで何を改善する必要がありますか?
- このインフラストラクチャは、RAM とコアの点で不十分ですか (これはオフラインであり、数時間かかる場合がありますが、5 時間程度で終了する必要があります)
- パーティションの数を増減する必要がありますか?
- Redis はシステムを遅くしている可能性がありますが、キーの数が多すぎて 1 回の呼び出しを行うことができません。
- ファイルの読み取り中または縮小中のどこでタスクが失敗しているのかわかりません。
- Scala でより優れた Spark API を使用する場合、Python を使用しない方がよいでしょうか? それによって効率も向上しますか?
これは例外トレースです
Lost task 4.1 in stage 0.0 (TID 11, <node>): java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
at sun.security.ssl.InputRecord.read(InputRecord.java:509)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:934)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:891)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:198)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:200)
at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:103)
at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:164)
at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:227)
at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:174)
at org.apache.http.util.EntityUtils.consume(EntityUtils.java:88)
at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.releaseConnection(HttpMethodReleaseInputStream.java:102)
at org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.close(HttpMethodReleaseInputStream.java:194)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.seek(NativeS3FileSystem.java:152)
at org.apache.hadoop.fs.BufferedFSInputStream.seek(BufferedFSInputStream.java:89)
at org.apache.hadoop.fs.FSDataInputStream.seek(FSDataInputStream.java:63)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:126)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:236)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:212)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
事前に感謝します。
ここに私のメインコードがどのように見えるかがあります
def main(sc):
f=get_files()
a=sc.textFile(f, 15)
.coalesce(7*sc.defaultParallelism)
.map(lambda line: line.split(","))
.filter(len(line)>0)
.map(lambda line: (line[18], line[2], line[13], line[15])).map(scoring)
.map(lambda line: ((line[0], line[1]), line[2])).persist(StorageLevel.MEMORY_ONLY_SER)
b=a.reduceByKey(lambda x, y: x+y).map(aggregate)
b.persist(StorageLevel.MEMORY_ONLY_SER)
c=taggings.mapPartitions(get_tags)
c.saveAsTextFile("f")
a.unpersist()
b.unpersist()
get_tags 関数は
def get_tags(partition):
rh = redis.Redis(host=settings['REDIS_HOST'], port=settings['REDIS_PORT'], db=0)
for element in partition:
user = element[0]
song = element[1]
rating = element[2]
tags = rh.hget(settings['REDIS_HASH'], song)
if tags:
tags = json.loads(tags)
else:
tags = scrape(song, rh)
if tags:
for tag in tags:
yield (user, tag, rating)
get_files 関数は次のとおりです。
def get_files():
paths = get_path_from_dates(DAYS)
base_path = 's3n://acc_key:sec_key@bucket/'
files = list()
for path in paths:
fle = base_path+path+'/file_format.*'
files.append(fle)
return ','.join(files)
get_path_from_dates(DAYS) は
def get_path_from_dates(last):
days = list()
t = 0
while t <= last:
d = today - timedelta(days=t)
path = d.strftime('%Y-%m')+'/'+d.strftime('%Y-%m-%d')
days.append(path)
t += 1
return days