2

次のコードを使用して、ssl サーバーで ssl ハンドシェイクと証明書の検証を実行しています。

import ssl
import socket

s = socket.socket()
print "connecting..."
#logging.debug("Connecting")
# Connect with SSL mutual authentication
# We only trust our server's CA, and it only trusts user certificates signed by it
c = ssl.wrap_socket(s, cert_reqs=ssl.CERT_REQUIRED,
                    ssl_version=ssl.PROTOCOL_SSLv3, ca_certs='ca.crt',
                    certfile='user.crt', keyfile='user.key')
c.connect((constants.server_addr, constants.port))

サーバーに接続でき、証明書は正しく検証されていますが、ここから何をすればよいかわかりません。XML を REST API にポストするなど、ソケットを介して https アクションを実行する必要があります。どうすればいいですか?

4

3 に答える 3

1

この回答で説明されているように、wrap_socketコードを使用して拡張できます。httplib.HTTPConnection

(以前の質問で既に回答したので、PycURL のようなものを使用することを検討します。)

于 2012-05-17T18:10:15.147 に答える
0

それがまさに私のプロジェクトで行っていることです。これは、私のプロジェクトで使用した REST クライアント モジュールです。私のニーズに合わせて変更されていますが、あなたにも役立つと思います。httplib2 が必要です: http://pypi.python.org/pypi/httplib2

"""
    client.py
    ---------

    Modified to allow validation server's certificate with external cacert list.
    -- Arif Widi Nugroho <arif@sainsmograf.com>

    Copyright (C) 2008 Benjamin O'Steen

    This file is part of python-fedoracommons.

    python-fedoracommons is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    python-fedoracommons is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with python-fedoracommons.  If not, see <http://www.gnu.org/licenses/>.
"""

__license__ = 'GPL http://www.gnu.org/licenses/gpl.txt'
__author__ = "Benjamin O'Steen <bosteen@gmail.com>, Arif Widi Nugroho <arif@sainsmograf.com>"
__version__ = '0.1'

import httplib2
import urlparse
import urllib
import base64
from base64 import encodestring

from mime_types import *

import mimetypes

from cStringIO import StringIO

class Connection(object):
    def __init__(self, base_url, username=None, password=None, cache=None, ca_certs=None, user_agent_name=None):
        self.base_url = base_url
        self.username = username
        m = MimeTypes()
        self.mimetypes = m.get_dictionary()

        self.url = urlparse.urlparse(base_url)

        (scheme, netloc, path, query, fragment) = urlparse.urlsplit(base_url)

        self.scheme = scheme
        self.host = netloc
        self.path = path

        if user_agent_name is None:
            self.user_agent_name = 'Basic Agent'
        else:
            self.user_agent_name = user_agent_name

        # Create Http class with support for Digest HTTP Authentication, if necessary
        # self.h = httplib2.Http(".cache")
        self.h = httplib2.Http(cache=cache, ca_certs=ca_certs)
        self.h.follow_all_redirects = True
        if username and password:
            self.h.add_credentials(username, password)

    def request_get(self, resource, args = None, headers={}):
        return self.request(resource, "get", args, headers=headers)

    def request_delete(self, resource, args = None, headers={}):
        return self.request(resource, "delete", args, headers=headers)

    def request_head(self, resource, args = None, headers={}):
        return self.request(resource, "head", args, headers=headers)

    def request_post(self, resource, args = None, body = None, filename=None, headers={}):
        return self.request(resource, "post", args , body = body, filename=filename, headers=headers)

    def request_put(self, resource, args = None, body = None, filename=None, headers={}):
        return self.request(resource, "put", args , body = body, filename=filename, headers=headers)

    def get_content_type(self, filename):
        extension = filename.split('.')[-1]
        guessed_mimetype = self.mimetypes.get(extension, mimetypes.guess_type(filename)[0])
        return guessed_mimetype or 'application/octet-stream'

    def request(self, resource, method = "get", args = None, body = None, filename=None, headers={}):
        params = None
        path = resource
        headers['User-Agent'] = self.user_agent_name

        BOUNDARY = u'00hoYUXOnLD5RQ8SKGYVgLLt64jejnMwtO7q8XE1'
        CRLF = u'\r\n'

        if filename and body:
            #fn = open(filename ,'r')
            #chunks = fn.read()
            #fn.close()

            # Attempt to find the Mimetype
            content_type = self.get_content_type(filename)
            headers['Content-Type']='multipart/form-data; boundary='+BOUNDARY
            encode_string = StringIO()
            encode_string.write(CRLF)
            encode_string.write(u'--' + BOUNDARY + CRLF)
            encode_string.write(u'Content-Disposition: form-data; name="file"; filename="%s"' % filename)
            encode_string.write(CRLF)
            encode_string.write(u'Content-Type: %s' % content_type + CRLF)
            encode_string.write(CRLF)
            encode_string.write(body)
            encode_string.write(CRLF)
            encode_string.write(u'--' + BOUNDARY + u'--' + CRLF)

            body = encode_string.getvalue()
            headers['Content-Length'] = str(len(body))
        elif body:
            if not headers.get('Content-Type', None):
                headers['Content-Type']='text/xml'
            headers['Content-Length'] = str(len(body))        
        else: 
            headers['Content-Type']='text/xml'

        if method.upper() == 'POST':
            headers['Content-Type']='application/x-www-form-urlencoded'

        if args:
            path += u"?" + urllib.urlencode(args)

        request_path = []
        if self.path != "/":
            if self.path.endswith('/'):
                request_path.append(self.path[:-1])
            else:
                request_path.append(self.path)
            if path.startswith('/'):
                request_path.append(path[1:])
            else:
                request_path.append(path)

        resp, content = self.h.request(u"%s://%s%s" % (self.scheme, self.host, u'/'.join(request_path)), method.upper(), body=body, headers=headers )

        return {u'headers':resp, u'body':content.decode('UTF-8')}

使用例 (サーバー証明書が指定された ca によって署名されていない場合、接続は失敗します):

c = client.Connection('https://localhost:8000', certs='/path/to/cacert.pem')
# now post some data to the server
response = c.request_post('rest/path/', body=some_urlencoded_data)
if response['headers']['status'] == '200':
    # do something...
于 2012-05-18T07:42:11.800 に答える
0

http://docs.python.org/library/urllib2.html#urllib2.urlopenから開始することをお勧めしurllib2.urlopenます。

socketこれにより、https URL、フェッチ、POST などを処理できます。低レベルまたはsslオブジェクトで直接作業する必要はありません。ただし、Python 2.x を使用している場合、HTTPS 接続はサーバー側の証明書の検証を行いません。これは必要なようです (それは良いことです)。ただし、 Python 3urllibはそれを行います。

Python 2 を使用している場合は、いくつかのオプションがあります。urllib2.HTTPSHandler1 つは、ソケットで適切な検証を行うようにサブクラス化することです。もう 1 つは、必要な HTTP プロトコル ビットを自分で実装することです (推奨されません)。urllib2また、さまざまなオブジェクトとオブジェクトを通常どおりインスタンス化してからhttplib、既に認証されている ssl ソケットを、それらが使用しているソケットの代わりに割り当てることもできますが、それらの状態が台無しにならないように十分に注意する必要があります。ただし、標準ライブラリのソース コードは非常に読みやすく、このような調整を行う必要がある場合に備えています。

于 2012-05-17T17:59:04.213 に答える