28

asyncio 呼び出し loop.run_in_executor を使用して Executor でブロッキング関数を開始し、後でキャンセルしたいのですが、うまくいかないようです。

コードは次のとおりです。

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

上記のコードでは、ブロッキング関数の出力のみが許可されると予想されます。

blocking 0/5
blocking 1/5

次に、ノンブロッキング関数の出力を確認します。しかし、代わりに、キャンセルした後もブロッキングの未来が続きます。

出来ますか?それを行う他の方法はありますか?

ありがとう

編集: asyncio を使用したブロッキング コードと非ブロッキング コードの実行に関する詳細な説明: asyncio を使用してブロッキング コードと非ブロッキング コードをインターフェイスする方法

4

2 に答える 2

28

この場合、Future実際に実行を開始した後に をキャンセルする方法はありません。これは、 の動作に依存しておりconcurrent.futures.Futureそのドキュメントには次のように記載されているためです。

cancel()

通話をキャンセルしようとします。呼び出しが現在実行中でキャンセルできない場合、Falseメソッドは戻ります。それ以外の場合、呼び出しはキャンセルされ、メソッドは戻りTrueます。

そのため、キャンセルが成功するのは、タスクがまだ 内で保留中の場合のみExecutorです。asyncio.Future現在、実際には a をラップしたを使用しています。実際にはconcurrent.futures.Future、を呼び出した後にasyncio.Future返された byは、基になるタスクが実際に既に実行されている場合でもloop.run_in_executor()、 aを発生させます。ただし、実際には、 内のタスクの実行はキャンセルされません。CancellationErroryield fromcancel()Executor

実際にタスクをキャンセルする必要がある場合は、スレッドで実行されているタスクを中断する従来の方法を使用する必要があります。それを行う方法の詳細は、ユースケースに依存します。例で示したユースケースでは、次を使用できますthreading.Event

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
于 2014-10-16T22:30:33.587 に答える
1

スレッドはプロセスの同じメモリ アドレス空間を共有するため、実行中のスレッドを安全に終了する方法はありません。これが、ほとんどのプログラミング言語が実行中のスレッドを強制終了することを許可しない理由です (この制限には多くの醜いハックがあります)。

Java は苦労してそれを学びました。

解決策は、スレッドではなく別のプロセスで関数を実行し、正常に終了することです。

Pebbleライブラリは、実行中のキャンセルをconcurrent.futuresサポートするのと同様のインターフェイスを提供します。Futures

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  
于 2017-08-08T10:16:36.967 に答える