アップロードビューを介してcsvファイルを取得するアプリケーションがあり、ファイルからデータをリストに読み取り、リストをセロリタスクに渡し、タスクはジェネレーター関数を使用してデータを200のチャンクに分割します(重複がないことを確認するために少し前処理を行います)。次に、これらのチャンクがタスク セットに渡され、チャンクが処理されて行がデータベースに書き込まれます。サブタスクは、書き込まれた行の結果などをメイン タスクに渡して、完全なタスクセットのログを記録します。
データベースに 200 行のチャンクを書き込む 1 つのタスクがランダムにフリーズする大量のデータ セットをロードするときに問題が発生します。このタスクを強制終了すると、キューは引き続き処理されます。これは、db と celery をローカルで実行してテストした場合には発生しなかったため、何らかの mysql 接続の問題であると想定しています。celery.log と mysql のログは何も明らかにしていません。トラブルシューティングします。
タスクキュー用にdjango、mysql、rabbitmqを実行しており、db、webserver用に1台のサーバー、およびタスク処理用にもう1台のサーバー(ubuntuサーバー12.04を実行する2 x AWS EC2)を使用しています。
私のコード:
#views.py
reader = csv.reader(request.FILES['f'], delimiter=',', quotechar='"')
#Skip the header
reader.next()
uploaddata = [row for row in reader]
LegacyUploadManager.delay(uploaddata, log.pk)
#tasks.py
class LegacyUploadManager(Task):
"""
Task to take uploads, split into smaller tasks and manage
"""
def run(self, f, logpk):
#Create chunk generator to split chunks into sets of 200
ChunkGen = UploadChunkGen(f, 200)
#Generate chunks and create a taskset of chunk tasks
self.tasks = []
for chunk in ChunkGen:
self.tasks.append(LegacyUploader.subtask([chunk]))
#create taskset and start processing
job = TaskSet(self.tasks)
result = job.apply_async()
#Wait for tasks to complete
while result.waiting():
time.sleep(10)
#Write results to database via ImportLogger task
ImporterLogger.delay(logpk, result.join())
class LegacyUploader(Task):
"""
Task for uploading products imported from ESP via a full product layout.
"""
def run(self, f):
for row in f:
Product.objects.create(row)
「timeout=....」パラメータを「job.apply_async()」に追加するだけでよいと思いますが、これを行った場合の結果はわかりません。つまり、タイムアウトしたタスクで処理されなかったすべての行が失われますか、それともタスクがタイムアウトする前にデータベースに書き込まれた行が再度書き込まれますか?