Python Manager への接続の成功/失敗を待つことができないアプリケーションを実行しています。クライアント アプリケーションは、実行中と思われるサーバーに何らかの情報を送信しようとする必要があり、失敗した場合は別の手段が取られます。問題は、サーバーがダウンしている場合は常に、接続がクライアント アプリケーションに制御を返すのに多くの時間がかかることです。他にやるべきことがあるために、接続を待つ時間を無駄にすることはできません。
中間オブジェクトが接続を担当するスキームを思いつきましたが、それは一度しか機能しません。初めて、サーバーへの接続がまだない場合、この中間オブジェクトはクライアント アプリケーションをブロックすることなく接続部分を処理するとします。なんらかの理由でサーバーがダウンして再び戻ってきた場合、それ以上機能させることはできません。
次のサーバーがあるとします。
# server.py
from multiprocessing import Queue, managers
from multiprocessing.queues import Empty
import select
import threading
class RServer(object):
def __init__(self, items_buffer):
self.items_buffer = items_buffer
def receive_items(self):
while True:
(_, [], []) = select.select([self.items_buffer._reader], [], [])
while True:
try:
item = self.items_buffer.get(block=False)
# do something with item
print('item received')
except Empty:
break
class SharedObjectsManager(managers.BaseManager):
pass
if __name__ == '__main__':
items_buffer = Queue()
remote_server = RServer(items_buffer)
remote_server_th = threading.Thread(target=remote_server.receive_items)
remote_server_th.start()
SharedObjectsManager.register('items_buffer', callable=lambda: items_buffer)
shared_objects_manager = SharedObjectsManager(address=('localhost', 5001),
authkey=str.encode('my_server'),
serializer='xmlrpclib')
s = shared_objects_manager.get_server()
s.serve_forever()
接続を処理する中間オブジェクトは次のとおりです。
# bridge.py
from multiprocessing.managers import BaseManager
import threading
import socket
class ConnectionManager():
def __init__(self):
self.remote_manager = BaseManager(address=('localhost', 5001),
authkey=b'my_server',
serializer='xmlrpclib')
self.remote_manager.register('items_buffer')
self.items_buffer = None
self.items_buffer_lock = threading.Lock()
self.connecting = False
self.connecting_lock = threading.Lock()
self.connection_started_condition = threading.Condition()
def transmit_item(self, item):
try:
with self.items_buffer_lock:
self.items_buffer.put(item)
except (AttributeError, EOFError, IOError):
with self.connection_started_condition:
with self.connecting_lock:
if not self.connecting:
self.connecting = True
connect_th = threading.Thread(target=self.connect_to_server,
name='Client Connect')
connect_th.start()
self.connection_started_condition.notify()
raise ConnectionError('Connection Error')
def connect_to_server(self):
with self.connection_started_condition:
self.connection_started_condition.wait()
try:
self.remote_manager.connect()
except socket.error:
pass
else:
try:
with self.items_buffer_lock:
self.items_buffer = self.remote_manager.items_buffer()
except (AssertionError, socket.error):
pass
with self.connecting_lock:
self.connecting = False
class ConnectionError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
最後に、クライアント アプリケーション:
# client.py
import time
from bridge import ConnectionManager, ConnectionError
remote_buffer = ConnectionManager()
while True:
try:
remote_buffer.transmit_item({'rubish': None})
print('item sent')
except ConnectionError:
# do something else
print('item not sent')
# do other stuff
print('doing other stuff')
time.sleep(15)
私は確かにスレッドで何か間違ったことをしていますが、それを理解することはできません. 何か案が?