2

ネットワークの外部にアクセスするには、認証されたHTTPプロキシを介して接続する必要があるネットワークを使用しています。私がする必要があるのは、基本的にsocketインターネットに接続するための(または同等の)ものを作成することですが、すべてのデータを直接送信するのではなく、プロキシを介して送信します。これをどのように行うことができるかについてのアイデアはありますか?

4

2 に答える 2

3

使用できる実際のモジュールやその他のコードが見つからなかったため、プロキシを介して接続する独自の関数を作成することになりました。

def http_proxy_connect(address, proxy = None, auth = None, headers = {}):
  """
  Establish a socket connection through an HTTP proxy.

  Arguments:
    address (required)     = The address of the target
    proxy (def: None)      = The address of the proxy server
    auth (def: None)       = A tuple of the username and password used for authentication
    headers (def: {})      = A set of headers that will be sent to the proxy

  Returns:
    A 3-tuple of the format:
      (socket, status_code, headers)
    Where `socket' is the socket object, `status_code` is the HTTP status code that the server
     returned and `headers` is a dict of headers that the server returned.
  """
  import socket
  import base64

  def valid_address(addr):
    """ Verify that an IP/port tuple is valid """
    return isinstance(addr, (list, tuple)) and len(addr) == 2 and isinstance(addr[0], str) and isinstance(addr[1], (int, long))

  if not valid_address(address):
    raise ValueError('Invalid target address')

  if proxy == None:
    s = socket.socket()
    s.connect(address)
    return s, 0, {}

  if not valid_address(proxy):
    raise ValueError('Invalid proxy address')

  headers = {
    'host': address[0]
  }

  if auth != None:
    if isinstance(auth, str):
      headers['proxy-authorization'] = auth
    elif auth and isinstance(auth, (tuple, list)):
      if len(auth) == 1:
        raise ValueError('Invalid authentication specification')

      t = auth[0]
      args = auth[1:]

      if t.lower() == 'basic' and len(args) == 2:
        headers['proxy-authorization'] = 'Basic ' + base64.b64encode('%s:%s' % args)
      else:
        raise ValueError('Invalid authentication specification')
    else:
      raise ValueError('Invalid authentication specification')

  s = socket.socket()
  s.connect(proxy)
  fp = s.makefile('r+')

  fp.write('CONNECT %s:%d HTTP/1.0\r\n' % address)
  fp.write('\r\n'.join('%s: %s' % (k, v) for (k, v) in headers.items()) + '\r\n\r\n')
  fp.flush()

  statusline = fp.readline().rstrip('\r\n')

  if statusline.count(' ') < 2:
    fp.close()
    s.close()
    raise IOError('Bad response')
  version, status, statusmsg = statusline.split(' ', 2)
  if not version in ('HTTP/1.0', 'HTTP/1.1'):
    fp.close()
    s.close()
    raise IOError('Unsupported HTTP version')
  try:
    status = int(status)
  except ValueError:
    fp.close()
    s.close()
    raise IOError('Bad response')

  response_headers = {}

  while True:
    tl = ''
    l = fp.readline().rstrip('\r\n')
    if l == '':
      break
    if not ':' in l:
      continue
    k, v = l.split(':', 1)
    response_headers[k.strip().lower()] = v.strip()

  fp.close()
  return (s, status, response_headers)
于 2012-11-22T15:33:21.620 に答える
1

これが大いに役立つかどうかはわかりませんが、 pycurlを調べましたか?これは、ユーザー名/パスワード認証システムを提供するプロキシに接続するのに役立つ場合があります(これこれを参照)

于 2012-11-20T19:06:57.907 に答える