私はPythonボトとスレッドを使用してS3から多くのファイルを迅速にダウンロードしています。私はこれを私のプログラムで数回使用していますが、うまく機能しています。ただし、機能しない場合があります。そのステップでは、32コアマシン(Amazon EC2 cc2.8xlarge)に3,000ファイルをダウンロードしようとします。
以下のコードは、実際にはすべてのファイルのダウンロードに成功しています(ただし、再試行によって修正されないhttplib.IncompleteReadエラーが発生する場合があります)。ただし、実際に終了するのは32スレッドのうち10スレッド程度であり、プログラムはハングします。これがなぜかわからない。すべてのファイルがダウンロードされ、すべてのスレッドが終了しているはずです。ダウンロードするファイルの数が少ないと、他の手順で実行されます。私はこれらすべてのファイルを単一のスレッドでダウンロードすることになりました(これは機能しますが、非常に遅いです)。どんな洞察も大歓迎です!
from boto.ec2.connection import EC2Connection
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from boto.exception import BotoClientError
from socket import error as socket_error
from httplib import IncompleteRead
import multiprocessing
from time import sleep
import os
import Queue
import threading
def download_to_dir(keys, dir):
"""
Given a list of S3 keys and a local directory filepath,
downloads the files corresponding to the keys to the local directory.
Returns a list of filenames.
"""
filenames = [None for k in keys]
class DownloadThread(threading.Thread):
def __init__(self, queue, dir):
# call to the parent constructor
threading.Thread.__init__(self)
# create a connection to S3
connection = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
self.conn = connection
self.dir = dir
self.__queue = queue
def run(self):
while True:
key_dict = self.__queue.get()
print self, key_dict
if key_dict is None:
print "DOWNLOAD THREAD FINISHED"
break
elif key_dict == 'DONE': #last job for last worker
print "DOWNLOADING DONE"
break
else: #still work to do!
index = key_dict.get('idx')
key = key_dict.get('key')
bucket_name = key.bucket.name
bucket = self.conn.get_bucket(bucket_name)
k = Key(bucket) #clone key to use new connection
k.key = key.key
filename = os.path.join(dir, k.key)
#make dirs if don't exist yet
try:
f_dirname = os.path.dirname(filename)
if not os.path.exists(f_dirname):
os.makedirs(f_dirname)
except OSError: #already written to
pass
#inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10
RETRIES = 5 #attempt at most 5 times
wait = 1
for i in xrange(RETRIES):
try:
k.get_contents_to_filename(filename)
break
except (IncompleteRead, socket_error, BotoClientError), e:
if i == RETRIES-1: #failed final attempt
raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e))
break
wait *= 2
sleep(wait)
#put filename in right spot!
filenames[index] = filename
num_cores = multiprocessing.cpu_count()
q = Queue.Queue(0)
for i, k in enumerate(keys):
q.put({'idx': i, 'key':k})
for i in range(num_cores-1):
q.put(None) # add end-of-queue markers
q.put('DONE') #to signal absolute end of job
#Spin up all the workers
workers = [DownloadThread(q, dir) for i in range(num_cores)]
for worker in workers:
worker.start()
#Block main thread until completion
for worker in workers:
worker.join()
return filenames