I am building a multithreaded application.

I have set up a thread pool: a queue of size N and N workers that get data from the queue.

When all tasks are done I use

tasks.join() 

where tasks is the queue.
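For context, `Queue.join()` blocks until every item that was `put()` on the queue has been matched by a `task_done()` call from a consumer. A minimal sketch of that contract (the worker and the item count here are illustrative):

```python
try:
    from Queue import Queue   # Python 2, as used in this code
except ImportError:
    from queue import Queue   # Python 3
from threading import Thread

tasks = Queue()

def worker():
    while True:
        item = tasks.get()    # blocks until an item is available
        # ... process item here ...
        tasks.task_done()     # one task_done() per get(), or join() never returns

t = Thread(target=worker)
t.daemon = True               # don't keep the process alive for this thread
t.start()

for i in range(5):
    tasks.put(i)

tasks.join()                  # returns only after task_done() ran for all 5 items
print('all tasks done')
```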

The application seems to run smoothly until suddenly, at some point (after 20 minutes, for example), it terminates with the error

thread.error: can't start new thread

Any ideas?

Edit: The threads are daemon threads and the code looks like this:

while True:
    t0 = time.time()
    keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(100)
    if keyword_statuses.count() == 0:
        DBSession.commit()
        break

    for kw_status in keyword_statuses:
        kw_status.status = 1
        DBSession.commit()

    t0 = time.time()
    w = SWorker(threads_no=32, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')

    w.work()

print 'finished'

When are the daemon threads killed? When the application finishes, or when work() finishes?

Look at the thread pool and the worker (it's from a recipe):

from Queue import Queue
from threading import Thread, Event, current_thread
import time

event = Event()

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""

    def __init__(self, tasks):
        Thread.__init__(self)

        self.tasks = tasks
        self.daemon = True
        self.start()


    def run(self):
        '''Start processing tasks from the queue'''
        while True:
            event.wait()
            #time.sleep(0.1)
            try:
                func, args, callback = self.tasks.get()
            except Exception, e:
                print str(e)
                return
            else:
                if callback is None:
                    func(args)
                else:
                    callback(func(args))

                self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""

    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)


    def add_task(self, func, args=None, callback=None):
        '''Add a task to the queue'''
        self.tasks.put((func, args, callback))


    def wait_completion(self):
        '''Wait for completion of all the tasks in the queue'''
        self.tasks.join()


    def broadcast_block_event(self):
        '''blocks running threads'''
        event.clear()


    def broadcast_unblock_event(self):
        '''unblocks running threads'''
        event.set()


    def get_event(self):
        '''returns the event object'''
        return event

Also, maybe the problem is that I create SWorker objects in a loop? What happens to the old SWorker objects (garbage collection?)?
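One way to check whether old worker threads are piling up is `threading.active_count()`: a Thread object cannot be reclaimed while its `run()` is still executing, so workers stuck in an infinite loop are never garbage-collected. A minimal illustration (the idle worker below is hypothetical):

```python
import threading
import time

def idle_worker():
    # stands in for a worker whose run() never returns
    while True:
        time.sleep(60)

before = threading.active_count()
for _ in range(10):
    t = threading.Thread(target=idle_worker)
    t.daemon = True
    t.start()

leaked = threading.active_count() - before
print(leaked)  # -> 10: live threads are never reclaimed
```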

1 Answer

There still isn't enough code here to pinpoint the problem, but I'm fairly sure it happens because you are not reusing your threads and are starting too many of them. Have you looked at the standard example in the Queue Python docs, http://docs.python.org/library/queue.html (at the bottom of the page)?

The problem can be reproduced with the following code:

import threading
import Queue

q = Queue.Queue()

def worker():
    item = q.get(block=True)  # sleeps forever for now
    do_work(item)
    q.task_done()

# create infinite number of workers threads and fails
# after some time with "error: can't start new thread"
while True:
    t = threading.Thread(target=worker)
    t.start()
q.join() # never reached

Instead, you should create a pool with a known number of threads and feed them data through the queue, like this:

from Queue import Queue
from threading import Thread

q = Queue()

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

UPD: If you need to stop some threads, you can add a flag, or send a special mark meaning "stop" into the queue to break the while loop:

class Worker(Thread):
    break_msg = object()  # just a unique sentinel mark

    def __init__(self):
        Thread.__init__(self)
        self.keep_running = True  # 'continue' is a reserved word in Python

    def run(self):
        while self.keep_running:  # can stop and destroy the thread (variant 1)
            msg = queue.get(block=True)
            if msg is self.break_msg:
                queue.task_done()
                return  # will stop and destroy the thread (variant 2)
            do_work(msg)
            queue.task_done()

workers = [Worker() for _ in xrange(num_workers)]
for w in workers:
    w.start()
for task in tasks:
    queue.put(task)

# Variant 2: one sentinel message per thread; each worker exits after the
# real tasks are drained. You need as many messages as you have threads.
for _ in xrange(num_workers):
    queue.put(Worker.break_msg)

# OR, variant 1:
queue.join()  # wait until all tasks are done
for w in workers:
    w.keep_running = False
    queue.put(None)  # wake up a blocked get() so the flag is re-checked
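Put together as a self-contained sketch (the task list, worker count, and doubling task are illustrative), the sentinel variant looks like this:

```python
try:
    from Queue import Queue   # Python 2
except ImportError:
    from queue import Queue   # Python 3
from threading import Thread

STOP = object()               # unique sentinel object
q = Queue()
results = []                  # list.append is atomic in CPython

def worker():
    while True:
        item = q.get()
        if item is STOP:
            q.task_done()
            return            # run() ends; the thread can now be joined
        results.append(item * 2)
        q.task_done()

workers = [Thread(target=worker) for _ in range(4)]
for w in workers:
    w.start()

for task in [1, 2, 3]:
    q.put(task)
for _ in workers:
    q.put(STOP)               # one sentinel per worker thread

for w in workers:
    w.join()                  # every worker has exited cleanly

print(sorted(results))        # -> [2, 4, 6]
```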
answered 2012-06-29T15:03:24.443