1

私はPythonボトとスレッドを使用してS3から多くのファイルを迅速にダウンロードしています。私はこれを私のプログラムで数回使用していますが、うまく機能しています。ただし、機能しない場合があります。そのステップでは、32コアマシン(Amazon EC2 cc2.8xlarge)に3,000ファイルをダウンロードしようとします。

以下のコードは、実際にはすべてのファイルのダウンロードに成功しています(ただし、再試行によって修正されないhttplib.IncompleteReadエラーが発生する場合があります)。ただし、実際に終了するのは32スレッドのうち10スレッド程度であり、プログラムはハングします。これがなぜかわからない。すべてのファイルがダウンロードされ、すべてのスレッドが終了しているはずです。ダウンロードするファイルの数が少ないと、他の手順で実行されます。私はこれらすべてのファイルを単一のスレッドでダウンロードすることになりました(これは機能しますが、非常に遅いです)。どんな洞察も大歓迎です!

from boto.ec2.connection import EC2Connection
from boto.s3.connection import S3Connection
from boto.s3.key import Key

from boto.exception import BotoClientError
from socket import error as socket_error
from httplib import IncompleteRead

import multiprocessing
from time import sleep
import os

import Queue
import threading

def download_to_dir(keys, dir):
    """
    Given a list of S3 keys and a local directory filepath,
    downloads the files corresponding to the keys to the local directory.
    Returns a list of filenames.
    """
    filenames = [None for k in keys]

    class DownloadThread(threading.Thread):

        def __init__(self, queue, dir):
            # call to the parent constructor
            threading.Thread.__init__(self)
            # create a connection to S3
            connection = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
            self.conn = connection
            self.dir = dir
            self.__queue = queue

        def run(self):
            while True:
                key_dict = self.__queue.get()
                print self, key_dict
                if key_dict is None:
                    print "DOWNLOAD THREAD FINISHED"
                    break
                elif key_dict == 'DONE': #last job for last worker
                    print "DOWNLOADING DONE"
                    break
                else: #still work to do!
                    index = key_dict.get('idx')
                    key = key_dict.get('key')
                    bucket_name = key.bucket.name
                    bucket = self.conn.get_bucket(bucket_name)
                    k = Key(bucket) #clone key to use new connection
                    k.key = key.key

                    filename = os.path.join(dir, k.key)
                    #make dirs if don't exist yet
                    try:
                        f_dirname = os.path.dirname(filename)
                        if not os.path.exists(f_dirname):
                            os.makedirs(f_dirname)
                    except OSError: #already written to
                        pass

                    #inspired by: http://code.google.com/p/s3funnel/source/browse/trunk/scripts/s3funnel?r=10
                    RETRIES = 5 #attempt at most 5 times
                    wait = 1
                    for i in xrange(RETRIES):
                        try:
                            k.get_contents_to_filename(filename)
                            break
                        except (IncompleteRead, socket_error, BotoClientError), e:
                            if i == RETRIES-1: #failed final attempt
                                raise Exception('FAILED TO DOWNLOAD %s, %s' % (k, e))
                                break
                            wait *= 2
                            sleep(wait)

                    #put filename in right spot!
                    filenames[index] = filename

    num_cores = multiprocessing.cpu_count()

    q = Queue.Queue(0)

    for i, k in enumerate(keys):
        q.put({'idx': i, 'key':k})
    for i in range(num_cores-1):
        q.put(None) # add end-of-queue markers
    q.put('DONE') #to signal absolute end of job

    #Spin up all the workers
    workers = [DownloadThread(q, dir) for i in range(num_cores)]
    for worker in workers:
        worker.start()

    #Block main thread until completion
    for worker in workers:
        worker.join() 

    return filenames
4

2 に答える 2

4

AWS SDKバージョン1.4.4.0以降にアップグレードするか、正確に2つのスレッドに固執します。古いバージョンには、最大2つの同時接続の制限があります。これは、2つのスレッドを起動した場合にコードが適切に機能することを意味します。3つ以上を起動すると、読み取りが不完全になり、タイムアウトが使い果たされることになります。

2つのスレッドでスループットを大幅に向上させることができますが、ネットワークカードが常にビジーであるため、2つを超えるスレッドはあまり変化しないことがわかります。

于 2012-07-11T22:02:32.263 に答える
0

S3Connectionはhttplib.pyを使用し、そのライブラリはスレッドセーフではないため、各スレッドに独自の接続があることを確認することが重要です。あなたはそれをしているようです。

Botoにはすでに独自の再試行メカニズムがありますが、他の特定のエラーを処理するために、その上に1つを重ねています。例外ブロック内に新しいS3Connectionオブジェクトを作成することをお勧めしますか?基になるhttp接続がその時点で異常な状態にある可能性があるように思われるため、新しい接続から開始するのが最適な場合があります。

ちょっとした考え。

于 2012-07-11T23:03:36.927 に答える