0

私は FastAPI を使用して、ユーザーが次のことを行うためにリクエストできる API に取り組んでいます。

  1. まず、get リクエストが Google Cloud Storage からファイルを取得し、それを pyspark DataFrame に読み込みます。
  2. 次に、アプリケーションは DataFrame でいくつかの変換を実行します
  3. 最後に、DataFrame を寄木細工のファイルとしてユーザーのディスクに書き込みたいと思います。

いくつかの理由により、ファイルを寄木細工の形式でユーザーに配信する方法がよくわかりません。

  • df.write.parquet('out/path.parquet')データをディレクトリに書き込みますが、データout/path.parquetを渡そうとすると問題が発生しますstarlette.responses.FileResponse
  • 存在することがわかっている単一の .parquet ファイルを渡すとstarlette.responses.FileResponse、バイナリがコンソールに出力されているように見えます (以下のコードで示されているように)。
  • pandas のようにDataFrame を BytesIO ストリームに書き込むことは有望に思えましたが、DataFrame のメソッドまたは DataFrame.rdd のメソッドを使用してそれを行う方法がわかりません。

これは FastAPI でも可能ですか? send_file()を使用してFlaskで可能ですか?

ここに私がこれまで持っているコードがあります。コメント付きのコードのようないくつかのことを試してみましたが、役に立たなかったことに注意してください。

import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')

@router.get("/applications")
def applications():
    df.write.parquet("temp.parquet", compression="snappy")
    return FileResponse("part-some-compressed-file.snappy.parquet")
    # with tempfile.TemporaryFile() as f:
    #     f.write(df.rdd.saveAsPickleFile("temp.parquet"))
    #     return FileResponse("test.parquet")

ありがとう!

編集:ここで提供されている回答と情報を使用してみましたが、うまく機能しません。

4

1 に答える 1

0

問題を解決できましたが、エレガントとは言えません。ディスクに書き込まない解決策を誰かが提供できれば、私はそれを大いに感謝し、あなたの答えを正しいものとして選択します。

を使用して DataFrame をシリアル化しdf.rdd.saveAsPickleFile()、結果のディレクトリを zip して Python クライアントに渡し、結果の zipfile をディスクに書き込み、解凍してから、SparkContext().pickleFile最終的に DataFrame をロードする前に使用することができました。理想には程遠いと思います。

API:

import shutil
import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/my-file.parquet')

@router.get("/applications")
def applications():
    temp_parquet = tempfile.NamedTemporaryFile()
    temp_parquet.close()
    df.rdd.saveAsPickleFile(temp_parquet.name)

    shutil.make_archive('test', 'zip', temp_parquet.name)

    return FileResponse('test.zip')

クライアント:

import io
import zipfile

import requests

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

response = requests.get("http://0.0.0.0:5000/applications")
file_like_object = io.BytesIO(response.content)
with zipfile.ZipFile(file_like_object) as z:
    z.extractall('temp.data')

rdd = sc.pickleFile("temp.data")
df = spark.createDataFrame(rdd)

print(df.head())
于 2020-01-16T01:59:38.617 に答える