24

の応答から SSL 証明書を取得しようとしていrequestsます。

これを行う良い方法は何ですか?

4

7 に答える 7

0

以前の非常に良い回答に基づいた、よりクリーンな(-ish)ソリューション!

  1. HTTPResponse クラスをオーバーライドする前に requests.Adapter ソース ファイルにパッチを適用する必要があります (保留中のプル リクエスト: https://github.com/psf/requests/pull/6039 ):
    • 静的クラス変数をクラス HTTPAdapter(BaseAdapter)に追加: _clsHTTPResponse = HTTPResponse
    • HTTPResponse オブジェクトを直接作成するのではなく、_clsHTTPResponse を使用 するように send()メソッドを変更します: resp = _clsHTTPResponse.from_httplib(...
  2. このコードを使用してください:
"""
Subclassing HTTP / requests to get peer_certificate back from lower levels
"""
from typing import Optional, Mapping, Any
from http.client import HTTPSConnection
from requests.adapters import HTTPAdapter, DEFAULT_POOLBLOCK
from urllib3.poolmanager import PoolManager,key_fn_by_scheme
from urllib3.connectionpool import HTTPSConnectionPool,HTTPConnectionPool
from urllib3.connection import HTTPSConnection,HTTPConnection
from urllib3.response import HTTPResponse as URLLIB3_HTTPResponse

#force urllib3 to use pyopenssl
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()  

class HTTPSConnection_withcert(HTTPSConnection):
    def __init__(self, *args, **kw):
        self.peer_certificate = None
        super().__init__(*args, **kw)
    def connect(self):
        res = super().connect() 
        self.peer_certificate = self.sock.connection.get_peer_certificate()
        return res

class HTTPResponse_withcert(URLLIB3_HTTPResponse):
    def __init__(self, *args, **kwargs):
        self.peer_certificate = None
        res = super().__init__( *args, **kwargs)
        self.peer_certificate = self._connection.peer_certificate
        return res
       
class HTTPSConnectionPool_withcert(HTTPSConnectionPool):
    ConnectionCls   = HTTPSConnection_withcert
    ResponseCls     = HTTPResponse_withcert
    
class PoolManager_withcert(PoolManager): 
    def __init__(
        self,
        num_pools: int = 10,
        headers: Optional[Mapping[str, str]] = None,
        **connection_pool_kw: Any,
    ) -> None:   
        super().__init__(num_pools,headers,**connection_pool_kw)
        self.pool_classes_by_scheme = {"http": HTTPConnectionPool, "https": HTTPSConnectionPool_withcert}
        self.key_fn_by_scheme = key_fn_by_scheme.copy()
                
class HTTPAdapter_withcert(HTTPAdapter):
    _clsHTTPResponse = HTTPResponse_withcert
    def build_response(self, request, resp):
        response = super().build_response( request, resp)
        response.peer_certificate = resp.peer_certificate
        return response

    def init_poolmanager(self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs):
        #do not call super() to not initialize PoolManager twice
        # save these values for pickling
        self._pool_connections  = connections
        self._pool_maxsize      = maxsize
        self._pool_block        = block

        self.poolmanager        = PoolManager_withcert(num_pools=connections, 
                                                   maxsize=maxsize,
                                                   block=block, 
                                                   strict=True, 
                                                   **pool_kwargs)
class Session_withcert(Session):
    def __init__(self):
        super().__init__()
        self.mount('https://', HTTPAdapter_withcert())
  1. 以上です!新しいセッション Session_withcert() をベースのセッションと同様に使用できるようになりましたが、次のこともできます。
ss= Session_withcert()
resp=ss.get("https://www.google.fr")
resp.peer_certificate.get_subject()
print(resp.peer_certificate.get_subject())

出力は次のとおりです。

<X509Name object '/CN=*.google.fr'>
于 2022-01-09T14:15:20.490 に答える