次のコードを動かすには、gitリポジトリから取得したrequestsライブラリが必要です(特に requests.packages.urllib3.poolmanager.PoolManager._new_pool() を使用します)。
テストには `ncat -v -l 127.0.0.1 8000` を使用しました。
問題は、接続がurllib3ではなく、標準ライブラリのhttplibによって開かれる点にあります。
import socket
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3 import PoolManager, HTTPConnectionPool
try:
from http.client import HTTPConnection
except ImportError:
from httplib import HTTPConnection
class MyAdapter(HTTPAdapter):
    """Transport adapter whose pool manager is a ``MyPoolManager``."""

    def init_poolmanager(self, connections, maxsize):
        """Install a ``MyPoolManager`` as this adapter's pool manager.

        :param connections: forwarded to ``MyPoolManager`` as ``num_pools``.
        :param maxsize: forwarded to ``MyPoolManager`` as ``maxsize``.
        """
        pool_options = {'num_pools': connections, 'maxsize': maxsize}
        self.poolmanager = MyPoolManager(**pool_options)
class MyPoolManager(PoolManager):
    """Pool manager that returns a custom pool for one specific endpoint."""

    def _new_pool(self, scheme, host, port):
        """Create the connection pool for ``scheme``/``host``/``port``.

        Plain-HTTP requests aimed at the module-level ``my_host`` and
        ``my_port`` get a ``MyHTTPConnectionPool`` built with this manager's
        ``connection_pool_kw``. Every other request is delegated to
        ``PoolManager._new_pool``.

        :param scheme: URL scheme of the request (compared against 'http').
        :param host: target host (compared against ``my_host``).
        :param port: target port (compared against ``my_port``).
        :returns: the connection pool to use for this endpoint.
        """
        # Important! This is the hook that routes the chosen endpoint to the
        # custom pool.
        if scheme == 'http' and host == my_host and port == my_port:
            return MyHTTPConnectionPool(host, port, **self.connection_pool_kw)
        # super() must name *this* class so the lookup starts right after
        # MyPoolManager and finds PoolManager._new_pool; naming PoolManager
        # would skip it. The bound call already supplies ``self``.
        return super(MyPoolManager, self)._new_pool(scheme, host, port)
class MyHTTPConnectionPool(HTTPConnectionPool):
    """Connection pool whose connections are ``MyHTTPConnection`` objects."""

    def _new_conn(self):
        """Count a new connection and return a fresh ``MyHTTPConnection``.

        The connection is built from this pool's ``host``, ``port`` and
        ``strict`` attributes.
        """
        self.num_connections += 1
        conn_kwargs = dict(host=self.host, port=self.port, strict=self.strict)
        return MyHTTPConnection(**conn_kwargs)
class MyHTTPConnection(HTTPConnection):
    """HTTP connection that reuses the module-level ``my_socket``."""

    def connect(self):
        """Attach the pre-opened socket instead of opening a new one.

        The stock implementation would do:
            self.sock = socket.create_connection((self.host, self.port),
                                                 self.timeout,
                                                 self.source_address)
        """
        # Important! Use the socket that was opened elsewhere in this module.
        self.sock = my_socket
        if not self._tunnel_host:
            return
        # A tunnel host is configured, so set up the tunnel on this socket.
        self._tunnel()
if __name__ == '__main__':
    import time

    # These three names are read as module globals by MyPoolManager and
    # MyHTTPConnection, so they must be assigned before the request is sent.
    my_host = '127.0.0.1'
    my_port = 8000
    my_socket = socket.create_connection((my_host, my_port))

    # NOTE(review): presumably a pause so the already-open connection can be
    # observed on the listener before any HTTP data is sent -- confirm.
    time.sleep(4)

    # Route every http:// URL of this session through the custom adapter.
    session = requests.Session()
    session.mount('http://', MyAdapter())
    session.get('http://127.0.0.1:8000/foo')
編集:
または、接続プールを直接モンキーパッチする方法もあります:
class MyHTTPConnection(HTTPConnection):
    """HTTP connection that adopts the module-level ``my_socket``."""

    def connect(self):
        """Use the already-open ``my_socket`` rather than creating a socket."""
        self.sock = my_socket
        if not self._tunnel_host:
            return
        # A tunnel host is configured, so set up the tunnel on this socket.
        self._tunnel()
# Monkeypatch: replace the HTTPConnection name inside urllib3's connectionpool
# module with the socket-reusing subclass.
requests.packages.urllib3.connectionpool.HTTPConnection = MyHTTPConnection

if __name__ == '__main__':
    # my_socket is read as a module global by MyHTTPConnection.connect(), so
    # it has to exist before the request is issued.
    my_host = '127.0.0.1'
    my_port = 8000
    my_socket = socket.create_connection((my_host, my_port))

    requests.get('http://127.0.0.1:8000/foo')