(注: この問題の背景はかなり詳細ですが、スキップできる SSCCE が一番下にあります)
バックグラウンド
Web サービスとやり取りするための Python ベースの CLI を開発しようとしています。私のコードベースにはCommunicationService
、Web サービスとのすべての直接通信を処理するクラスがあります。Web サービスから応答が返されたときに通知を受けるために、他のオブジェクトがサブスクライブできる (RxPY からの) をreceived_response
返すプロパティを公開します。Observable
click
サブコマンドの 1 つが以下のように実装されているライブラリに基づいて CLI ロジックを作成しました。
async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
self._generate_request(request)
if response_handler is None:
return None
while True:
response = await self.on_response
success, value = response_handler(response)
print(success, value)
if success:
return value
ここで何が起こっているか (そうでresponse_handler
ない場合) は、サブコマンドが Web サービス ( ) からの応答を待機し、処理できる最初の応答から処理された値を返すNone
コルーチンとして動作していることです。self.on_response == CommunicationService.received_response
CommunicationService
完全にモックされたテストケースを作成して、CLI の動作をテストしようとしています。偽物Subject
が作成され( として機能できますObservable
)、CommunicationService.received_response
モックされて返されます。テストの一環として、サブジェクトのon_next
メソッドが呼び出され、モック Web サービスの応答が本番コードに返されます。
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
context.mock_received_response_subject.on_next(context.text)
CLI 呼び出しの最後に呼び出され、コルーチン (サブコマンド) が完了するまでブロックするクリックの「結果コールバック」関数を使用します。
@cli.resultcallback()
def _handle_command_task(task: Coroutine, **_) -> None:
if task:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task)
loop.close()
print('RESULT:', result)
問題
テストの開始時に、CliRunner.invoke
シバン全体を発射するために走ります。問題は、これがブロッキング呼び出しであり、CLI が終了して結果を返すまでスレッドをブロックすることです。これは、テスト スレッドを続行する必要がある場合には役に立ちません。これにより、テスト スレッドと同時に Web サービスの応答のモックを生成できます。
私がする必要があると思うのはCliRunner.invoke
、を使用して新しいスレッドで実行することThreadPoolExecutor
です。これにより、テスト ロジックを元のスレッドで続行し、@when
上記のステップを実行できます。ただし、で発行された通知は mock_received_response_subject.on_next
、実行をトリガーしてサブコマンド内で続行するようには見えません。
解決策には RxPY のAsyncIOScheduler
.
SSCCE
以下のスニペットは、問題の本質であることを願っています。動作するように変更できる場合は、同じソリューションを実際のコードに適用して、希望どおりに動作させることができるはずです。
import asyncio
import logging
import sys
import time
import click
from click.testing import CliRunner
from rx.subjects import Subject
web_response_subject = Subject()
web_response_observable = web_response_subject.as_observable()
thread_loop = asyncio.new_event_loop()
@click.group()
def cli():
asyncio.set_event_loop(thread_loop)
@cli.resultcallback()
def result_handler(task, **_):
loop = asyncio.get_event_loop()
result = loop.run_until_complete(task) # Should block until subject publishes value
loop.close()
print(result)
@cli.command()
async def get_web_response():
return await web_response_observable
def test():
runner = CliRunner()
future = thread_loop.run_in_executor(None, runner.invoke, cli, ['get_web_response'])
time.sleep(1)
web_response_subject.on_next('foo') # Simulate reception of web response.
time.sleep(1)
result = future.result()
print(result.output)
logging.basicConfig(
level=logging.DEBUG,
format='%(threadName)10s %(name)18s: %(message)s',
stream=sys.stderr,
)
test()
現在の動作
実行時にプログラムがハングし、 でブロックされresult = loop.run_until_complete(task)
ます。
合否基準
プログラムが終了し、 に出力foo
されstdout
ます。
更新 1
Vincent の助けに基づいて、コードにいくつかの変更を加えました。
Relay.enabled
(Web サービスからの応答を処理するために待機するサブコマンド) は、次のように実装されています。
async def enabled(self, request: str, response_handler: Callable[[str], Tuple[bool, str]]) -> None:
self._generate_request(request)
if response_handler is None:
return None
return await self.on_response \
.select(response_handler) \
.where(lambda result, i: result[0]) \
.select(lambda result, index: result[1]) \
.first()
await
オブザーバブルでどのように動作するかはよくわかりませんでしたRxPY
-生成された各要素で呼び出し元に実行を返すか、オブザーバブルが完了した(またはエラーになった?)場合にのみ実行します。正直なところ、後者の方がより自然な選択のように感じられ、この関数の実装をよりエレガントで反応的に感じさせることができました。
モック Web サービス応答を生成するテスト ステップも変更しました。
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
loop = asyncio.get_event_loop()
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
残念ながら、CLI は独自のスレッドで呼び出されているため、これはそのままでは機能しません...
@when('the CLI is run with "{arguments}"')
def step_impl(context, arguments):
loop = asyncio.get_event_loop()
if 'async.cli' in context.tags:
context.async_result = loop.run_in_executor(None, context.cli_runner.invoke, testcube.cli, arguments.split())
else:
...
また、CLI は、呼び出されると独自のスレッド プライベート イベント ループを作成します...
def cli(context, hostname, port):
_initialize_logging(context.meta['click_log.core.logger']['level'])
# Create a new event loop for processing commands asynchronously on.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
...
私が必要だと思うのは、テスト ステップが新しいスレッドで CLI を呼び出し、使用しているイベント ループをフェッチできるようにする方法です。
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
loop = _get_cli_event_loop() # Needs to be implemented.
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
更新 2
特定のスレッドがそれ自体で作成して使用するイベント ループを取得する簡単な方法はないようです。そのため、代わりに、Victor のアドバイスを受けasyncio.new_event_loop
て、テスト コードが作成して保存するイベント ループを返すようにモックしました。
def _apply_mock_event_loop_patch(context):
# Close any already-existing exit stacks.
if hasattr(context, 'mock_event_loop_exit_stack'):
context.mock_event_loop_exit_stack.close()
context.test_loop = asyncio.new_event_loop()
print(context.test_loop)
context.mock_event_loop_exit_stack = ExitStack()
context.mock_event_loop_exit_stack.enter_context(
patch.object(asyncio, 'new_event_loop', spec=True, return_value=context.test_loop))
'mock web response received' テスト ステップを次のように変更します。
@when('the communications service receives a response from TestCube Web Service')
def step_impl(context):
loop = context.test_loop
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, context.text)
Relay.enabled
素晴らしいニュースは、このステップが実行されると、実際にコルーチンがトリガーされることです!
現在の唯一の問題は、CLI を独自のスレッドで実行することで得られる将来を待ち、CLI がこれを送信していることを検証する最終テスト ステップですstdout
。
@then('the CLI should print "{output}"')
def step_impl(context, output):
if 'async.cli' in context.tags:
loop = asyncio.get_event_loop() # main loop, not test loop
result = loop.run_until_complete(context.async_result)
else:
result = context.result
assert_that(result.output, equal_to(output))
私はこれをいじってみましたが、うまく移行して結果を返すことができないcontext.async_result
ようloop.run_in_executor
ですdone
。現在の実装では、最初のテスト ( ) でエラーが発生し、2 番目のテスト ( 1.1
) で無期限にハングし1.2
ます。
@mock.comms @async.cli @wip
Scenario Outline: Querying relay enable state -- @1.1 # testcube/tests/features/relay.feature:45
When the user queries the enable state of relay 0 # testcube/tests/features/steps/relay.py:17 0.003s
Then the CLI should query the web service about the enable state of relay 0 # testcube/tests/features/steps/relay.py:48 0.000s
When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
"""
{'module':'relays','path':'relays[0].enabled','data':[True]}'
"""
Then the CLI should print "True" # testcube/tests/features/steps/core.py:94 0.003s
Traceback (most recent call last):
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1456, in run
match.run(runner.context)
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/behave/model.py", line 1903, in run
self.func(context, *args, **kwargs)
File "testcube/tests/features/steps/core.py", line 99, in step_impl
result = loop.run_until_complete(context.async_result)
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/concurrent/futures/thread.py", line 55, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/davidfallah/testcube_env/lib/python3.5/site-packages/click/testing.py", line 299, in invoke
output = out.getvalue()
ValueError: I/O operation on closed file.
Captured stdout:
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[0].enabled','data':[True]}'
<Future pending cb=[_chain_future.<locals>._call_check_cancel() at /usr/local/Cellar/python3/3.5.2_1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py:431]>
@mock.comms @async.cli @wip
Scenario Outline: Querying relay enable state -- @1.2 # testcube/tests/features/relay.feature:46
When the user queries the enable state of relay 1 # testcube/tests/features/steps/relay.py:17 0.005s
Then the CLI should query the web service about the enable state of relay 1 # testcube/tests/features/steps/relay.py:48 0.001s
When the communications service receives a response from TestCube Web Service # testcube/tests/features/steps/core.py:58 0.000s
"""
{'module':'relays','path':'relays[1].enabled','data':[False]}'
"""
RECEIVED WEB RESPONSE: {'module':'relays','path':'relays[1].enabled','data':[False]}'
Then the CLI should print "False" # testcube/tests/features/steps/core.py:94
第3章:フィナーレ
このすべての非同期マルチスレッドのものを台無しにしてください。私はそれにはあまりにも愚かです。
まず、このようなシナリオを説明する代わりに...
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
When the communications service receives a response from TestCube Web Service:
"""
{"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
"""
Then the CLI should print "<relay_enabled>"
次のように説明します。
Given the communications service will respond to requests:
"""
{"module":"relays","path":"relays[<relay_id>].enabled","data":[<relay_enabled>]}
"""
When the user queries the enable state of relay <relay_id>
Then the CLI should query the web service about the enable state of relay <relay_id>
And the CLI should print "<relay_enabled>"
新しい特定のステップを実装します。
@given('the communications service will respond to requests')
def step_impl(context):
response = context.text
def publish_mock_response(_):
loop = context.test_loop
loop.call_soon_threadsafe(context.mock_received_response_subject.on_next, response)
# Configure the mock comms service to publish a mock response when a request is made.
instance = context.mock_comms.return_value
instance.send_request.on_next.side_effect = publish_mock_response
ブーム
2 features passed, 0 failed, 0 skipped
22 scenarios passed, 0 failed, 0 skipped
58 steps passed, 0 failed, 0 skipped, 0 undefined
Took 0m0.111s