エラーが発生した場合に備えて、同時HTTPリクエストをサーバーに送信する必要があるコードをいくつか書いています。再試行する必要があります。実行していると、送信が速くなり、数分後に送信速度が 0 に低下します。何かがロックされて解放されず、送信が停止するようです。
データを送信する関数はこれです
@asyncio.coroutine
def handle_line(self, line, destination):
while True:
response = yield from aiohttp.request('POST', destination,
data=line,
headers=headers,
connector=self.tcp_conn)
if response.status == 200:
self._succeeded += 1
if self._succeeded % 20000 == 0 and self._succeeded != 0:
logger.debug("Sent {} events from file".format(self._succeeded))
self.update_dynamo_counter(destination)
break
elif response.status != 200:
logger.debug("Error sending data. Status: {}".format(response.status))
continue
return (yield from response.read())
コネクタは初期化されています
tcp_conn = aiohttp.TCPConnector(limit=100)
send 関数は、800 個のタスクをリストに追加することによって呼び出され、それらが io を詰まらせないように待機しています。
lines_to_send.append(asyncio.async(self.handle_line(json.dumps(data), destination)))
if len(lines_to_send) % 800 == 0:
yield from asyncio.wait(lines_to_send)
lines_to_send = []
私はそれをデバッグしたり、問題の原因を突き止めたりするのに問題があります。誰も手がかりを持っていますか?