1

EMR で PySpark を使用して、S3 で SequenceFiles として保存されているデータを分析しようとしていますが、データの局所性が原因でパフォーマンスの問題が発生しています。以下は、うまく動作しない非常に単純なサンプルです。

seqRDD = sc.sequenceFile("s3n://<access>:<secret>@<bucket>/<table>/day=2015-07-04/hour=*/*")
seqRDD.count()

問題はcountアクションにあります。正常に動作しますが、タスクの分散は非常に貧弱です。何らかの理由で、Spark ログで、クラスターの 2 つの IP のみが実際の作業を行っており、残りはアイドル状態になっています。5 ノード クラスタと 50 ノード クラスタで試してみましたが、ログには常に 2 つの IP しか表示されません。

また、非常に奇妙なことに、これら 2 つの IP には RACK_LOCAL という局所性があります。データが S3 にあるためローカルではないためだと推測していますが、Spark が 2 つのインスタンスだけでなくクラスター全体を使用するようにするにはどうすればよいですか?

EMR での Spark 構成に固有のことは何もしませんでした。ネイティブ アプリを介して EMR にインストールするだけで、構成の最適化が自動的に処理されると思います。

ログでこれを見ました。これはallowLocal=false問題である可能性がありますが、それについては何も見つかりませんでした:


15/07/17 23:55:27 INFO spark.SparkContext: Starting job: count at :1
15/07/17 23:55:27 INFO scheduler.DAGScheduler: Got job 1 (count at :1) with 1354 output partitions (allowLocal=false)
15/07/17 23:55:27 INFO scheduler.DAGScheduler: Final stage: Stage 1(count at :1)

の実行時に続くいくつかのログにはcount、2 つの IP のみが示されています。


15/07/17 23:55:28 INFO scheduler.DAGScheduler: Submitting 1354 missing tasks from Stage 1 (PythonRDD[3] at count at :1)
15/07/17 23:55:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 1354 tasks
15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1418 bytes)
15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-36-179.ec2.internal:39998 (size: 3.7 KB, free: 535.0 MB)
15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on ip-172-31-41-210.ec2.internal:36847 (size: 3.7 KB, free: 535.0 MB)
15/07/17 23:55:29 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-31-41-210.ec2.internal:36847 (size: 18.8 KB, free: 535.0 MB)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1421 bytes)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 3501 ms on ip-172-31-41-210.ec2.internal (1/1354)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 99 ms on ip-172-31-41-210.ec2.internal (2/1354)
15/07/17 23:55:33 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 1.0 (TID 5, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:33 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 5190 ms on ip-172-31-36-179.ec2.internal (3/1354)
15/07/17 23:55:36 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 1.0 (TID 6, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:36 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 4471 ms on ip-172-31-41-210.ec2.internal (4/1354)
15/07/17 23:55:37 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 1.0 (TID 7, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:37 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3676 ms on ip-172-31-36-179.ec2.internal (5/1354)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 7.0 in stage 1.0 (TID 8, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 3895 ms on ip-172-31-41-210.ec2.internal (6/1354)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
4

0 に答える 0