2

マルチスレッド用にconcurrent.future.ThredPoolExecutorを使用しています。いくつかのhttpサービスを実行しています。サーバーがダウンしたときに実行を一時停止し、サーバーを起動してから実行を再開するようにスレッドを制御したかったのです。

サーバーがダウンするトリガーは、特定の場所でファイルが利用可能かどうかを確認しているため、実行を一時停止する必要があります。

そのため、concurrent.futures.Executor.shutdown() は、現在保留中のフューチャーの実行が完了したときに、使用しているすべてのリソースを解放する必要があることをエグゼキューターに通知します。

しかし、executor の shutdown() メソッドを使用すると、すぐにスレッドをシャットダウンするのではなく、実行全体が終了した後に shutdown() を呼び出します。

実際、concurren.future で一時停止と再開が見つからなかったため、shutdown() メソッドを呼び出しています。別の方法として、スレッドの実行が終了したら、リストから URL を削除しています。残りのリストを渡して同じメソッドを呼び出すことができるようにします。

コードは次のとおりです。

import concurrent.futures
import urllib.request
import os.path
import datetime
import sys
import pathlib
from errno import ENOENT, EACCES, EPERM
import time
import threading

listOfFilesFromDirectory =  []
webroot = settings.configuration.WEBSERVER_WEBROOT
WEBSERVER_PORT = settings.configuration.WEBSERVER_PORT
shutdown = False

def class myclass:

#populating the list with the urls from a file
def triggerMethod(path):
    try:
        for line in open(path):
            listOfFilesFromDirectory.append(line)
    except IOError as err:
        if err.errno == ENOENT:
            #logging.critical("document.txt file is missing")
            print("document.txt file is missing")
        elif err.errno in (EACCES, EPERM):
            #logging.critical("You are not allowed to read document.txt")
            print("You are not allowed to read document.txt")
        else:
            raise   

# calling this method to stop the threads and restart after a sleep of 100 secs, as the list will always have the urls that were not executed.
def stopExecutor(executor):
    filePath = "C:\logs\serverStopLog.txt"
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            executor.shutdown( )
            time.sleep(100)
            runRegressionInMultipleThreads( )
            break

def load_url(url, timeout):
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT + "/" + url, timeout = timeout)
    return conn.info()

def trigegerFunc( ):
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in listOfFilesFromDirectory}

        t = threading.Thread(target=stopExecutor, args=(executor))
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
                listOfFilesFromDirectory.remove(url)
            else:
                if data:
                    if "200" in data:
                        listOfFilesFromDirectory.remove(url)
                    else:
                        listOfFilesFromDirectory.remove(url)
                else:
                    listOfFilesFromDirectory.remove(url)
        shutdown = True
        t.join()                


triggerMethod("C:\inetpub\wwwroot")
trigegerFunc()
4

1 に答える 1

1

You can't cancel or pause/resume threads in Python. executor.shutdown() does exactly what you said it does when you quoted the documentation:

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Note that bolded part - the executor will only shutdown once all currently executing tasks are completed. To get the kind of control you want, you'll need to run the urllib call in a separate process, like this (this is a simplified version of your script):

import time
import os.path
import threading
import urllib.request
import multiprocessing
import concurrent.futures
from multiprocessing import cpu_count

shutdown = False
should_cancel = False

def stopTasks():
    global should_cancel
    filePath = "C:\logs\serverStopLog.txt"
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            should_cancel = True
            break

def _load_url(num, timeout, q):
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT + 
                                  "/" + url, timeout=timeout)
    q.put(conn.info())

def load_url(num, timeout):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_load_url, args=(num, timeout, q))
    p.start()
    while p.is_alive():
        time.sleep(.5)
        if should_cancel:
            p.terminate()  # This will actually kill the process, cancelling the operation
            break # You could return something here that indicates it was cancelled, too.
    else:
        # We'll only enter this if we didn't `break` above.
        out = q.get()
        p.join()
        return out

def triggerFunc():
    global shutdown
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60):
                             url for url in listOfFilesFromDirectory}
        t = threading.Thread(target=stopTasks)
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            info = future.result()
            print("done: {}".format(info))
            # other stuff you do
        shutdown = True
        t.join()

if __name__ == "__main__":
    triggerFunc()

Because we can actually kill a sub-process by sending it a SIGTERM, we can truly cancel the urlopen operation while its still in progress.

于 2014-07-25T20:01:17.777 に答える