4

私はスレッドとキューを使用してURLを取得し、データベースに保存しています。
1つのスレッドで保存ジョブを実行したいだけです。
だから私は以下のようにコードを書きます:

import threading
import time

import Queue

site_count = 10

fetch_thread_count = 2

site_queue = Queue.Queue()
proxy_array=[]        


class FetchThread(threading.Thread):
    def __init__(self,site_queue,proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array
    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()
    def get_proxy_one_website(self,index):
        print '{0} fetched site :{1}\n'.format(self.name,index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0}  to database\n'.format(proxy_array.pop())

            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())

        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue

def start_crawl():
    global site_count,fetch_thread_count,site_queue,proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue,proxy_array)
        ft.setDaemon(True)
        ft.start()

    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)

    save()

    print 'start site_queue join'
    site_queue.join()
    print 'finish'

start_crawl()

実行された出力:

init
put site_queue
Thread-1 fetched site :0

Thread-2 fetched site :1

Thread-1 fetched site :2

Thread-2 fetched site :3

Thread-1 fetched site :4

Thread-2 fetched site :5

Thread-1 fetched site :6

Thread-2 fetched site :7

Thread-1 fetched site :8

Thread-2 fetched site :9

save :9 to database

save :8 to database

save :7 to database

save :6 to database

save :5 to database

save :4 to database

save :3 to database

save :2 to database

save :1 to database

save :0 to database

break
start site_queue join
finish
[Finished in 1.2s]

なぜsave()関数が実行され、その後site_queue.join()に書かれていsave()ます。
私もsave() スレッド関数に置き換えましたが、それも機能しません。
に変更しなければならないということproxy_array=[]ですproxy_queue=Queue.Queue()か? そうすれば、theading を使用してデータを保存できますか?
1 つのスレッドにこれを実行させたいだけで、他のスレッドからデータを取得するスレッドはありませんproxy_array。なぜそれに参加する必要があるのですか?Queue の使用は非常に奇妙に思えます。
より良いソリューションはありますか?

更新:
すべての FetchThreads が作業を完了するまで待ちたくありません。フェッチ中にデータを保存したいのですが、はるかに高速です。結果は次のようになりたいです(array.pop()を使用しているため、save 0は非常に後で表示される可能性があります。これは、簡単に理解できるようにするための単なる例です。):

Thread-2 fetched site :1

Thread-1 fetched site :2

save :0 to database

Thread-2 fetched site :3

Thread-1 fetched site :4

save :2 to database

save :3 to database


Thread-2 fetched site :5

Thread-1 fetched site :6

save :4 to database
.......

誰かのためのUPDATE2には、以下と同じ質問があります:

質問:
上記のコンテキストで言ったように、proxy_array からデータを取得する他のスレッドはありません。
なぜそれがスレッドセーフを壊すのか想像できませんか?

回答:ミーシャの回答の
生産者と消費者の問題、よく読んだらわかりました。


質問:
プログラムのメイン スレッドが FetchThreads を使用してコンシューマとして再生できる場合 (つまり、StoreThread を作成する必要がない場合)

、これは私が理解できないことです。答えが見つかったら更新します。

4

2 に答える 2

5

私は似たような生産者と消費者を考え出さなければなりません。プロデューサは「ID」を生成し、コンシューマはその ID を使用して URL フェッチを行い、それを処理します。これがそれを行う私のスケルトンコードです


    import Queue
    import random
    import threading
    import time
    import sys

    data_queue = Queue.Queue()
    lock = threading.Lock()

    def gcd(a, b):
        while b != 0:
            a,b = b, a%b

        return b

    def consumer(idnum):
        while True:
            try:
                data = data_queue.get(block=False)
            except Exception, e:
               print 'Exception ' + str(e)
            else:
                with lock:
                    print('\t consumer %d: computed gcd(%d, %d) = %d' %(idnum, data[0], data[1], gcd(data[0], data[1])))

            time.sleep(1)
            data_queue.task_done()

    def producer(idnum, count):
        for i in range(count):
            a,b = random.randint(1, sys.maxint), random.randint(1, sys.maxint)
            with lock:
                print('\t producer %d: generated (%d, %d)'% (idnum, a, b))
            data_queue.put((a,b))
            time.sleep(0.5)

    if __name__ == '__main__':
        num_producers = 1
        num_consumers = 2
        num_integer_pairs = 10

        for i in range(num_consumers):
            t = threading.Thread(target=consumer, args=(i,))
            t.daemon = True
            t.start()

        threads = []
        for ii in range(num_producers):
            thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
            threads.append(thread)
            thread.start()

        # wait for the producers threads to finish
        for thread in threads:
            thread.join()
        print 'done with producer threads'

        # wait till all the jobs are done in the queue
        data_queue.join()

        with lock:
            print 'all consumer threads finished'

        with lock:
            print 'main thread exited'
于 2014-02-04T01:14:35.917 に答える