チャット コンシューマー向けのテストを作成する過程で、WebSocketCommunicator を使用してテストで認証できないという問題に遭遇しました。私が知っているように、承認ヘッダーを使用した適切な認証はまだ不可能であるため、リクエストクエリでトークンを使用してソケットで認証を実装するカスタム JwtTokenAuthMiddleware があります。ネット上で見つけられなかったサンプルコードを教えてください。ところで、私のチャットは問題なく動作しています。また、テストは完全に問題ないはずです。公式ドキュメント Django Channels 2.x Testing からガイドを取得しました。
--JwtTokenAuthMiddlewate--
class JwtTokenAuthMiddleware:
def __init__(self, inner):
self.inner = inner
def __call__(self, scope):
close_old_connections()
try:
raw_token = scope['query_string'].decode().split('=')[1]
auth = JWTAuthentication()
validated_token = auth.get_validated_token(raw_token)
user = auth.get_user(validated_token)
scope['user'] = user
except (IndexError, InvalidToken, AuthenticationFailed):
scope['user'] = AnonymousUser()
return self.inner(scope)
JwtTokenAuthMiddlewareStack = lambda inner: JwtTokenAuthMiddleware(AuthMiddlewareStack(inner))
--テスト例--
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
room = await database_sync_to_async(RoomFactory.create)()
trainer = room.trainer
trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
room_url = f'ws/room/{room.id}/'
trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
connected, _ = await trainer_communicator.connect()
assert connected
trainer_connect_resp = await trainer_communicator.receive_json_from()
assert_connection(trainer_connect_resp, [], room.max_round_time)
await trainer_communicator.disconnect()
--エラーのトレースバック--
___________________________________________________________ test_trainer_auth_success ___________________________________________________________
self = <channels.testing.websocket.WebsocketCommunicator object at 0x7f6b9906f290>, timeout = 1
async def receive_output(self, timeout=1):
"""
Receives a single message from the application, with optional timeout.
"""
# Make sure there's not an exception to raise from the task
if self.future.done():
self.future.result()
# Wait and receive the message
try:
async with async_timeout(timeout):
> return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f6b98f76510 maxsize=0>
async def get(self):
"""Remove and return an item from the queue.
If queue is empty, wait until an item is available.
"""
while self.empty():
getter = self._loop.create_future()
self._getters.append(getter)
try:
> await getter
E concurrent.futures._base.CancelledError
/usr/local/lib/python3.7/asyncio/queues.py:159: CancelledError
During handling of the above exception, another exception occurred:
@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_trainer_auth_success():
room = await database_sync_to_async(RoomFactory.create)()
trainer = room.trainer
trainer_token = await sync_to_async(get_token_for_user)(trainer.user)
room_url = f'ws/room/{room.id}/'
# trainer_communicator = await assert_get_connected_communicator(application, room_url, trainer_token)
trainer_communicator = WebsocketCommunicator(application, f'{room_url}?t={trainer_token}')
> connected, _ = await trainer_communicator.connect()
apps/chat/tests/test_consumers.py:39:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/site-packages/channels/testing/websocket.py:36: in connect
response = await self.receive_output(timeout)
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:85: in receive_output
raise e
/usr/local/lib/python3.7/site-packages/asgiref/testing.py:74: in receive_output
return await self.output_queue.get()
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:68: in __aexit__
self._do_exit(exc_type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asgiref.timeout.timeout object at 0x7f6b98f76d50>, exc_type = <class 'concurrent.futures._base.CancelledError'>
def _do_exit(self, exc_type: Type[BaseException]) -> None:
if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
> raise asyncio.TimeoutError
E concurrent.futures._base.TimeoutError
/usr/local/lib/python3.7/site-packages/asgiref/timeout.py:105: TimeoutError
=============================================================== warnings summary ================================================================
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:39: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
item = pytest.Function(name, parent=collector)
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45
/usr/local/lib/python3.7/site-packages/pytest_asyncio/plugin.py:45: PytestDeprecationWarning: direct construction of Function has been deprecated, please use Function.from_parent
item = pytest.Function(name, parent=collector) # To reload keywords.