6

Amazon S3 に保存する前に、画像をアップロードしてサイズ変更するセロリ タスクを作成しようとしています。しかし、期待どおりには機能しません。タスクがなければ、すべてが正常に機能しています。これまでのコードは次のとおりです。

スタックトレース

Traceback (most recent call last):
  File "../myVE/lib/python2.7/site-packages/kombu/messaging.py", line 579, in _receive_callback
    decoded = None if on_m else message.decode()
  File "../myVE/lib/python2.7/site-packages/kombu/transport/base.py", line 147, in decode
    self.content_encoding, accept=self.accept)
  File "../myVE/lib/python2.7/site-packages/kombu/serialization.py", line 187, in decode
    return decode(data)
  File "../myVE/lib/python2.7/site-packages/kombu/serialization.py", line 74, in pickle_loads
    return load(BytesIO(s))
  File "../myVE/lib/python2.7/site-packages/werkzeug/datastructures.py", line 2595, in __getattr__
    return getattr(self.stream, name)
  File "../myVE/lib/python2.7/site-packages/werkzeug/datastructures.py", line 2595, in __getattr__
    return getattr(self.stream, name)
    ...
RuntimeError: maximum recursion depth exceeded while calling a Python object

ビュー.py

from PIL import Image

from flask import Blueprint, redirect, render_template, request, url_for

from myapplication.forms import UploadForm
from myapplication.tasks import upload_task


main = Blueprint('main', __name__)

@main.route('/upload', methods=['GET', 'POST'])
def upload():
    form = UploadForm()
    if form.validate_on_submit():
        upload_task.delay(form.title.data, form.description.data,
                          Image.open(request.files['image']))
        return redirect(url_for('main.index'))
    return render_template('upload.html', form=form)

タスク.py

from StringIO import StringIO

from flask import current_app

from myapplication.extensions import celery, db
from myapplication.helpers import resize, s3_upload
from myapplication.models import MyObject


@celery.task(name='tasks.upload_task')
def upload_task(title, description, source):
    stream = StringIO()
    target = resize(source, current_app.config['SIZE'])
    target.save(stream, 'JPEG', quality=95)
    stream.seek(0)
    obj = MyObject(title=title, description=description, url=s3_upload(stream))
    db.session.add(obj)
    db.session.commit()
4

4 に答える 4

13

これは非常に古い質問であることは知っていますが、ファイルの内容をセロリタスクに渡すのに苦労していました。他の人が行ったことに従おうとすると、エラーが発生し続けます。だから私はこれを書いて、将来他の人に役立つことを願っています.

TL;DR

  • ファイルの内容を base64 エンコーディングでセロリ タスクに送信します。
  • セロリ タスクでデータをデコードしio.BytesIO、ストリームに使用する

長い答え

画像をディスクに保存して再度読み取ることに興味がなかったので、バックグラウンドでファイルを再構築するために必要なデータを渡したかったのです。

他の人が提案することに従おうとすると、エンコードエラーが発生し続けました. エラーの一部は次のとおりです。

  • UnicodeDecodeError: 'utf-8' codec can't decode byte 0xff in position 0: invalid start byte
  • TypeError: initial_value must be str or None, not bytes

TypeErrorによって投げられましたio.StringIO。を取り除くためにデータをデコードしようとしても、UnicodeDecodeErrorあまり意味がありませんでした。そもそもデータがバイナリなので、インスタンスを使おうとしたところ、io.BytesIOうまくいきました。私がする必要があったのは、ファイルのストリームを base64 でエンコードすることだけでした。そうすれば、コンテンツをセロリ タスクに渡すことができます。

コードサンプル

images.py

import base64

file_.stream.seek(0) # start from beginning of file
# some of the data may not be defined
data = {
  'stream': base64.b64encode(file_.read()),
  'name': file_.name,
  'filename': file_.filename,
  'content_type': file_.content_type,
  'content_length': file_.content_length,
  'headers': {header[0]: header[1] for header in file_.headers}
}

###
# add logic to sanitize required fields
###

# define the params for the upload (here I am using AWS S3)
bucket, s3_image_path = AWS_S3_BUCKET, AWS_S3_IMAGE_PATH
# import and call the background task
from async_tasks import upload_async_photo 
upload_async_photo.delay(
  data=data,
  image_path=s3_image_path,
  bucket=bucket)

async_tasks

import base64, io
from werkzeug.datastructures import FileStorage

@celery.task
def upload_async_photo(data, image_path, bucket):
    bucket = get_s3_bucket(bucket) # get bucket instance
    try:
        # decode the stream
        data['stream'] = base64.b64decode(data['stream'])
        # create a BytesIO instance
        # https://docs.python.org/3/library/io.html#binary-i-o
        data['stream'] = io.BytesIO(data['stream'])
        # create the file structure
        file_ = FileStorage(**data)
        # upload image
        bucket.put_object(
                Body=file_,
                Key=image_path,
                ContentType=data['content_type'])
    except Exception as e:
        print(str(e))

編集

また、セロリが受け入れるコンテンツと、データをシリアライズする方法も変更しました。Bytes インスタンスを celery タスクに渡すときに問題が発生しないようにするために、構成に以下を追加する必要がありました。

CELERY_ACCEPT_CONTENT = ['pickle']
CELERY_TASK_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'pickle'
于 2016-11-30T07:48:27.807 に答える
0

古い質問ですが、同じ問題が発生しました。受け入れられた答えは私にとってはうまくいきませんでした(私はDockerインスタンスを使用しているため、Celeryはプロデューサーファイルシステムにアクセスできません。また、最初にファイルをローカルファイルシステムに保存するのが遅いです)。

私のソリューションは、ファイルを RAM に保持します。したがって、はるかに高速です。唯一の欠点は、大きなファイル (>1GB) を処理する必要がある場合、大量の RAM を備えたサーバーが必要になることです。

doc_file のタイプはwerkzeug.datastructure.FileStorage( docs here を参照)

ファイルをセロリ ワーカーに送信する:

entry.delay(doc_file.read(), doc_file.filename, doc_file.name, doc_file.content_length, doc_file.content_type, doc_file.headers)

ファイルの受信:

from werkzeug.datastructures import FileStorage
from StringIO import StringIO

@celery.task()
def entry(stream, filename, name, content_length, content_type, headers):
    doc = FileStorage(stream=StringIO(stream), filename=filename, name=name, content_type=content_type, content_length=content_length)
    # Do something with the file (e.g save to Amazon S3)
于 2016-02-25T13:29:01.613 に答える