1

py_pushoverというPushover APIのラッパーを作成しています。機能の 1 つは、 websocketを使用してサーバーに直接接続できることです。非同期コードを「テスト」しようとすると、ちょっとしたトラブルに遭遇しました。以下を実行します。

import py_pushover as py_po
from tests.helpers import app_key, device_id, secret
import time


def print_msg(messages):
    print(messages)

if __name__ == "__main__":
    cm = py_po.client.ClientManager(app_key, secret=secret, device_id=device_id)
    cm.listen_async(print_msg)

    for i in range(10):
        time.sleep(3)
        print("Meh")

    cm.stop_listening()
    cm.clear_server_messages()

希望どおりに動作します。ただし、「コールバック」関数をテスト スイートにラップすると、次のようになります。

class TestMessage(unittest.TestCase):
    def setUp(self):
        self.pm = py_po.message.MessageManager(app_key, user_key)
        self.client = py_po.client.ClientManager(app_key, secret=secret, device_id=device_id)
        self.cleanUpClient()
        self.client.listen_async(self.client_message_receieved)  # <--- BREAKS HERE!

    def tearDown(self):
        self.client.stop_listening()
        self.cleanUpClient()

    def cleanUpClient(self):
        self.client.retrieve_message()
        for msg in self.client.messages:
            if msg['priority'] >= py_po.PRIORITIES.EMERGENCY and msg['acked'] != 1:
                self.client.acknowledge_message(msg['receipt'])
        self.client.clear_server_messages()

        self.client.retrieve_message()
        self.assertEquals(len(self.client.messages), 0)

    def client_message_receieved(self, messages):
        self.stored_messages = messages

    def test_val_msg(self):

        # Testing a normal push message
        send_message = 'Testing normal push'
        self.pm.push_message(send_message, device='test_device')
        # self.client.retrieve_message()

        self.assertEquals(send_message, self.stored_messages[0]['message'])

次のエラーが表示されます。

TypeError: cannot serialize '_io.TextIOWrapper' object

これはピクルス化エラーが原因であると想定していますが、問題がどこから来ているのかよくわかりません. コールバック関数を呼び出すマルチプロセッシング コードをテストできるテスト スイートをセットアップするにはどうすればよいですか?

私のクライアントコードは次のとおりです(目障りにならないようにスリム化されています):

import websocket
import logging
from multiprocessing import Process, Pipe

from py_pushover import BaseManager, send, base_url

logging.basicConfig(filename='client.log', level=logging.INFO)

class ClientManager(BaseManager):
    """
    Manages the interface between the Pushover Servers and user.  This can be instantiated with or without the user
    secret and device id.  If no secret is provided, the user MUST login before interfacing with the Pushover servers.
    If no device id is provided, the user MUST register this client as a device before interfacing with the Pushover
    servers.
    """
    _login_url = base_url + "users/login.json"
    _register_device_url = base_url + "devices.json"
    _message_url = base_url + "messages.json"
    _del_message_url = base_url + "devices/{device_id}/update_highest_message.json"
    _ack_message_url = base_url + "receipts/{receipt_id}/acknowledge.json"
    _ws_connect_url = "wss://client.pushover.net/push"
    _ws_login = "login:{device_id}:{secret}\n"

    def __init__(self, app_token, secret=None, device_id=None):
        """
        :param str app_token: application id from Pushover API
        :param str secret: (Optional) user secret given after validation of login
        :param str device_id: (Optional) device id of this client
        :return:
        """
        super(ClientManager, self).__init__(app_token)
        self.__secret__ = secret
        self.__device_id__ = device_id
        self.messages = []
        self._ws_app = websocket.WebSocketApp(
            self._ws_connect_url,
            on_open=self._on_ws_open,
            on_message=self._on_ws_message,
            on_error=self._on_ws_error,
            on_close=self._on_ws_close
        )
        self.__on_msg_receipt__ = None
        self.__p__ = Process()

    def listen(self, on_msg_receipt):
        """
        Listens for messages from the server.  When a message is received, a call to the on_msg_receipt function with a
          single parameter representing the messages received.

        :param on_msg_receipt: function to call when a message is received
        """
        self.__on_msg_receipt__ = on_msg_receipt
        self._ws_app.run_forever()

    def listen_async(self, on_msg_receipt):
        """
        Creates a Process for listening to the Pushover server for new messages.  This process then listens for messages
          from the server.  When a message is received, a call to the on_msg_receipt function with a single parameter
          representing the messages received.

        :param on_msg_receipt: function to call when a message is received
        """
        self.__p__ = Process(target=self.listen, args=(on_msg_receipt,))
        self.__p__.start()  # <-- BREAKS HERE!

    def stop_listening(self):
        """
        Stops the listening process from accepting any more messages.
        """
        if self.__p__:
            self.__p__.terminate()
            self.__p__ = None

    def _on_ws_open(self, ws):
        """
        Function used when the websocket is opened for the first time.

        :param ws: the websocket
        """
        logging.info("Opening connection to Pushover server...")
        ws.send(self._ws_login.format(device_id=self.__device_id__, secret=self.__secret__))
        logging.info("----Server Connection Established----")

    def _on_ws_message(self, ws, message):
        """
        Function used for when the websocket recieves a message.  Per the Pushover API guidelines 1 of 4 responses
        will be sent:

            1. `#` - Keep-alive packet, no response needed.
            2. `!` - A new message has arrived; you should perform a sync.
            3. `R` - Reload request; you should drop your connection and re-connect.
            4. `E` - Error; a permanent problem occured and you should not automatically re-connect.
                     Prompt the user to login again or re-enable the device.

        :param ws: the websocket
        :param message: message received from remote server
        """
        message = message.decode("utf-8")
        logging.debug("Message received: " + message)
        if message == "#":
            pass

        elif message == "!":
            self.retrieve_message()
            if self.__on_msg_receipt__:
                self.__on_msg_receipt__(self.messages)

        elif message == "R":
            logging.info("Reconnecting to server (requested from server)...")
            ws.close()
            self.listen(self.__on_msg_receipt__)

        elif message == "E":
            logging.error("Server connection failure!")

        else:  # message isn't of the type expected.  Raise an error.
            raise NotImplementedError  #todo Implement an appropriate exception

    def _on_ws_error(self, ws, error):
        """
        Function used when the websocket encounters an error.  The error is logged

        :param ws: the websocket
        :param error: the error encountered
        """
        logging.error('Error: ' + error)

    def _on_ws_close(self, ws):
        """
        Function used when the websocket closes the connection to the remote server.

        :param ws: the websocket
        """
        logging.info("----Server Connection Closed----")
        self._ws_app = None

注: 私は自分のpy_pushoverモジュールを宣伝しようとしているわけではありませんが、より多くのコードを調べる必要がある場合に備えて、ここにリンクしています。

4

0 に答える 0