5

約60,000個のアイテムのリストがあります。データベースにクエリを送信して、それらが存在するかどうか、および計算結果が返されるかどうかを確認したいと思います。リストを1つずつ繰り返しながら、通常のクエリを実行します。クエリは過去4日間実行されています。私はこれを改善するためにスレッドモジュールを使用できると思いました。私はこのようなことをしました

if __name__ == '__main__':
  for ra, dec in candidates:
    t = threading.Thread(target=search_sl, args=(ra,dec, q))
    t.start()
  t.join()

10個のアイテムだけでテストしましたが、正常に機能しました。60k個のアイテムのリスト全体を送信すると、「最大セッション数を超えました」というエラーが発生します。私がやりたいのは、一度に10個程度のスレッドを作成することです。最初のスレッドの束が実行を終了したら、別のリクエストを送信します。

4

4 に答える 4

7

マルチプロセッシングモジュールで利用可能なプロセスプールを使用してみることができます。Pythonドキュメントの例を次に示します。

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers

システムがサポートできる最大数に達するまで、プロセスの数を増やしてみてください。

于 2012-04-08T14:04:04.907 に答える
4

スレッド化する前にクエリを改善してください(時期尚早の最適化はすべての悪の根源です!)

問題は、単一のデータベースに60,000の異なるクエリがあることです。アイテムごとに1つのクエリがあるということは、接続を開いてDBカーソルセッションを呼び出すための多くのオーバーヘッドを意味します。

これらのクエリをスレッド化するとプロセスを高速化できますが、DBの過負荷や許可される最大セッション数などの別の問題が発生します。

最初のアプローチ:すべてのクエリに多くのアイテムIDをロードします

代わりに、クエリを改善してみてください。製品の長いリストを送信し、一致するものを返すクエリを作成できますか?おそらく次のようなものです:

SELECT item_id, * 
FROM   items
WHERE  item_id IN (id1, id2, id3, id4, id5, ....)

Pythonは、この種のifクエリに便利なインターフェイスを提供するため、IN句でpythonicリストを使用できます。このようにして、アイテムの長いリストを、たとえば、それぞれ1,000個のIDを持つ60個のクエリに分割できます。

2番目のアプローチ:一時テーブルを使用する

もう1つの興味深いアプローチは、アイテムIDを使用してデータベースに一時テーブルを作成することです。一時的なテーブルは接続が存続する限り存続するため、クリーンアップについて心配する必要はありません。おそらく次のようなものです:

CREATE TEMPORARY TABLE 
           item_ids_list (id INT PRIMARY KEY); # Remember indexing!

適切なPythonライブラリを使用してIDを挿入します。

INSERT INTO item_ids_list   ...                # Insert your 60,000 items here

結果を得る:

SELECT * FROM items WHERE items.id IN (SELECT * FROM items_ids_list);
于 2012-04-08T14:52:30.333 に答える
3

まず、最後のスレッドのみに参加します。最後に終了する保証はありません。次のように使用する必要があります。

from time import sleep
delay = 0.5
tlist = [threading.Thread(target=search_sl, args=(ra,dec, q)) for ra, dec in candidates ]
map(lambda t:t.start(), tlist)
while(any(map(lambda t:t.isAlive()))): sleep(delay)

2番目の問題は、現時点で60Kスレッドを実行するには、非常に膨大なハードウェアリソースが必要になることです:-)タスクをキューに入れてから、ワーカーで処理することをお勧めします。ワーカースレッドの数を制限する必要があります。そのように(コードをテストしていませんが、アイデアは明らかです):

from Queue import Queue
from threading import Thread
from time import sleep
tasks = Queue()
map(tasks.put, candidates)
maxthreads = 50
delay = 0.1
try:
    threads = [Thread(target=search_sl, args=tasks.get()) \
               for i in xrange(0,maxthreads) ]
except Queue.Empty:
    pass
map(lambda t:t.start(), threads)

while not tasks.empty():
    threads = filter(lambda t:t.isAlive(), threads)
    while len(threads) < maxthreads:
        try:
            t = Thread(target=search_sl, args=tasks.get())
            t.start()
            threads.append(t)
        except Queue.Empty:
            break
    sleep(delay)

while(any(map(lambda t:t.isAlive(), threads))): sleep(delay)
于 2012-04-08T14:20:27.717 に答える
0

これはIOタスクであるため、スレッドもプロセスも適切ではありません。計算タスクを並列化する必要がある場合は、これらを使用します。ですから、モダンにしてください™、gevent並列IOを多用するタスクのようなものを使用してください。

http://www.gevent.org/intro.html#example

于 2012-04-08T14:30:55.693 に答える