2

数千のファイルを入力として取り、Amazon S3 からダウンロードし、各マップ ステップが文字列を返すマップ フェーズで処理する Spark ジョブがあります。.tar.gz出力をファイルに圧縮し、後で S3 にアップロードしたいと思います。それを行う1つの方法は

outputs = sc.map(filenames).collect()
for output in outputs:
    with tempfile.NamedTemporaryFile() as tar_temp:
        tar = tarfile.open(tar_temp.name, "w:gz")
        for output in outputs:
            with tempfile.NamedTemporaryFile() as output_temp:
                output_temp.write(output)
                tar.add(output_temp.name)
        tar.close()

問題は、outputsメモリに収まらないことです (ただし、ディスクには収まります)。マップフェーズで出力をマスターファイルシステムに保存する方法はありますか? それとも、ループfor output in outputsをジェネレーターとして使用して、すべてをメモリにロードする必要がないようにしますか?

4

1 に答える 1

1

toLocalIteratorSpark 1.3.0では、Python で同じ Java/Scala メソッドを使用できるようになります。

プル リクエストがマージされました: https://github.com/apache/spark/pull/4237

指定されたドキュメントは次のとおりです。

    """
    Return an iterator that contains all of the elements in this RDD.
    The iterator will consume as much memory as the largest partition in this RDD.
    >>> rdd = sc.parallelize(range(10))
    >>> [x for x in rdd.toLocalIterator()]
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    """

全体として、ドライバーにすべてを収集することなく、出力を反復処理できます。

よろしく、

于 2015-03-03T18:32:28.650 に答える