2

現在、コンテンツを取得するための URL のリストがあり、順次実行しています。それらを並行してつかむように変更したいと思います。これは疑似コードです。お聞きしたいのですが、デザインの音ですか?.start() がスレッドを開始することは理解していますが、データベースが更新されていません。q.get() を使用する必要がありますか? ありがとう

import threading    
import Queue
q = Queue.Queue()

def do_database(url):
    """ grab url then input to database """
    webdata = grab_url(url)
    try:
        insert_data_into_database(webdata)
    except:
        ....
    else:
        < do I need to do anything with the queue after each db operation is done?>

def put_queue(q, url ):
    q.put( do_database(url) )

for myfiles in currentdir:
    url = myfiles + some_other_string
    t=threading.Thread(target=put_queue,args=(q,url))
    t.daemon=True
    t.start()   
4

3 に答える 3

2

に何かを入れているのにq、何も取り出さないのは奇妙ですq。の目的はq何ですか? また、何もdo_database()しないので、に入れるだけのようです。returnq.put(do_database(url))Noneq

これらが機能する通常の方法では、実行する作業の説明がキューに追加され、一定数のスレッドが交代でキューから物事を引き出します。おそらく無制限の数のスレッドを作成したくないでしょう;-)

これはかなり完全ですが、テストされていないスケッチです。

import threading
import Queue

NUM_THREADS = 5  # whatever

q = Queue.Queue()
END_OF_DATA = object()  # a unique object

class Worker(threading.Thread):
    def run(self):
        while True:
            url = q.get()
            if url is END_OF_DATA:
                break
            webdata = grab_url(url)
            try:
                # Does your database support concurrent updates
                # from multiple threads?  If not, need to put
                # this in a "with some_global_mutex:" block.
                insert_data_into_database(webdata)
            except:
                #....

threads = [Worker() for _ in range(NUM_THREADS)]
for t in threads:
    t.start()

for myfiles in currentdir:
    url = myfiles + some_other_string
    q.put(url)

# Give each thread an END_OF_DATA marker.
for _ in range(NUM_THREADS):
    q.put(END_OF_DATA)

# Shut down cleanly.  `daemon` is way overused.
for t in threads:
    t.join()
于 2013-09-22T05:21:52.680 に答える
2

これは、スレッドではなく非同期プログラミングで行う必要があります。Python でのスレッド化には問題があり (Global Interpreter Lock を参照)、とにかくここでマルチコア パフォーマンスを達成しようとしていません。長時間実行される可能性のある I/O を多重化する方法が必要なだけです。そのためには、単一のスレッドとTwistedなどのイベント駆動型ライブラリを使用できます。

Twisted には HTTP 機能が付属しているため、多くの同時リクエストを発行し、結果が届いたときに (データベースにデータを入力することによって) 反応することができます。このプログラミング モデルは慣れるまでに少し時間がかかる場合があることに注意してください。あなたが行っている要求の数は天文学的ではありません (つまり、1 台のマシンですべてを完了できる場合、それはあなたの意図のようです)。

于 2013-09-22T05:02:24.137 に答える
1

DB の場合、変更が有効になる前にコミットする必要があります。ただし、挿入ごとにコミットするのは最適ではありません。一括変更後にコミットすると、パフォーマンスが大幅に向上します。

並列の場合、Python はこのために生まれてきたわけではありません。あなたのユースケースでは、Pythonをgeventで使用するのは簡単な解決策だと思います。

参考までに、はるかに効率的な疑似実装を次に示します。

import gevent
from gevent.monkey import patch_all
patch_all() # to use with urllib, etc
from gevent.queue import Queue


def web_worker(q, url):
  grab_something
  q.push(result)

def db_worker(q):
  buf = []
  while True:
    buf.append(q.get())
    if len(buf) > 20:
      insert_stuff_in_buf_to_db
      db_commit
      buf = []

def run(urls):
  q = Queue()
  gevent.spawn(db_worker, q)
  for url in urls:
    gevent.spawn(web_worker, q, url)


run(urls)

さらに、この実装は完全にシングル スレッドであるため、キュー、データベース接続、グローバル変数などのワーカー間の共有データを安全に操作できます。

于 2013-09-22T05:31:52.330 に答える