3

未完成のスレッドが多すぎるという問題があります。キューコマンド.join()は、キューを閉じるだけで、それを使用しているスレッドではないと思います。

私のスクリプトでは、280kドメインをチェックし、各ドメインについて彼のMXレコードのリストを取得し、サーバーのIPv6アドレスがある場合はそれを取得する必要があります。

私はスレッドを使用し、スクリプトのおかげで何倍も速くなりました。しかし、問題があります。キューにjoin()がありますが、新しいスレッドを作成できないことを通知するエラーが発生するまで、生きているスレッドの数が増えています(OSの制限?)。

データベースから新しいドメインを取得するときに、各Forループの後にスレッドを終了/閉じる/停止/リセットするにはどうすればよいですか?

スレッドクラスの定義...

class MX_getAAAA_thread(threading.Thread):
    def __init__(self,queue,id_domain):
        threading.Thread.__init__(self)
        self.queue = queue
        self.id_domain = id_domain


    def run(self):
        while True:
            self.mx = self.queue.get()

            res = dns.resolver.Resolver()
            res.lifetime = 1.5
            res.timeout = 0.5

            try:
                answers = res.query(self.mx,'AAAA')
                ip_mx = str(answers[0])
            except:
                ip_mx = "N/A"

            lock.acquire()

            sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
            try:
                cursor.execute(sql)
                db.commit()
            except:
                db.rollback()

            print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

            lock.release()
            self.queue.task_done()

使用中のスレッドクラス...(メインのForループはここにはありません。これは彼の体の一部にすぎません)

try:
    answers = resolver.query(domain, 'MX')

    qMX = Queue.Queue()
    for i in range(len(answers)):
        t = MX_getAAAA_thread(qMX,id_domain)
        t.setDaemon(True)
        threads.append(t)
        t.start()

    for mx in answers:
        qMX.put(mx.exchange)

    qMX.join()

except NoAnswer as e:
    print "MX - Error: No Answer"
except Timeout as etime:
    print "MX - Error: dns.exception.Timeout"

print "end of script"

私がしようとしました:

for thread in threads:
            thread.join()

キューが完了した後、queue.join()が実行されるとスレッドに対して何もすることがないため、待機する必要がないにもかかわらず、thread.join()は待機を停止しません。

4

3 に答える 3

5

スレッドにこのような無限ループが含まれている場合によく行うことは、条件を外部から制御できるものに変更することです。たとえば、次のようになります。

def run(self):
    self.keepRunning = True
    while self.keepRunning:
        # do stuff

そうすれば、keepRunningプロパティを外部から変更し、falseに設定して、次にループ状態をチェックするときにスレッドを正常に終了させることができます。

ところで。キューに入れるアイテムごとに正確に1つのスレッドを生成するように見えるので、スレッドをループさせる必要はまったくありませんが、作成できるスレッドの最大制限を常に適用する必要があると主張します。このように(すなわちfor i in range(min(len(answers), MAX_THREAD_COUNT)):

あなたの場合、forループの反復ごとにスレッドを終了する代わりに、スレッドを再利用することができます。私があなたのスレッドのソースから収集したものから、スレッドを反復に固有にするのは、id_domainその作成時に設定したプロパティだけです。ただし、それをキューに提供することもできるため、スレッドは完全に独立しており、再利用できます。

これは次のようになります。

qMX = Queue.Queue()
threads = []
for i in range(MAX_THREAD_COUNT):
    t = MX_getAAAA_thread(qMX)
    t.daemon = True
    threads.append(t)
    t.start()

for id_domain in enumerateIdDomains():
    answers = resolver.query(id_domain, 'MX')
    for mx in answers:
        qMX.put((id_domain, mx.exchange)) # insert a tuple

qMX.join()

for thread in threads:
    thread.keepRunning = False

もちろん、スレッドを少し変更する必要があります。

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.keepRunning = True
        while self.keepRunning:
            id_domain, mx = self.queue.get()
            # do stuff
于 2013-01-27T14:52:36.683 に答える
4

そもそもなぜ必要なのかわかりませんQueue
結局のところ、設計では、すべてのスレッドが1つのタスクを処理するだけです。
作成時にそのタスクをスレッドに渡すことができるはずです。
このように、あなたはを必要とせずQueue、あなたは-loopを取り除きますwhile

class MX_getAAAA_thread(threading.Thread):
    def __init__(self, id_domain, mx):
        threading.Thread.__init__(self)
        self.id_domain = id_domain
        self.mx = mx

次に、 -methodwhile内の-loopを取り除くことができます。run

def run(self):
    res = dns.resolver.Resolver()
    res.lifetime = 1.5
    res.timeout = 0.5

    try:
        answers = res.query(self.mx,'AAAA')
        ip_mx = str(answers[0])
    except:
        ip_mx = "N/A"

    with lock:
        sql = "INSERT INTO mx (id_domain,mx,ip_mx) VALUES (" + str(id_domain) + ",'" + str(self.mx) + "','" + str(ip_mx) + "')"
        try:
            cursor.execute(sql)
            db.commit()
        except:
            db.rollback()

        print "MX" , '>>' , ip_mx, ' :: ', str(self.mx)

タスクごとに1つのスレッドを作成します

for mx in answers:
    t = MX_getAAAA_thread(qMX, id_domain, mx)
    t.setDaemon(True)
    threads.append(t)
    t.start()

そしてそれらに参加します

for thread in threads:
    thread.join()
于 2013-01-27T14:41:42.697 に答える
2

スレッドを結合することでうまくいきますが、スレッドが実行ループを終了することはないため、この場合の結合は無期限にブロックされます。スレッドを結合できるように、runメソッドを終了する必要があります。

于 2013-01-27T14:23:02.910 に答える