1

マルチスレッドを使用して、複数のホスト名を IP アドレスに解決するスクリプトをプログラムしました。

ただし、ランダムな時点で失敗し、フリーズします。これはどのように解決できますか?

num_threads = 100
conn = pymysql.connect(host='xx.xx.xx.xx', unix_socket='/tmp/mysql.sock', user='user', passwd='pw', db='database')
cur = conn.cursor()
def mexec(befehl):
    cur = conn.cursor()
    cur.execute(befehl)

websites = ['facebook.com','facebook.org' ... ... ... ...] \#10.000 websites in array
queue = Queue()
def getips(i, q):
    while True:
        #--resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip='"+result+"', updated='yes' ") #puts site in mysqldb
        except (socket.gaierror):
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
        q.task_done()
#Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()
#Place work in queue
for site in websites:
    queue.put(site)
#Wait until worker threads are done to exit
queue.join()
4

3 に答える 3

3

Sentinel 値を使用して、作業がないことをスレッドに通知し、queue.task_done()andの代わりにスレッドに参加できqueue.join()ます。

#!/usr/bin/env python
import socket
from Queue import Queue
from threading import Thread

def getips(queue):
    for site in iter(queue.get, None):
        try: # resolve hostname
            result = socket.gethostbyname_ex(site)
        except IOError, e:
            print("error %s reason: %s" % (site, e))
        else:
            print("done %s %s" % (site, result))

def main():
    websites = "youtube google non-existent.example facebook yahoo live".split()
    websites = [name+'.com' for name in websites]

    # Spawn thread pool
    queue = Queue()
    threads = [Thread(target=getips, args=(queue,)) for _ in range(20)]
    for t in threads:
        t.daemon = True
        t.start()

    # Place work in queue
    for site in websites: queue.put(site)
    # Put sentinel to signal the end
    for _ in threads: queue.put(None)
    # Wait for completion
    for t in threads: t.join()

main()

gethostbyname_ex()機能は廃止されました。IPv4/v6 アドレスの両方をサポートするには、代わりに使用できますsocket.getaddrinfo()

于 2012-02-10T17:52:53.663 に答える
1

私の最初の考えは、DNS の過負荷が原因でエラーが発生するというものでした。おそらく、リゾルバーが、一度に一定量以上のクエリを実行することを許可していない可能性があります。


さらに、いくつかの問題を発見しました。

  1. siteループ内で正しく割り当てるのを忘れていました - これはおそらく、キューを反復するループなどwhileに置き換えたほうがよいでしょう。forお使いのバージョンではsite、モジュール レベルの名前空間の変数を使用しているため、クエリが二重になり、他のクエリがスキップされる可能性があります。

    この場所では、キューにまだエントリがあるか、待機中かを制御できます。どちらもそうでない場合は、スレッドを終了できます。

  2. セキュリティ上の理由から、

    def mexec(befehl, args=None):
        cur = conn.cursor()
        cur.execute(befehl, args)
    

    後からするために

    mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
    

将来のプロトコルとの互換性を維持するには、socket.getaddrinfo()代わりに を使用する必要がありsocket.gethostbyname_ex(site)ます。そこで、必要なすべての IP を取得し (最初は IPv4 に制限できますが、IPv6 に切り替える方が簡単です)、それらすべてを DB に入れることができます。


キューの場合、コード サンプルは次のようになります。

def queue_iterator(q):
    """Iterate over the contents of a queue. Waits for new elements as long as the queue is still filling."""
    while True:
        try:
            item = q.get(block=q.is_filling, timeout=.1)
            yield item
            q.task_done() # indicate that task is done.
        except Empty:
            # If q is still filling, continue.
            # If q is empty and not filling any longer, return.
            if not q.is_filling: return

def getips(i, q):
    for site in queue_iterator(q):
        #--resolve IP--
        try:
            result = socket.gethostbyname_ex(site)
            print(result)
            mexec("UPDATE sites2block SET ip=%s, updated='yes'", result) #puts site in mysqldb
        except (socket.gaierror):
            print("no ip")
            mexec("UPDATE sites2block SET ip='no ip', updated='yes',")
# Indicate it is filling.
q.is_filling = True
#Spawn thread pool
for i in range(num_threads):
    worker = Thread(target=getips, args=(i, queue))
    worker.setDaemon(True)
    worker.start()
#Place work in queue
for site in websites:
    queue.put(site)
queue.is_filling = False # we are done filling, if q becomes empty, we are done.
#Wait until worker threads are done to exit
queue.join()

トリックを行う必要があります。


もう 1 つの問題は、MySQL への並列挿入です。一度に実行できる MySQL クエリは 1 つだけです。threading.Lock()したがって、またはを介し​​てアクセスを保護するかRLock()、別のスレッドによって処理される別のキューに回答を入れて、それらをバンドルすることもできます。

于 2012-02-08T14:29:41.023 に答える
0

, , を直接使用するconcurrent.futuresよりも簡単に使用できる場合があります。threadingmultiprocessingQueue

#!/usr/bin/env python3
import socket
# pip install futures on Python 2.x
from concurrent.futures import ThreadPoolExecutor as Executor

hosts = "youtube.com google.com facebook.com yahoo.com live.com".split()*100
with Executor(max_workers=20) as pool:
     for results in pool.map(socket.gethostbyname_ex, hosts, timeout=60):
         print(results)

注: スレッドの使用からプロセスへの切り替えは簡単です。

from concurrent.futures import ProcessPoolExecutor as Executor

gethostbyname_ex()OS でスレッドセーフでない場合に必要です。たとえば、OSX場合です

で発生する可能性のある例外を処理したい場合gethostbyname_ex():

import concurrent.futures

with Executor(max_workers=20) as pool:
    future2host = dict((pool.submit(socket.gethostbyname_ex, h), h)
                       for h in hosts)
    for f in concurrent.futures.as_completed(future2host, timeout=60):
        e = f.exception()
        print(f.result() if e is None else "{0}: {1}".format(future2host[f], e))

docs の例に似ています。

于 2012-02-08T19:44:25.150 に答える