[Jython-checkins] jython (merge default -> default): Merge.
frank.wierzbicki
jython-checkins at python.org
Mon Feb 4 18:33:00 CET 2013
http://hg.python.org/jython/rev/73cc6455b2e1
changeset: 6984:73cc6455b2e1
parent: 6983:8e8f4aa4f3cb
parent: 6982:c52aac19c12e
user: Frank Wierzbicki <fwierzbicki at gmail.com>
date: Mon Feb 04 09:32:36 2013 -0800
summary:
Merge.
files:
Lib/ftplib.py | 1047 +++++++++++++++++
Lib/socket.py | 43 +-
Lib/ssl.py | 2 +-
Lib/test/test_socket_ssl.py | 9 +
Lib/test/test_urllib.py | 249 +++-
Lib/test/test_urllib2.py | 103 +-
Lib/test/test_urllib2_localnet.py | 343 ++++-
Lib/urllib.py | 232 +--
8 files changed, 1781 insertions(+), 247 deletions(-)
diff --git a/Lib/ftplib.py b/Lib/ftplib.py
new file mode 100644
--- /dev/null
+++ b/Lib/ftplib.py
@@ -0,0 +1,1047 @@
+"""An FTP client class and some helper functions.
+
+Based on RFC 959: File Transfer Protocol (FTP), by J. Postel and J. Reynolds
+
+Example:
+
+>>> from ftplib import FTP
+>>> ftp = FTP('ftp.python.org') # connect to host, default port
+>>> ftp.login() # default, i.e.: user anonymous, passwd anonymous@
+'230 Guest login ok, access restrictions apply.'
+>>> ftp.retrlines('LIST') # list directory contents
+total 9
+drwxr-xr-x 8 root wheel 1024 Jan 3 1994 .
+drwxr-xr-x 8 root wheel 1024 Jan 3 1994 ..
+drwxr-xr-x 2 root wheel 1024 Jan 3 1994 bin
+drwxr-xr-x 2 root wheel 1024 Jan 3 1994 etc
+d-wxrwxr-x 2 ftp wheel 1024 Sep 5 13:43 incoming
+drwxr-xr-x 2 root wheel 1024 Nov 17 1993 lib
+drwxr-xr-x 6 1094 wheel 1024 Sep 13 19:07 pub
+drwxr-xr-x 3 root wheel 1024 Jan 3 1994 usr
+-rw-r--r-- 1 root root 312 Aug 1 1994 welcome.msg
+'226 Transfer complete.'
+>>> ftp.quit()
+'221 Goodbye.'
+>>>
+
+A nice test that reveals some of the network dialogue would be:
+python ftplib.py -d localhost -l -p -l
+"""
+
+#
+# Changes and improvements suggested by Steve Majewski.
+# Modified by Jack to work on the mac.
+# Modified by Siebren to support docstrings and PASV.
+# Modified by Phil Schwartz to add storbinary and storlines callbacks.
+# Modified by Giampaolo Rodola' to add TLS support.
+#
+
+import os
+import sys
+
+# Import SOCKS module if it exists, else the standard socket module
+try:
+ import SOCKS; socket = SOCKS; del SOCKS # import SOCKS as socket
+ from socket import getfqdn; socket.getfqdn = getfqdn; del getfqdn
+except ImportError:
+ import socket
+from socket import _GLOBAL_DEFAULT_TIMEOUT
+
+__all__ = ["FTP","Netrc"]
+
+# Magic number from <socket.h>
+MSG_OOB = 0x1 # Process data out of band
+
+
+# The standard FTP server control port
+FTP_PORT = 21
+
+
+# Exception raised when an error or invalid response is received
+class Error(Exception): pass
+class error_reply(Error): pass # unexpected [123]xx reply
+class error_temp(Error): pass # 4xx errors
+class error_perm(Error): pass # 5xx errors
+class error_proto(Error): pass # response does not begin with [1-5]
+
+
+# All exceptions (hopefully) that may be raised here and that aren't
+# (always) programming errors on our side
+all_errors = (Error, IOError, EOFError)
+
+
+# Line terminators (we always output CRLF, but accept any of CRLF, CR, LF)
+CRLF = '\r\n'
+
+# The class itself
+class FTP:
+
+ '''An FTP client class.
+
+ To create a connection, call the class using these arguments:
+ host, user, passwd, acct, timeout
+
+ The first four arguments are all strings, and have default value ''.
+ timeout must be numeric; if not passed, the global default socket
+ timeout (see socket.getdefaulttimeout()) applies to the ftp socket(s).
+ If a timeout is passed, then this is now the default timeout for all ftp
+ socket operations for this instance.
+
+ Then use self.connect() with optional host and port argument.
+
+ To download a file, use ftp.retrlines('RETR ' + filename),
+ or ftp.retrbinary() with slightly different arguments.
+ To upload a file, use ftp.storlines() or ftp.storbinary(),
+ which have an open file as argument (see their definitions
+ below for details).
+ The download/upload functions first issue appropriate TYPE
+ and PORT or PASV commands.
+'''
+
+ debugging = 0
+ host = ''
+ port = FTP_PORT
+ sock = None
+ file = None
+ welcome = None
+ passiveserver = 1
+
+ # Initialization method (called by class instantiation).
+ # Initialize host to localhost, port to standard ftp port
+ # Optional arguments are host (for connect()),
+ # and user, passwd, acct (for login())
+ def __init__(self, host='', user='', passwd='', acct='',
+ timeout=_GLOBAL_DEFAULT_TIMEOUT):
+ self.timeout = timeout
+ if host:
+ self.connect(host)
+ if user:
+ self.login(user, passwd, acct)
+
+ def connect(self, host='', port=0, timeout=-999):
+ '''Connect to host. Arguments are:
+ - host: hostname to connect to (string, default previous host)
+ - port: port to connect to (integer, default previous port)
+ '''
+ if host != '':
+ self.host = host
+ if port > 0:
+ self.port = port
+ if timeout != -999:
+ self.timeout = timeout
+ self.sock = socket.create_connection((self.host, self.port), self.timeout)
+ self.af = self.sock.family
+ self.file = self.sock.makefile('rb')
+ self.welcome = self.getresp()
+ return self.welcome
+
+ def getwelcome(self):
+ '''Get the welcome message from the server.
+ (this is read and squirreled away by connect())'''
+ if self.debugging:
+ print '*welcome*', self.sanitize(self.welcome)
+ return self.welcome
+
+ def set_debuglevel(self, level):
+ '''Set the debugging level.
+ The required argument level means:
+ 0: no debugging output (default)
+ 1: print commands and responses but not body text etc.
+ 2: also print raw lines read and sent before stripping CR/LF'''
+ self.debugging = level
+ debug = set_debuglevel
+
+ def set_pasv(self, val):
+ '''Use passive or active mode for data transfers.
+ With a false argument, use the normal PORT mode,
+ With a true argument, use the PASV command.'''
+ self.passiveserver = val
+
+ # Internal: "sanitize" a string for printing
+ def sanitize(self, s):
+ if s[:5] == 'pass ' or s[:5] == 'PASS ':
+ i = len(s)
+ while i > 5 and s[i-1] in '\r\n':
+ i = i-1
+ s = s[:5] + '*'*(i-5) + s[i:]
+ return repr(s)
+
+ # Internal: send one line to the server, appending CRLF
+ def putline(self, line):
+ line = line + CRLF
+ if self.debugging > 1: print '*put*', self.sanitize(line)
+ self.sock.sendall(line)
+
+ # Internal: send one command to the server (through putline())
+ def putcmd(self, line):
+ if self.debugging: print '*cmd*', self.sanitize(line)
+ self.putline(line)
+
+ # Internal: return one line from the server, stripping CRLF.
+ # Raise EOFError if the connection is closed
+ def getline(self):
+ line = self.file.readline()
+ if self.debugging > 1:
+ print '*get*', self.sanitize(line)
+ if not line: raise EOFError
+ if line[-2:] == CRLF: line = line[:-2]
+ elif line[-1:] in CRLF: line = line[:-1]
+ return line
+
+ # Internal: get a response from the server, which may possibly
+ # consist of multiple lines. Return a single string with no
+ # trailing CRLF. If the response consists of multiple lines,
+ # these are separated by '\n' characters in the string
+ def getmultiline(self):
+ line = self.getline()
+ if line[3:4] == '-':
+ code = line[:3]
+ while 1:
+ nextline = self.getline()
+ line = line + ('\n' + nextline)
+ if nextline[:3] == code and \
+ nextline[3:4] != '-':
+ break
+ return line
+
+ # Internal: get a response from the server.
+ # Raise various errors if the response indicates an error
+ def getresp(self):
+ resp = self.getmultiline()
+ if self.debugging: print '*resp*', self.sanitize(resp)
+ self.lastresp = resp[:3]
+ c = resp[:1]
+ if c in ('1', '2', '3'):
+ return resp
+ if c == '4':
+ raise error_temp, resp
+ if c == '5':
+ raise error_perm, resp
+ raise error_proto, resp
+
+ def voidresp(self):
+ """Expect a response beginning with '2'."""
+ resp = self.getresp()
+ if resp[:1] != '2':
+ raise error_reply, resp
+ return resp
+
+ def abort(self):
+ '''Abort a file transfer. Uses out-of-band data.
+ This does not follow the procedure from the RFC to send Telnet
+ IP and Synch; that doesn't seem to work with the servers I've
+ tried. Instead, just send the ABOR command as OOB data.'''
+ line = 'ABOR' + CRLF
+ if self.debugging > 1: print '*put urgent*', self.sanitize(line)
+ self.sock.sendall(line, MSG_OOB)
+ resp = self.getmultiline()
+ if resp[:3] not in ('426', '225', '226'):
+ raise error_proto, resp
+
+ def sendcmd(self, cmd):
+ '''Send a command and return the response.'''
+ self.putcmd(cmd)
+ return self.getresp()
+
+ def voidcmd(self, cmd):
+ """Send a command and expect a response beginning with '2'."""
+ self.putcmd(cmd)
+ return self.voidresp()
+
+ def sendport(self, host, port):
+ '''Send a PORT command with the current host and the given
+ port number.
+ '''
+ hbytes = host.split('.')
+ pbytes = [repr(port//256), repr(port%256)]
+ bytes = hbytes + pbytes
+ cmd = 'PORT ' + ','.join(bytes)
+ return self.voidcmd(cmd)
+
+ def sendeprt(self, host, port):
+ '''Send an EPRT command with the current host and the given port number.'''
+ af = 0
+ if self.af == socket.AF_INET:
+ af = 1
+ if self.af == socket.AF_INET6:
+ af = 2
+ if af == 0:
+ raise error_proto, 'unsupported address family'
+ fields = ['', repr(af), host, repr(port), '']
+ cmd = 'EPRT ' + '|'.join(fields)
+ return self.voidcmd(cmd)
+
+ def makeport(self):
+ '''Create a new socket and send a PORT command for it.'''
+ err = None
+ sock = None
+ for res in socket.getaddrinfo(None, 0, self.af, socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
+ af, socktype, proto, canonname, sa = res
+ try:
+ sock = socket.socket(af, socktype, proto)
+ sock.bind(sa)
+ except socket.error, err:
+ if sock:
+ sock.close()
+ sock = None
+ continue
+ break
+ if sock is None:
+ if err is not None:
+ raise err
+ else:
+ raise socket.error("getaddrinfo returns an empty list")
+ sock.listen(1)
+ port = sock.getsockname()[1] # Get proper port
+ host = self.sock.getsockname()[0] # Get proper host
+ if self.af == socket.AF_INET:
+ resp = self.sendport(host, port)
+ else:
+ resp = self.sendeprt(host, port)
+ if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
+ sock.settimeout(self.timeout)
+ return sock
+
+ def makepasv(self):
+ if self.af == socket.AF_INET:
+ host, port = parse227(self.sendcmd('PASV'))
+ else:
+ host, port = parse229(self.sendcmd('EPSV'), self.sock.getpeername())
+ return host, port
+
+ def ntransfercmd(self, cmd, rest=None):
+ """Initiate a transfer over the data connection.
+
+ If the transfer is active, send a port command and the
+ transfer command, and accept the connection. If the server is
+ passive, send a pasv command, connect to it, and start the
+ transfer command. Either way, return the socket for the
+ connection and the expected size of the transfer. The
+ expected size may be None if it could not be determined.
+
+ Optional `rest' argument can be a string that is sent as the
+ argument to a REST command. This is essentially a server
+ marker used to tell the server to skip over any data up to the
+ given marker.
+ """
+ size = None
+ if self.passiveserver:
+ host, port = self.makepasv()
+ conn = socket.create_connection((host, port), self.timeout)
+ try:
+ if rest is not None:
+ self.sendcmd("REST %s" % rest)
+ resp = self.sendcmd(cmd)
+ # Some servers apparently send a 200 reply to
+ # a LIST or STOR command, before the 150 reply
+ # (and way before the 226 reply). This seems to
+ # be in violation of the protocol (which only allows
+ # 1xx or error messages for LIST), so we just discard
+ # this response.
+ if resp[0] == '2':
+ resp = self.getresp()
+ if resp[0] != '1':
+ raise error_reply, resp
+ except:
+ conn.close()
+ raise
+ else:
+ sock = self.makeport()
+ try:
+ if rest is not None:
+ self.sendcmd("REST %s" % rest)
+ resp = self.sendcmd(cmd)
+ # See above.
+ if resp[0] == '2':
+ resp = self.getresp()
+ if resp[0] != '1':
+ raise error_reply, resp
+ conn, sockaddr = sock.accept()
+ if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
+ conn.settimeout(self.timeout)
+ finally:
+ sock.close()
+ if resp[:3] == '150':
+ # this is conditional in case we received a 125
+ size = parse150(resp)
+ return conn, size
+
+ def transfercmd(self, cmd, rest=None):
+ """Like ntransfercmd() but returns only the socket."""
+ return self.ntransfercmd(cmd, rest)[0]
+
+ def login(self, user = '', passwd = '', acct = ''):
+ '''Login, default anonymous.'''
+ if not user: user = 'anonymous'
+ if not passwd: passwd = ''
+ if not acct: acct = ''
+ if user == 'anonymous' and passwd in ('', '-'):
+ # If there is no anonymous ftp password specified
+ # then we'll just use anonymous@
+ # We don't send any other thing because:
+ # - We want to remain anonymous
+ # - We want to stop SPAM
+ # - We don't want to let ftp sites discriminate by the user,
+ # host or country.
+ passwd = passwd + 'anonymous@'
+ resp = self.sendcmd('USER ' + user)
+ if resp[0] == '3': resp = self.sendcmd('PASS ' + passwd)
+ if resp[0] == '3': resp = self.sendcmd('ACCT ' + acct)
+ if resp[0] != '2':
+ raise error_reply, resp
+ return resp
+
+ def retrbinary(self, cmd, callback, blocksize=8192, rest=None):
+ """Retrieve data in binary mode. A new port is created for you.
+
+ Args:
+ cmd: A RETR command.
+ callback: A single parameter callable to be called on each
+ block of data read.
+ blocksize: The maximum number of bytes to read from the
+ socket at one time. [default: 8192]
+ rest: Passed to transfercmd(). [default: None]
+
+ Returns:
+ The response code.
+ """
+ self.voidcmd('TYPE I')
+ conn = self.transfercmd(cmd, rest)
+ while 1:
+ data = conn.recv(blocksize)
+ if not data:
+ break
+ callback(data)
+ conn.close()
+ return self.voidresp()
+
+ def retrlines(self, cmd, callback = None):
+ """Retrieve data in line mode. A new port is created for you.
+
+ Args:
+ cmd: A RETR, LIST, NLST, or MLSD command.
+ callback: An optional single parameter callable that is called
+ for each line with the trailing CRLF stripped.
+ [default: print_line()]
+
+ Returns:
+ The response code.
+ """
+ if callback is None: callback = print_line
+ resp = self.sendcmd('TYPE A')
+ conn = self.transfercmd(cmd)
+ fp = conn.makefile('rb')
+ while 1:
+ line = fp.readline()
+ if self.debugging > 2: print '*retr*', repr(line)
+ if not line:
+ break
+ if line[-2:] == CRLF:
+ line = line[:-2]
+ elif line[-1:] == '\n':
+ line = line[:-1]
+ callback(line)
+ fp.close()
+ conn.close()
+ return self.voidresp()
+
+ def storbinary(self, cmd, fp, blocksize=8192, callback=None, rest=None):
+ """Store a file in binary mode. A new port is created for you.
+
+ Args:
+ cmd: A STOR command.
+ fp: A file-like object with a read(num_bytes) method.
+ blocksize: The maximum data size to read from fp and send over
+ the connection at once. [default: 8192]
+ callback: An optional single parameter callable that is called
+ on each block of data after it is sent. [default: None]
+ rest: Passed to transfercmd(). [default: None]
+
+ Returns:
+ The response code.
+ """
+ self.voidcmd('TYPE I')
+ conn = self.transfercmd(cmd, rest)
+ while 1:
+ buf = fp.read(blocksize)
+ if not buf: break
+ conn.sendall(buf)
+ if callback: callback(buf)
+ conn.close()
+ return self.voidresp()
+
+ def storlines(self, cmd, fp, callback=None):
+ """Store a file in line mode. A new port is created for you.
+
+ Args:
+ cmd: A STOR command.
+ fp: A file-like object with a readline() method.
+ callback: An optional single parameter callable that is called
+ on each line after it is sent. [default: None]
+
+ Returns:
+ The response code.
+ """
+ self.voidcmd('TYPE A')
+ conn = self.transfercmd(cmd)
+ while 1:
+ buf = fp.readline()
+ if not buf: break
+ if buf[-2:] != CRLF:
+ if buf[-1] in CRLF: buf = buf[:-1]
+ buf = buf + CRLF
+ conn.sendall(buf)
+ if callback: callback(buf)
+ conn.close()
+ return self.voidresp()
+
+ def acct(self, password):
+ '''Send new account name.'''
+ cmd = 'ACCT ' + password
+ return self.voidcmd(cmd)
+
+ def nlst(self, *args):
+ '''Return a list of files in a given directory (default the current).'''
+ cmd = 'NLST'
+ for arg in args:
+ cmd = cmd + (' ' + arg)
+ files = []
+ self.retrlines(cmd, files.append)
+ return files
+
+ def dir(self, *args):
+ '''List a directory in long form.
+ By default list current directory to stdout.
+ Optional last argument is callback function; all
+ non-empty arguments before it are concatenated to the
+ LIST command. (This *should* only be used for a pathname.)'''
+ cmd = 'LIST'
+ func = None
+ if args[-1:] and type(args[-1]) != type(''):
+ args, func = args[:-1], args[-1]
+ for arg in args:
+ if arg:
+ cmd = cmd + (' ' + arg)
+ self.retrlines(cmd, func)
+
+ def rename(self, fromname, toname):
+ '''Rename a file.'''
+ resp = self.sendcmd('RNFR ' + fromname)
+ if resp[0] != '3':
+ raise error_reply, resp
+ return self.voidcmd('RNTO ' + toname)
+
+ def delete(self, filename):
+ '''Delete a file.'''
+ resp = self.sendcmd('DELE ' + filename)
+ if resp[:3] in ('250', '200'):
+ return resp
+ else:
+ raise error_reply, resp
+
+ def cwd(self, dirname):
+ '''Change to a directory.'''
+ if dirname == '..':
+ try:
+ return self.voidcmd('CDUP')
+ except error_perm, msg:
+ if msg.args[0][:3] != '500':
+ raise
+ elif dirname == '':
+ dirname = '.' # does nothing, but could return error
+ cmd = 'CWD ' + dirname
+ return self.voidcmd(cmd)
+
+ def size(self, filename):
+ '''Retrieve the size of a file.'''
+ # The SIZE command is defined in RFC-3659
+ resp = self.sendcmd('SIZE ' + filename)
+ if resp[:3] == '213':
+ s = resp[3:].strip()
+ try:
+ return int(s)
+ except (OverflowError, ValueError):
+ return long(s)
+
+ def mkd(self, dirname):
+ '''Make a directory, return its full pathname.'''
+ resp = self.sendcmd('MKD ' + dirname)
+ return parse257(resp)
+
+ def rmd(self, dirname):
+ '''Remove a directory.'''
+ return self.voidcmd('RMD ' + dirname)
+
+ def pwd(self):
+ '''Return current working directory.'''
+ resp = self.sendcmd('PWD')
+ return parse257(resp)
+
+ def quit(self):
+ '''Quit, and close the connection.'''
+ resp = self.voidcmd('QUIT')
+ self.close()
+ return resp
+
+ def close(self):
+ '''Close the connection without assuming anything about it.'''
+ if self.file is not None:
+ self.file.close()
+ if self.sock is not None:
+ self.sock.close()
+ self.file = self.sock = None
+
+try:
+ import ssl
+ ssl.PROTOCOL_TLSv1
+except (ImportError, AttributeError):
+ pass
+else:
+ class FTP_TLS(FTP):
+ '''An FTP subclass which adds TLS support to FTP as described
+ in RFC-4217.
+
+ Connect as usual to port 21 implicitly securing the FTP control
+ connection before authenticating.
+
+ Securing the data connection requires user to explicitly ask
+ for it by calling prot_p() method.
+
+ Usage example:
+ >>> from ftplib import FTP_TLS
+ >>> ftps = FTP_TLS('ftp.python.org')
+ >>> ftps.login() # login anonymously previously securing control channel
+ '230 Guest login ok, access restrictions apply.'
+ >>> ftps.prot_p() # switch to secure data connection
+ '200 Protection level set to P'
+ >>> ftps.retrlines('LIST') # list directory content securely
+ total 9
+ drwxr-xr-x 8 root wheel 1024 Jan 3 1994 .
+ drwxr-xr-x 8 root wheel 1024 Jan 3 1994 ..
+ drwxr-xr-x 2 root wheel 1024 Jan 3 1994 bin
+ drwxr-xr-x 2 root wheel 1024 Jan 3 1994 etc
+ d-wxrwxr-x 2 ftp wheel 1024 Sep 5 13:43 incoming
+ drwxr-xr-x 2 root wheel 1024 Nov 17 1993 lib
+ drwxr-xr-x 6 1094 wheel 1024 Sep 13 19:07 pub
+ drwxr-xr-x 3 root wheel 1024 Jan 3 1994 usr
+ -rw-r--r-- 1 root root 312 Aug 1 1994 welcome.msg
+ '226 Transfer complete.'
+ >>> ftps.quit()
+ '221 Goodbye.'
+ >>>
+ '''
+ ssl_version = ssl.PROTOCOL_TLSv1
+
+ def __init__(self, host='', user='', passwd='', acct='', keyfile=None,
+ certfile=None, timeout=_GLOBAL_DEFAULT_TIMEOUT):
+ self.keyfile = keyfile
+ self.certfile = certfile
+ self._prot_p = False
+ FTP.__init__(self, host, user, passwd, acct, timeout)
+
+ def login(self, user='', passwd='', acct='', secure=True):
+ if secure and not isinstance(self.sock, ssl.SSLSocket):
+ self.auth()
+ return FTP.login(self, user, passwd, acct)
+
+ def auth(self):
+ '''Set up secure control connection by using TLS/SSL.'''
+ if isinstance(self.sock, ssl.SSLSocket):
+ raise ValueError("Already using TLS")
+ if self.ssl_version == ssl.PROTOCOL_TLSv1:
+ resp = self.voidcmd('AUTH TLS')
+ else:
+ resp = self.voidcmd('AUTH SSL')
+ self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile,
+ ssl_version=self.ssl_version)
+ self.file = self.sock.makefile(mode='rb')
+ return resp
+
+ def prot_p(self):
+ '''Set up secure data connection.'''
+ # PROT defines whether or not the data channel is to be protected.
+ # Though RFC-2228 defines four possible protection levels,
+ # RFC-4217 only recommends two, Clear and Private.
+ # Clear (PROT C) means that no security is to be used on the
+ # data-channel, Private (PROT P) means that the data-channel
+ # should be protected by TLS.
+ # PBSZ command MUST still be issued, but must have a parameter of
+ # '0' to indicate that no buffering is taking place and the data
+ # connection should not be encapsulated.
+ self.voidcmd('PBSZ 0')
+ resp = self.voidcmd('PROT P')
+ self._prot_p = True
+ return resp
+
+ def prot_c(self):
+ '''Set up clear text data connection.'''
+ resp = self.voidcmd('PROT C')
+ self._prot_p = False
+ return resp
+
+ # --- Overridden FTP methods
+
+ def ntransfercmd(self, cmd, rest=None):
+ conn, size = FTP.ntransfercmd(self, cmd, rest)
+ if self._prot_p:
+ conn = ssl.wrap_socket(conn, self.keyfile, self.certfile,
+ ssl_version=self.ssl_version)
+ return conn, size
+
+ def retrbinary(self, cmd, callback, blocksize=8192, rest=None):
+ self.voidcmd('TYPE I')
+ conn = self.transfercmd(cmd, rest)
+ try:
+ while 1:
+ data = conn.recv(blocksize)
+ if not data:
+ break
+ callback(data)
+ # shutdown ssl layer
+ if isinstance(conn, ssl.SSLSocket):
+ conn.unwrap()
+ finally:
+ conn.close()
+ return self.voidresp()
+
+ def retrlines(self, cmd, callback = None):
+ if callback is None: callback = print_line
+ resp = self.sendcmd('TYPE A')
+ conn = self.transfercmd(cmd)
+ fp = conn.makefile('rb')
+ try:
+ while 1:
+ line = fp.readline()
+ if self.debugging > 2: print '*retr*', repr(line)
+ if not line:
+ break
+ if line[-2:] == CRLF:
+ line = line[:-2]
+ elif line[-1:] == '\n':
+ line = line[:-1]
+ callback(line)
+ # shutdown ssl layer
+ if isinstance(conn, ssl.SSLSocket):
+ conn.unwrap()
+ finally:
+ fp.close()
+ conn.close()
+ return self.voidresp()
+
+ def storbinary(self, cmd, fp, blocksize=8192, callback=None, rest=None):
+ self.voidcmd('TYPE I')
+ conn = self.transfercmd(cmd, rest)
+ try:
+ while 1:
+ buf = fp.read(blocksize)
+ if not buf: break
+ conn.sendall(buf)
+ if callback: callback(buf)
+ # shutdown ssl layer
+ if isinstance(conn, ssl.SSLSocket):
+ conn.unwrap()
+ finally:
+ conn.close()
+ return self.voidresp()
+
+ def storlines(self, cmd, fp, callback=None):
+ self.voidcmd('TYPE A')
+ conn = self.transfercmd(cmd)
+ try:
+ while 1:
+ buf = fp.readline()
+ if not buf: break
+ if buf[-2:] != CRLF:
+ if buf[-1] in CRLF: buf = buf[:-1]
+ buf = buf + CRLF
+ conn.sendall(buf)
+ if callback: callback(buf)
+ # shutdown ssl layer
+ if isinstance(conn, ssl.SSLSocket):
+ conn.unwrap()
+ finally:
+ conn.close()
+ return self.voidresp()
+
+ __all__.append('FTP_TLS')
+ all_errors = (Error, IOError, EOFError, ssl.SSLError)
+
+
+_150_re = None
+
+def parse150(resp):
+ '''Parse the '150' response for a RETR request.
+ Returns the expected transfer size or None; size is not guaranteed to
+ be present in the 150 message.
+ '''
+ if resp[:3] != '150':
+ raise error_reply, resp
+ global _150_re
+ if _150_re is None:
+ import re
+ _150_re = re.compile("150 .* \((\d+) bytes\)", re.IGNORECASE)
+ m = _150_re.match(resp)
+ if not m:
+ return None
+ s = m.group(1)
+ try:
+ return int(s)
+ except (OverflowError, ValueError):
+ return long(s)
+
+
+_227_re = None
+
+def parse227(resp):
+ '''Parse the '227' response for a PASV request.
+ Raises error_proto if it does not contain '(h1,h2,h3,h4,p1,p2)'
+ Return ('host.addr.as.numbers', port#) tuple.'''
+
+ if resp[:3] != '227':
+ raise error_reply, resp
+ global _227_re
+ if _227_re is None:
+ import re
+ _227_re = re.compile(r'(\d+),(\d+),(\d+),(\d+),(\d+),(\d+)')
+ m = _227_re.search(resp)
+ if not m:
+ raise error_proto, resp
+ numbers = m.groups()
+ host = '.'.join(numbers[:4])
+ port = (int(numbers[4]) << 8) + int(numbers[5])
+ return host, port
+
+
+def parse229(resp, peer):
+ '''Parse the '229' response for an EPSV request.
+ Raises error_proto if it does not contain '(|||port|)'
+ Return ('host.addr.as.numbers', port#) tuple.'''
+
+ if resp[:3] != '229':
+ raise error_reply, resp
+ left = resp.find('(')
+ if left < 0: raise error_proto, resp
+ right = resp.find(')', left + 1)
+ if right < 0:
+ raise error_proto, resp # should contain '(|||port|)'
+ if resp[left + 1] != resp[right - 1]:
+ raise error_proto, resp
+ parts = resp[left + 1:right].split(resp[left+1])
+ if len(parts) != 5:
+ raise error_proto, resp
+ host = peer[0]
+ port = int(parts[3])
+ return host, port
+
+
+def parse257(resp):
+ '''Parse the '257' response for a MKD or PWD request.
+ This is a response to a MKD or PWD request: a directory name.
+ Returns the directory name in the 257 reply.'''
+
+ if resp[:3] != '257':
+ raise error_reply, resp
+ if resp[3:5] != ' "':
+ return '' # Not compliant to RFC 959, but UNIX ftpd does this
+ dirname = ''
+ i = 5
+ n = len(resp)
+ while i < n:
+ c = resp[i]
+ i = i+1
+ if c == '"':
+ if i >= n or resp[i] != '"':
+ break
+ i = i+1
+ dirname = dirname + c
+ return dirname
+
+
+def print_line(line):
+ '''Default retrlines callback to print a line.'''
+ print line
+
+
+def ftpcp(source, sourcename, target, targetname = '', type = 'I'):
+ '''Copy file from one FTP-instance to another.'''
+ if not targetname: targetname = sourcename
+ type = 'TYPE ' + type
+ source.voidcmd(type)
+ target.voidcmd(type)
+ sourcehost, sourceport = parse227(source.sendcmd('PASV'))
+ target.sendport(sourcehost, sourceport)
+ # RFC 959: the user must "listen" [...] BEFORE sending the
+ # transfer request.
+ # So: STOR before RETR, because here the target is a "user".
+ treply = target.sendcmd('STOR ' + targetname)
+ if treply[:3] not in ('125', '150'): raise error_proto # RFC 959
+ sreply = source.sendcmd('RETR ' + sourcename)
+ if sreply[:3] not in ('125', '150'): raise error_proto # RFC 959
+ source.voidresp()
+ target.voidresp()
+
+
+class Netrc:
+ """Class to parse & provide access to 'netrc' format files.
+
+ See the netrc(4) man page for information on the file format.
+
+ WARNING: This class is obsolete -- use module netrc instead.
+
+ """
+ __defuser = None
+ __defpasswd = None
+ __defacct = None
+
+ def __init__(self, filename=None):
+ if filename is None:
+ if "HOME" in os.environ:
+ filename = os.path.join(os.environ["HOME"],
+ ".netrc")
+ else:
+ raise IOError, \
+ "specify file to load or set $HOME"
+ self.__hosts = {}
+ self.__macros = {}
+ fp = open(filename, "r")
+ in_macro = 0
+ while 1:
+ line = fp.readline()
+ if not line: break
+ if in_macro and line.strip():
+ macro_lines.append(line)
+ continue
+ elif in_macro:
+ self.__macros[macro_name] = tuple(macro_lines)
+ in_macro = 0
+ words = line.split()
+ host = user = passwd = acct = None
+ default = 0
+ i = 0
+ while i < len(words):
+ w1 = words[i]
+ if i+1 < len(words):
+ w2 = words[i + 1]
+ else:
+ w2 = None
+ if w1 == 'default':
+ default = 1
+ elif w1 == 'machine' and w2:
+ host = w2.lower()
+ i = i + 1
+ elif w1 == 'login' and w2:
+ user = w2
+ i = i + 1
+ elif w1 == 'password' and w2:
+ passwd = w2
+ i = i + 1
+ elif w1 == 'account' and w2:
+ acct = w2
+ i = i + 1
+ elif w1 == 'macdef' and w2:
+ macro_name = w2
+ macro_lines = []
+ in_macro = 1
+ break
+ i = i + 1
+ if default:
+ self.__defuser = user or self.__defuser
+ self.__defpasswd = passwd or self.__defpasswd
+ self.__defacct = acct or self.__defacct
+ if host:
+ if host in self.__hosts:
+ ouser, opasswd, oacct = \
+ self.__hosts[host]
+ user = user or ouser
+ passwd = passwd or opasswd
+ acct = acct or oacct
+ self.__hosts[host] = user, passwd, acct
+ fp.close()
+
+ def get_hosts(self):
+ """Return a list of hosts mentioned in the .netrc file."""
+ return self.__hosts.keys()
+
+ def get_account(self, host):
+ """Returns login information for the named host.
+
+ The return value is a triple containing userid,
+ password, and the accounting field.
+
+ """
+ host = host.lower()
+ user = passwd = acct = None
+ if host in self.__hosts:
+ user, passwd, acct = self.__hosts[host]
+ user = user or self.__defuser
+ passwd = passwd or self.__defpasswd
+ acct = acct or self.__defacct
+ return user, passwd, acct
+
+ def get_macros(self):
+ """Return a list of all defined macro names."""
+ return self.__macros.keys()
+
+ def get_macro(self, macro):
+ """Return a sequence of lines which define a named macro."""
+ return self.__macros[macro]
+
+
+
+def test():
+ '''Test program.
+ Usage: ftp [-d] [-r[file]] host [-l[dir]] [-d[dir]] [-p] [file] ...
+
+ -d dir
+ -l list
+ -p toggle passive mode
+ '''
+
+ if len(sys.argv) < 2:
+ print test.__doc__
+ sys.exit(0)
+
+ debugging = 0
+ rcfile = None
+ while sys.argv[1] == '-d':
+ debugging = debugging+1
+ del sys.argv[1]
+ if sys.argv[1][:2] == '-r':
+ # get name of alternate ~/.netrc file:
+ rcfile = sys.argv[1][2:]
+ del sys.argv[1]
+ host = sys.argv[1]
+ ftp = FTP(host)
+ ftp.set_debuglevel(debugging)
+ userid = passwd = acct = ''
+ try:
+ netrc = Netrc(rcfile)
+ except IOError:
+ if rcfile is not None:
+ sys.stderr.write("Could not open account file"
+ " -- using anonymous login.")
+ else:
+ try:
+ userid, passwd, acct = netrc.get_account(host)
+ except KeyError:
+ # no account for host
+ sys.stderr.write(
+ "No account -- using anonymous login.")
+ ftp.login(userid, passwd, acct)
+ for file in sys.argv[2:]:
+ if file[:2] == '-l':
+ ftp.dir(file[2:])
+ elif file[:2] == '-d':
+ cmd = 'CWD'
+ if file[2:]: cmd = cmd + ' ' + file[2:]
+ resp = ftp.sendcmd(cmd)
+ elif file == '-p':
+ ftp.set_pasv(not ftp.passiveserver)
+ else:
+ ftp.retrbinary('RETR ' + file, \
+ sys.stdout.write, 1024)
+ ftp.quit()
+
+
+if __name__ == '__main__':
+ test()
diff --git a/Lib/socket.py b/Lib/socket.py
--- a/Lib/socket.py
+++ b/Lib/socket.py
@@ -700,6 +700,14 @@
except java.lang.Exception, jlx:
raise _map_exception(jlx)
+#
+# Skeleton implementation of gethostbyname_ex
+# Needed because urllib2 refers to it
+#
+
+def gethostbyname_ex(name):
+ return (name, [], gethostbyname(name))
+
def gethostbyaddr(name):
names, addrs = _gethostbyaddr(name)
return (names[0], names, addrs)
@@ -1850,24 +1858,31 @@
class ssl:
- def __init__(self, plain_sock, keyfile=None, certfile=None):
+ def __init__(self, jython_socket_wrapper, keyfile=None, certfile=None):
try:
- self.ssl_sock = self._make_ssl_socket(plain_sock)
- self._in_buf = java.io.BufferedInputStream(self.ssl_sock.getInputStream())
- self._out_buf = java.io.BufferedOutputStream(self.ssl_sock.getOutputStream())
+ self.jython_socket_wrapper = jython_socket_wrapper
+ jython_socket = self.jython_socket_wrapper._sock
+ self.java_ssl_socket = self._make_ssl_socket(jython_socket)
+ self._in_buf = java.io.BufferedInputStream(self.java_ssl_socket.getInputStream())
+ self._out_buf = java.io.BufferedOutputStream(self.java_ssl_socket.getOutputStream())
except java.lang.Exception, jlx:
raise _map_exception(jlx)
- def _make_ssl_socket(self, plain_socket, auto_close=0):
- java_net_socket = plain_socket._get_jsocket()
+ def _make_ssl_socket(self, jython_socket, auto_close=0):
+ java_net_socket = jython_socket._get_jsocket()
assert isinstance(java_net_socket, java.net.Socket)
host = java_net_socket.getInetAddress().getHostAddress()
port = java_net_socket.getPort()
factory = javax.net.ssl.SSLSocketFactory.getDefault();
- ssl_socket = factory.createSocket(java_net_socket, host, port, auto_close)
- ssl_socket.setEnabledCipherSuites(ssl_socket.getSupportedCipherSuites())
- ssl_socket.startHandshake()
- return ssl_socket
+ java_ssl_socket = factory.createSocket(java_net_socket, host, port, auto_close)
+ java_ssl_socket.setEnabledCipherSuites(java_ssl_socket.getSupportedCipherSuites())
+ java_ssl_socket.startHandshake()
+ return java_ssl_socket
+
+ def __getattr__(self, attr_name):
+ if hasattr(self.jython_socket_wrapper, attr_name):
+ return getattr(self.jython_socket_wrapper, attr_name)
+ raise AttributeError(attr_name)
def read(self, n=4096):
try:
@@ -1891,7 +1906,7 @@
def _get_server_cert(self):
try:
- return self.ssl_sock.getSession().getPeerCertificates()[0]
+ return self.java_ssl_socket.getSession().getPeerCertificates()[0]
except java.lang.Exception, jlx:
raise _map_exception(jlx)
@@ -1903,12 +1918,6 @@
cert = self._get_server_cert()
return cert.getIssuerDN().toString()
-_realssl = ssl
-def ssl(sock, keyfile=None, certfile=None):
- if hasattr(sock, "_sock"):
- sock = sock._sock
- return _realssl(sock, keyfile, certfile)
-
def test():
s = socket(AF_INET, SOCK_STREAM)
s.connect(("", 80))
diff --git a/Lib/ssl.py b/Lib/ssl.py
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -7,4 +7,4 @@
import socket
-wrap = socket.ssl
+wrap_socket = socket.ssl
diff --git a/Lib/test/test_socket_ssl.py b/Lib/test/test_socket_ssl.py
--- a/Lib/test/test_socket_ssl.py
+++ b/Lib/test/test_socket_ssl.py
@@ -61,11 +61,20 @@
time.sleep(1)
connector()
+def test_https_socket():
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect(('www.verisign.com', 443))
+ ssl_sock = socket.ssl(s)
+ ssl_sock.server()
+ ssl_sock.issuer()
+ s.close()
+
def test_main():
if not hasattr(socket, "ssl"):
raise test_support.TestSkipped("socket module has no ssl support")
test_rude_shutdown()
test_basic()
+ test_https_socket()
if __name__ == "__main__":
test_main()
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -3,12 +3,16 @@
import urllib
import httplib
import unittest
-from test import test_support
import os
+import sys
import mimetools
import tempfile
import StringIO
+from test import test_support
+from base64 import b64encode
+
+
def hexescape(char):
"""Escape char as RFC 2396 specifies"""
hex_repr = hex(ord(char))[2:].upper()
@@ -16,6 +20,43 @@
hex_repr = "0%s" % hex_repr
return "%" + hex_repr
+
+class FakeHTTPMixin(object):
+ def fakehttp(self, fakedata):
+ class FakeSocket(StringIO.StringIO):
+
+ def sendall(self, data):
+ FakeHTTPConnection.buf = data
+
+ def makefile(self, *args, **kwds):
+ return self
+
+ def read(self, amt=None):
+ if self.closed:
+ return ""
+ return StringIO.StringIO.read(self, amt)
+
+ def readline(self, length=None):
+ if self.closed:
+ return ""
+ return StringIO.StringIO.readline(self, length)
+
+ class FakeHTTPConnection(httplib.HTTPConnection):
+
+ # buffer to store data for verification in urlopen tests.
+ buf = ""
+
+ def connect(self):
+ self.sock = FakeSocket(fakedata)
+
+ assert httplib.HTTP._connection_class == httplib.HTTPConnection
+
+ httplib.HTTP._connection_class = FakeHTTPConnection
+
+ def unfakehttp(self):
+ httplib.HTTP._connection_class = httplib.HTTPConnection
+
+
class urlopen_FileTests(unittest.TestCase):
"""Test urlopen() opening a temporary file.
@@ -44,7 +85,7 @@
# Make sure object returned by urlopen() has the specified methods
for attr in ("read", "readline", "readlines", "fileno",
"close", "info", "geturl", "getcode", "__iter__"):
- self.assert_(hasattr(self.returned_obj, attr),
+ self.assertTrue(hasattr(self.returned_obj, attr),
"object returned by urlopen() lacks %s attribute" %
attr)
@@ -79,7 +120,7 @@
self.returned_obj.close()
def test_info(self):
- self.assert_(isinstance(self.returned_obj.info(), mimetools.Message))
+ self.assertIsInstance(self.returned_obj.info(), mimetools.Message)
def test_geturl(self):
self.assertEqual(self.returned_obj.geturl(), self.pathname)
@@ -95,6 +136,9 @@
for line in self.returned_obj.__iter__():
self.assertEqual(line, self.text)
+ def test_relativelocalfile(self):
+ self.assertRaises(ValueError,urllib.urlopen,'./' + self.pathname)
+
class ProxyTests(unittest.TestCase):
def setUp(self):
@@ -114,31 +158,15 @@
self.env.set('NO_PROXY', 'localhost')
proxies = urllib.getproxies_environment()
# getproxies_environment use lowered case truncated (no '_proxy') keys
- self.assertEquals('localhost', proxies['no'])
+ self.assertEqual('localhost', proxies['no'])
+ # List of no_proxies with space.
+ self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com')
+ self.assertTrue(urllib.proxy_bypass_environment('anotherdomain.com'))
-class urlopen_HttpTests(unittest.TestCase):
+class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
"""Test urlopen() opening a fake http connection."""
- def fakehttp(self, fakedata):
- class FakeSocket(StringIO.StringIO):
- def sendall(self, str): pass
- def makefile(self, mode, name): return self
- def read(self, amt=None):
- if self.closed: return ''
- return StringIO.StringIO.read(self, amt)
- def readline(self, length=None):
- if self.closed: return ''
- return StringIO.StringIO.readline(self, length)
- class FakeHTTPConnection(httplib.HTTPConnection):
- def connect(self):
- self.sock = FakeSocket(fakedata)
- assert httplib.HTTP._connection_class == httplib.HTTPConnection
- httplib.HTTP._connection_class = FakeHTTPConnection
-
- def unfakehttp(self):
- httplib.HTTP._connection_class = httplib.HTTPConnection
-
def test_read(self):
self.fakehttp('Hello!')
try:
@@ -150,6 +178,16 @@
finally:
self.unfakehttp()
+ def test_url_fragment(self):
+ # Issue #11703: geturl() omits fragments in the original URL.
+ url = 'http://docs.python.org/library/urllib.html#OK'
+ self.fakehttp('Hello!')
+ try:
+ fp = urllib.urlopen(url)
+ self.assertEqual(fp.geturl(), url)
+ finally:
+ self.unfakehttp()
+
def test_read_bogus(self):
# urlopen() should raise IOError for many error codes.
self.fakehttp('''HTTP/1.1 401 Authentication Required
@@ -186,6 +224,62 @@
finally:
self.unfakehttp()
+ def test_missing_localfile(self):
+ self.assertRaises(IOError, urllib.urlopen,
+ 'file://localhost/a/missing/file.py')
+ fd, tmp_file = tempfile.mkstemp()
+ tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/')
+ try:
+ self.assertTrue(os.path.exists(tmp_file))
+ fp = urllib.urlopen(tmp_fileurl)
+ finally:
+ os.close(fd)
+ fp.close()
+ os.unlink(tmp_file)
+
+ self.assertFalse(os.path.exists(tmp_file))
+ self.assertRaises(IOError, urllib.urlopen, tmp_fileurl)
+
+ def test_ftp_nonexisting(self):
+ self.assertRaises(IOError, urllib.urlopen,
+ 'ftp://localhost/not/existing/file.py')
+
+
+ def test_userpass_inurl(self):
+ self.fakehttp('Hello!')
+ try:
+ fakehttp_wrapper = httplib.HTTP._connection_class
+ fp = urllib.urlopen("http://user:pass@python.org/")
+ authorization = ("Authorization: Basic %s\r\n" %
+ b64encode('user:pass'))
+ # The authorization header must be in place
+ self.assertIn(authorization, fakehttp_wrapper.buf)
+ self.assertEqual(fp.readline(), "Hello!")
+ self.assertEqual(fp.readline(), "")
+ self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
+ self.assertEqual(fp.getcode(), 200)
+ finally:
+ self.unfakehttp()
+
+ def test_userpass_with_spaces_inurl(self):
+ self.fakehttp('Hello!')
+ try:
+ url = "http://a b:c d@python.org/"
+ fakehttp_wrapper = httplib.HTTP._connection_class
+ authorization = ("Authorization: Basic %s\r\n" %
+ b64encode('a b:c d'))
+ fp = urllib.urlopen(url)
+ # The authorization header must be in place
+ self.assertIn(authorization, fakehttp_wrapper.buf)
+ self.assertEqual(fp.readline(), "Hello!")
+ self.assertEqual(fp.readline(), "")
+ # the spaces are quoted in URL so no match
+ self.assertNotEqual(fp.geturl(), url)
+ self.assertEqual(fp.getcode(), 200)
+ finally:
+ self.unfakehttp()
+
+
class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
@@ -243,9 +337,9 @@
# a headers value is returned.
result = urllib.urlretrieve("file:%s" % test_support.TESTFN)
self.assertEqual(result[0], test_support.TESTFN)
- self.assert_(isinstance(result[1], mimetools.Message),
- "did not get a mimetools.Message instance as second "
- "returned value")
+ self.assertIsInstance(result[1], mimetools.Message,
+ "did not get a mimetools.Message instance as "
+ "second returned value")
def test_copy(self):
# Test that setting the filename argument works.
@@ -254,7 +348,7 @@
result = urllib.urlretrieve(self.constructLocalFileUrl(
test_support.TESTFN), second_temp)
self.assertEqual(second_temp, result[0])
- self.assert_(os.path.exists(second_temp), "copy of the file was not "
+ self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
"made")
FILE = file(second_temp, 'rb')
try:
@@ -268,9 +362,9 @@
def test_reporthook(self):
# Make sure that the reporthook works.
def hooktester(count, block_size, total_size, count_holder=[0]):
- self.assert_(isinstance(count, int))
- self.assert_(isinstance(block_size, int))
- self.assert_(isinstance(total_size, int))
+ self.assertIsInstance(count, int)
+ self.assertIsInstance(block_size, int)
+ self.assertIsInstance(total_size, int)
self.assertEqual(count, count_holder[0])
count_holder[0] = count_holder[0] + 1
second_temp = "%s.2" % test_support.TESTFN
@@ -318,6 +412,45 @@
self.assertEqual(report[0][1], 8192)
self.assertEqual(report[0][2], 8193)
+
+class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin):
+ """Test urllib.urlretrieve() using fake http connections"""
+
+ def test_short_content_raises_ContentTooShortError(self):
+ self.fakehttp('''HTTP/1.1 200 OK
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Connection: close
+Content-Length: 100
+Content-Type: text/html; charset=iso-8859-1
+
+FF
+''')
+
+ def _reporthook(par1, par2, par3):
+ pass
+
+ try:
+ self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve,
+ 'http://example.com', reporthook=_reporthook)
+ finally:
+ self.unfakehttp()
+
+ def test_short_content_raises_ContentTooShortError_without_reporthook(self):
+ self.fakehttp('''HTTP/1.1 200 OK
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Connection: close
+Content-Length: 100
+Content-Type: text/html; charset=iso-8859-1
+
+FF
+''')
+ try:
+ self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve, 'http://example.com/')
+ finally:
+ self.unfakehttp()
+
class QuotingTests(unittest.TestCase):
"""Tests for urllib.quote() and urllib.quote_plus()
@@ -395,8 +528,10 @@
result = urllib.quote(partial_quote)
self.assertEqual(expected, result,
"using quote(): %s != %s" % (expected, result))
+ result = urllib.quote_plus(partial_quote)
self.assertEqual(expected, result,
"using quote_plus(): %s != %s" % (expected, result))
+ self.assertRaises(TypeError, urllib.quote, None)
def test_quoting_space(self):
# Make sure quote() and quote_plus() handle spaces as specified in
@@ -527,7 +662,7 @@
expect_somewhere = ["1st=1", "2nd=2", "3rd=3"]
result = urllib.urlencode(given)
for expected in expect_somewhere:
- self.assert_(expected in result,
+ self.assertIn(expected, result,
"testing %s: %s not found in %s" %
(test_type, expected, result))
self.assertEqual(result.count('&'), 2,
@@ -536,7 +671,7 @@
amp_location = result.index('&')
on_amp_left = result[amp_location - 1]
on_amp_right = result[amp_location + 1]
- self.assert_(on_amp_left.isdigit() and on_amp_right.isdigit(),
+ self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(),
"testing %s: '&' not located in proper place in %s" %
(test_type, result))
self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps
@@ -574,8 +709,7 @@
result = urllib.urlencode(given, True)
for value in given["sequence"]:
expect = "sequence=%s" % value
- self.assert_(expect in result,
- "%s not found in %s" % (expect, result))
+ self.assertIn(expect, result)
self.assertEqual(result.count('&'), 2,
"Expected 2 '&'s, got %s" % result.count('&'))
@@ -622,8 +756,45 @@
"url2pathname() failed; %s != %s" %
(expect, result))
+ @unittest.skipUnless(sys.platform == 'win32',
+ 'test specific to the nturl2path library')
+ def test_ntpath(self):
+ given = ('/C:/', '///C:/', '/C|//')
+ expect = 'C:\\'
+ for url in given:
+ result = urllib.url2pathname(url)
+ self.assertEqual(expect, result,
+ 'nturl2path.url2pathname() failed; %s != %s' %
+ (expect, result))
+ given = '///C|/path'
+ expect = 'C:\\path'
+ result = urllib.url2pathname(given)
+ self.assertEqual(expect, result,
+ 'nturl2path.url2pathname() failed; %s != %s' %
+ (expect, result))
+
+class Utility_Tests(unittest.TestCase):
+ """Testcase to test the various utility functions in the urllib."""
+
+ def test_splitpasswd(self):
+ """Some of the password examples are not sensible, but it is added to
+ confirming to RFC2617 and addressing issue4675.
+ """
+ self.assertEqual(('user', 'ab'),urllib.splitpasswd('user:ab'))
+ self.assertEqual(('user', 'a\nb'),urllib.splitpasswd('user:a\nb'))
+ self.assertEqual(('user', 'a\tb'),urllib.splitpasswd('user:a\tb'))
+ self.assertEqual(('user', 'a\rb'),urllib.splitpasswd('user:a\rb'))
+ self.assertEqual(('user', 'a\fb'),urllib.splitpasswd('user:a\fb'))
+ self.assertEqual(('user', 'a\vb'),urllib.splitpasswd('user:a\vb'))
+ self.assertEqual(('user', 'a:b'),urllib.splitpasswd('user:a:b'))
+ self.assertEqual(('user', 'a b'),urllib.splitpasswd('user:a b'))
+ self.assertEqual(('user 2', 'ab'),urllib.splitpasswd('user 2:ab'))
+ self.assertEqual(('user+1', 'a+b'),urllib.splitpasswd('user+1:a+b'))
+
+
class URLopener_Tests(unittest.TestCase):
"""Testcase to test the open method of URLopener class."""
+
def test_quoted_open(self):
class DummyURLopener(urllib.URLopener):
def open_spam(self, url):
@@ -640,7 +811,7 @@
# Just commented them out.
# Can't really tell why keep failing in windows and sparc.
-# Everywhere else they work ok, but on those machines, someteimes
+# Everywhere else they work ok, but on those machines, sometimes
# fail in one of the tests, sometimes in other. I have a linux, and
# the tests go ok.
# If anybody has one of the problematic enviroments, please help!
@@ -689,7 +860,7 @@
# def testTimeoutNone(self):
# # global default timeout is ignored
# import socket
-# self.assert_(socket.getdefaulttimeout() is None)
+# self.assertTrue(socket.getdefaulttimeout() is None)
# socket.setdefaulttimeout(30)
# try:
# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
@@ -701,7 +872,7 @@
# def testTimeoutDefault(self):
# # global default timeout is used
# import socket
-# self.assert_(socket.getdefaulttimeout() is None)
+# self.assertTrue(socket.getdefaulttimeout() is None)
# socket.setdefaulttimeout(30)
# try:
# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
@@ -727,11 +898,13 @@
urlopen_FileTests,
urlopen_HttpTests,
urlretrieve_FileTests,
+ urlretrieve_HttpTests,
ProxyTests,
QuotingTests,
UnquotingTests,
urlencode_Tests,
Pathname_Tests,
+ Utility_Tests,
URLopener_Tests,
#FTPWrapperTests,
)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -293,6 +293,7 @@
self._tunnel_headers = headers
else:
self._tunnel_headers.clear()
+
def request(self, method, url, body=None, headers=None):
self.method = method
self.selector = url
@@ -304,9 +305,13 @@
if self.raise_on_endheaders:
import socket
raise socket.error()
+
def getresponse(self):
return MockHTTPResponse(MockFile(), {}, 200, "OK")
+ def close(self):
+ pass
+
class MockHandler:
# useful for testing handler machinery
# see add_ordered_mock_handlers() docstring
@@ -593,21 +598,20 @@
def sanepathname2url(path):
import urllib
urlpath = urllib.pathname2url(path)
- if ((os._name if test_support.is_jython else os.name) == 'nt'
- and urlpath.startswith("///")):
+ if os.name == "nt" and urlpath.startswith("///"):
urlpath = urlpath[2:]
# XXX don't ask me about the mac...
return urlpath
class HandlerTests(unittest.TestCase):
- @unittest.skip("FIXME: broken")
def test_ftp(self):
class MockFTPWrapper:
def __init__(self, data): self.data = data
def retrfile(self, filename, filetype):
self.filename, self.filetype = filename, filetype
return StringIO.StringIO(self.data), len(self.data)
+ def close(self): pass
class NullFTPHandler(urllib2.FTPHandler):
def __init__(self, data): self.data = data
@@ -659,7 +663,6 @@
self.assertEqual(headers.get("Content-type"), mimetype)
self.assertEqual(int(headers["Content-length"]), len(data))
- @unittest.skip("FIXME: not working")
def test_file(self):
import rfc822, socket
h = urllib2.FileHandler()
@@ -753,7 +756,6 @@
self.assertEqual(req.type, "ftp")
self.assertEqual(req.type == "ftp", ftp)
- @unittest.skip("FIXME: broken")
def test_http(self):
h = urllib2.AbstractHTTPHandler()
@@ -842,7 +844,6 @@
p_ds_req = h.do_request_(ds_req)
self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
- @unittest.skip("FIXME: broken")
def test_fixpath_in_weirdurls(self):
# Issue4493: urllib2 to supply '/' when to urls where path does not
# start with'/'
@@ -974,6 +975,28 @@
self.assertEqual(count,
urllib2.HTTPRedirectHandler.max_redirections)
+ def test_invalid_redirect(self):
+ from_url = "http://example.com/a.html"
+ valid_schemes = ['http', 'https', 'ftp']
+ invalid_schemes = ['file', 'imap', 'ldap']
+ schemeless_url = "example.com/b.html"
+ h = urllib2.HTTPRedirectHandler()
+ o = h.parent = MockOpener()
+ req = Request(from_url)
+ req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+
+ for scheme in invalid_schemes:
+ invalid_url = scheme + '://' + schemeless_url
+ self.assertRaises(urllib2.HTTPError, h.http_error_302,
+ req, MockFile(), 302, "Security Loophole",
+ MockHeaders({"location": invalid_url}))
+
+ for scheme in valid_schemes:
+ valid_url = scheme + '://' + schemeless_url
+ h.http_error_302(req, MockFile(), 302, "That's fine",
+ MockHeaders({"location": valid_url}))
+ self.assertEqual(o.req.get_full_url(), valid_url)
+
def test_cookie_redirect(self):
# cookies shouldn't leak into redirected requests
from cookielib import CookieJar
@@ -990,6 +1013,15 @@
o.open("http://www.example.com/")
self.assertTrue(not hh.req.has_header("Cookie"))
+ def test_redirect_fragment(self):
+ redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
+ hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
+ hdeh = urllib2.HTTPDefaultErrorHandler()
+ hrh = urllib2.HTTPRedirectHandler()
+ o = build_test_opener(hh, hdeh, hrh)
+ fp = o.open('http://www.example.com')
+ self.assertEqual(fp.geturl(), redirected_url.strip())
+
def test_proxy(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
@@ -1074,12 +1106,31 @@
self._test_basic_auth(opener, auth_handler, "Authorization",
realm, http_handler, password_manager,
"http://acme.example.com/protected",
- "http://acme.example.com/protected",
- )
+ "http://acme.example.com/protected"
+ )
def test_basic_auth_with_single_quoted_realm(self):
self.test_basic_auth(quote_char="'")
+ @unittest.skipIf(test_support.is_jython, "Currently not working on jython")
+ def test_basic_auth_with_unquoted_realm(self):
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+ realm = "ACME Widget Store"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
+ opener.add_handler(auth_handler)
+ opener.add_handler(http_handler)
+ msg = "Basic Auth Realm was unquoted"
+ with test_support.check_warnings((msg, UserWarning)):
+ self._test_basic_auth(opener, auth_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected"
+ )
+
+
def test_proxy_basic_auth(self):
opener = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
@@ -1098,7 +1149,7 @@
)
def test_basic_and_digest_auth_handlers(self):
- # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
+ # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
# response (http://python.org/sf/1479302), where it should instead
# return None to allow another handler (especially
# HTTPBasicAuthHandler) to handle the response.
@@ -1275,12 +1326,44 @@
req = Request("<URL:http://www.python.org>")
self.assertEqual("www.python.org", req.get_host())
- def test_urlwith_fragment(self):
+ def test_url_fragment(self):
req = Request("http://www.python.org/?qs=query#fragment=true")
self.assertEqual("/?qs=query", req.get_selector())
req = Request("http://www.python.org/#fun=true")
self.assertEqual("/", req.get_selector())
+ # Issue 11703: geturl() omits fragment in the original URL.
+ url = 'http://docs.python.org/library/urllib2.html#OK'
+ req = Request(url)
+ self.assertEqual(req.get_full_url(), url)
+
+ def test_HTTPError_interface(self):
+ """
+ Issue 13211 reveals that HTTPError didn't implement the URLError
+ interface even though HTTPError is a subclass of URLError.
+
+ >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
+ >>> assert hasattr(err, 'reason')
+ >>> err.reason
+ 'something bad happened'
+ """
+
+ @unittest.skip("Test is broken because of fp=None, which causes failure to call addinfourl superclass __init__")
+ def test_HTTPError_interface_call(self):
+ """
+ Issue 15701= - HTTPError interface has info method available from URLError.
+ """
+ err = urllib2.HTTPError(msg='something bad happened', url=None,
+ code=None, hdrs='Content-Length:42', fp=None)
+ self.assertTrue(hasattr(err, 'reason'))
+ assert hasattr(err, 'reason')
+ assert hasattr(err, 'info')
+ assert callable(err.info)
+ try:
+ err.info()
+ except AttributeError:
+ self.fail("err.info() failed")
+ self.assertEqual(err.info(), "Content-Length:42")
def test_main(verbose=None):
from test import test_urllib2
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
--- a/Lib/test/test_urllib2_localnet.py
+++ b/Lib/test/test_urllib2_localnet.py
@@ -1,14 +1,21 @@
#!/usr/bin/env python
-import sys
-import threading
import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib
+
from test import test_support
+if test_support.is_jython:
+ import socket
+ # Working around an IPV6 problem on Windows
+ socket._use_ipv4_addresses_only(True)
+
+mimetools = test_support.import_module('mimetools', deprecated=True)
+threading = test_support.import_module('threading')
+
# Loopback http server infrastructure
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
@@ -19,7 +26,12 @@
def __init__(self, server_address, RequestHandlerClass):
BaseHTTPServer.HTTPServer.__init__(self,
server_address,
- RequestHandlerClass)
+ RequestHandlerClass,
+ True)
+
+ host, port = self.socket.getsockname()[:2]
+ self.server_name = socket.getfqdn(host)
+ self.server_port = port
# Set the timeout of our listening socket really low so
# that we can stop the server easily.
@@ -40,13 +52,16 @@
class LoopbackHttpServerThread(threading.Thread):
"""Stoppable thread that runs a loopback http server."""
- def __init__(self, port, RequestHandlerClass):
+ def __init__(self, request_handler):
threading.Thread.__init__(self)
- self._RequestHandlerClass = RequestHandlerClass
self._stop = False
- self._port = port
- self._server_address = ('127.0.0.1', self._port)
self.ready = threading.Event()
+ request_handler.protocol_version = "HTTP/1.0"
+ self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
+ request_handler)
+ #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
+ # self.httpd.server_port)
+ self.port = self.httpd.server_port
def stop(self):
"""Stops the webserver if it's currently running."""
@@ -57,19 +72,9 @@
self.join()
def run(self):
- protocol = "HTTP/1.0"
-
- self._RequestHandlerClass.protocol_version = protocol
- httpd = LoopbackHttpServer(self._server_address,
- self._RequestHandlerClass)
-
- sa = httpd.socket.getsockname()
- #print "Serving HTTP on", sa[0], "port", sa[1], "..."
-
self.ready.set()
while not self._stop:
- httpd.handle_request()
- httpd.server_close()
+ self.httpd.handle_request()
# Authentication infrastructure
@@ -161,13 +166,13 @@
if len(self._users) == 0:
return True
- if not request_handler.headers.has_key('Proxy-Authorization'):
+ if 'Proxy-Authorization' not in request_handler.headers:
return self._return_auth_challenge(request_handler)
else:
auth_dict = self._create_auth_dict(
request_handler.headers['Proxy-Authorization']
)
- if self._users.has_key(auth_dict["username"]):
+ if auth_dict["username"] in self._users:
password = self._users[ auth_dict["username"] ]
else:
return self._return_auth_challenge(request_handler)
@@ -202,7 +207,11 @@
testing.
"""
- digest_auth_handler = DigestAuthHandler()
+ def __init__(self, digest_auth_handler, *args, **kwargs):
+ # This has to be set before calling our parent's __init__(), which will
+ # try to call do_GET().
+ self.digest_auth_handler = digest_auth_handler
+ BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def log_message(self, format, *args):
# Uncomment the next line for debugging.
@@ -223,60 +232,68 @@
# Test cases
-class ProxyAuthTests(unittest.TestCase):
- URL = "http://www.foo.com"
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = test_support.threading_setup()
- PORT = 58080
+ def tearDown(self):
+ test_support.threading_cleanup(*self._threads)
+
+
+class ProxyAuthTests(BaseTestCase):
+ URL = "http://localhost"
+
USER = "tester"
PASSWD = "test123"
REALM = "TestRealm"
- PROXY_URL = "http://127.0.0.1:%d" % PORT
+ def setUp(self):
+ super(ProxyAuthTests, self).setUp()
+ self.digest_auth_handler = DigestAuthHandler()
+ self.digest_auth_handler.set_users({self.USER: self.PASSWD})
+ self.digest_auth_handler.set_realm(self.REALM)
+ def create_fake_proxy_handler(*args, **kwargs):
+ return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
- def setUp(self):
- FakeProxyHandler.digest_auth_handler.set_users({
- self.USER : self.PASSWD
- })
- FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
-
- self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
+ self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
self.server.start()
self.server.ready.wait()
-
- handler = urllib2.ProxyHandler({"http" : self.PROXY_URL})
- self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
- self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
+ proxy_url = "http://127.0.0.1:%d" % self.server.port
+ handler = urllib2.ProxyHandler({"http" : proxy_url})
+ self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
+ self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
def tearDown(self):
self.server.stop()
+ super(ProxyAuthTests, self).tearDown()
def test_proxy_with_bad_password_raises_httperror(self):
- self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD+"bad")
- FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.digest_auth_handler.set_qop("auth")
self.assertRaises(urllib2.HTTPError,
self.opener.open,
self.URL)
def test_proxy_with_no_password_raises_httperror(self):
- FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.digest_auth_handler.set_qop("auth")
self.assertRaises(urllib2.HTTPError,
self.opener.open,
self.URL)
def test_proxy_qop_auth_works(self):
- self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
- FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.digest_auth_handler.set_qop("auth")
result = self.opener.open(self.URL)
while result.read():
pass
result.close()
def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
- self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
- FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
+ self.digest_auth_handler.set_qop("auth-int")
try:
result = self.opener.open(self.URL)
except urllib2.URLError:
@@ -289,6 +306,244 @@
pass
result.close()
+
+def GetRequestHandler(responses):
+
+ class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+
+ server_version = "TestHTTP/"
+ requests = []
+ headers_received = []
+ port = 80
+
+ def do_GET(self):
+ body = self.send_head()
+ if body:
+ self.wfile.write(body)
+
+ def do_POST(self):
+ content_length = self.headers['Content-Length']
+ post_data = self.rfile.read(int(content_length))
+ self.do_GET()
+ self.requests.append(post_data)
+
+ def send_head(self):
+ FakeHTTPRequestHandler.headers_received = self.headers
+ self.requests.append(self.path)
+ response_code, headers, body = responses.pop(0)
+
+ self.send_response(response_code)
+
+ for (header, value) in headers:
+ self.send_header(header, value % self.port)
+ if body:
+ self.send_header('Content-type', 'text/plain')
+ self.end_headers()
+ return body
+ self.end_headers()
+
+ def log_message(self, *args):
+ pass
+
+
+ return FakeHTTPRequestHandler
+
+
+class TestUrlopen(BaseTestCase):
+ """Tests urllib2.urlopen using the network.
+
+ These tests are not exhaustive. Assuming that testing using files does a
+ good job overall of some of the basic interface features. There are no
+ tests exercising the optional 'data' and 'proxies' arguments. No tests
+ for transparent redirection have been written.
+ """
+
+ def setUp(self):
+ proxy_handler = urllib2.ProxyHandler({})
+ opener = urllib2.build_opener(proxy_handler)
+ urllib2.install_opener(opener)
+ super(TestUrlopen, self).setUp()
+
+ def start_server(self, responses):
+ handler = GetRequestHandler(responses)
+
+ self.server = LoopbackHttpServerThread(handler)
+ self.server.start()
+ self.server.ready.wait()
+ port = self.server.port
+ handler.port = port
+ return handler
+
+
+ def test_redirection(self):
+ expected_response = 'We got here...'
+ responses = [
+ (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
+ (200, [], expected_response)
+ ]
+
+ handler = self.start_server(responses)
+
+ try:
+ f = urllib2.urlopen('http://localhost:%s/' % handler.port)
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/', '/somewhere_else'])
+ finally:
+ self.server.stop()
+
+
+ def test_404(self):
+ expected_response = 'Bad bad bad...'
+ handler = self.start_server([(404, [], expected_response)])
+
+ try:
+ try:
+ urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
+ except urllib2.URLError, f:
+ pass
+ else:
+ self.fail('404 should raise URLError')
+
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/weeble'])
+ finally:
+ self.server.stop()
+
+
+ def test_200(self):
+ expected_response = 'pycon 2008...'
+ handler = self.start_server([(200, [], expected_response)])
+
+ try:
+ f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/bizarre'])
+ finally:
+ self.server.stop()
+
+ def test_200_with_parameters(self):
+ expected_response = 'pycon 2008...'
+ handler = self.start_server([(200, [], expected_response)])
+
+ try:
+ f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
+ finally:
+ self.server.stop()
+
+
+ def test_sending_headers(self):
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ req = urllib2.Request("http://localhost:%s/" % handler.port,
+ headers={'Range': 'bytes=20-39'})
+ urllib2.urlopen(req)
+ self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
+ finally:
+ self.server.stop()
+
+ def test_basic(self):
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+ for attr in ("read", "close", "info", "geturl"):
+ self.assertTrue(hasattr(open_url, attr), "object returned from "
+ "urlopen lacks the %s attribute" % attr)
+ try:
+ self.assertTrue(open_url.read(), "calling 'read' failed")
+ finally:
+ open_url.close()
+ finally:
+ self.server.stop()
+
+ def test_info(self):
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+ info_obj = open_url.info()
+ self.assertIsInstance(info_obj, mimetools.Message,
+ "object returned by 'info' is not an "
+ "instance of mimetools.Message")
+ self.assertEqual(info_obj.getsubtype(), "plain")
+ finally:
+ self.server.stop()
+
+ def test_geturl(self):
+ # Make sure same URL as opened is returned by geturl.
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+ url = open_url.geturl()
+ self.assertEqual(url, "http://localhost:%s" % handler.port)
+ finally:
+ self.server.stop()
+
+
+ def test_bad_address(self):
+ # Make sure proper exception is raised when connecting to a bogus
+ # address.
+ self.assertRaises(IOError,
+ # Given that both VeriSign and various ISPs have in
+ # the past or are presently hijacking various invalid
+ # domain name requests in an attempt to boost traffic
+ # to their own sites, finding a domain name to use
+ # for this test is difficult. RFC2606 leads one to
+ # believe that '.invalid' should work, but experience
+ # seemed to indicate otherwise. Single character
+ # TLDs are likely to remain invalid, so this seems to
+ # be the best choice. The trailing '.' prevents a
+ # related problem: The normal DNS resolver appends
+ # the domain names from the search path if there is
+ # no '.' the end and, and if one of those domains
+ # implements a '*' rule a result is returned.
+ # However, none of this will prevent the test from
+ # failing if the ISP hijacks all invalid domain
+ # requests. The real solution would be to be able to
+ # parameterize the framework with a mock resolver.
+ urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
+
+ def test_iteration(self):
+ expected_response = "pycon 2008..."
+ handler = self.start_server([(200, [], expected_response)])
+ try:
+ data = urllib2.urlopen("http://localhost:%s" % handler.port)
+ for line in data:
+ self.assertEqual(line, expected_response)
+ finally:
+ self.server.stop()
+
+ def ztest_line_iteration(self):
+ lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
+ expected_response = "".join(lines)
+ handler = self.start_server([(200, [], expected_response)])
+ try:
+ data = urllib2.urlopen("http://localhost:%s" % handler.port)
+ for index, line in enumerate(data):
+ self.assertEqual(line, lines[index],
+ "Fetched line number %s doesn't match expected:\n"
+ " Expected length was %s, got %s" %
+ (index, len(lines[index]), len(line)))
+ finally:
+ self.server.stop()
+ self.assertEqual(index + 1, len(lines))
+
def test_main():
# We will NOT depend on the network resource flag
# (Lib/test/regrtest.py -u network) since all tests here are only
@@ -296,7 +551,7 @@
# the next line.
#test_support.requires("network")
- test_support.run_unittest(ProxyAuthTests)
+ test_support.run_unittest(ProxyAuthTests, TestUrlopen)
if __name__ == "__main__":
test_main()
diff --git a/Lib/urllib.py b/Lib/urllib.py
--- a/Lib/urllib.py
+++ b/Lib/urllib.py
@@ -27,6 +27,8 @@
import os
import time
import sys
+import base64
+
from urlparse import urljoin as basejoin
__all__ = ["urlopen", "URLopener", "FancyURLopener", "urlretrieve",
@@ -42,9 +44,7 @@
MAXFTPCACHE = 10 # Trim the ftp cache beyond this size
# Helper for non-unix systems
-if os.name == 'mac':
- from macurl2path import url2pathname, pathname2url
-elif (os._name if sys.platform.startswith('java') else os.name) == 'nt':
+if (os._name if sys.platform.startswith('java') else os.name) == 'nt':
from nturl2path import url2pathname, pathname2url
elif os.name == 'riscos':
from rourl2path import url2pathname, pathname2url
@@ -94,7 +94,7 @@
def urlcleanup():
if _urlopener:
_urlopener.cleanup()
- _safemaps.clear()
+ _safe_quoters.clear()
ftpcache.clear()
# check for SSL
@@ -177,8 +177,8 @@
def open(self, fullurl, data=None):
"""Use URLopener().open(file) instead of open(file, 'r')."""
fullurl = unwrap(toBytes(fullurl))
- # percent encode url. fixing lame server errors like space within url
- # parts
+ # percent encode url, fixing lame server errors, e.g. a space
+ # within url paths.
fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
if self.tempcache and fullurl in self.tempcache:
filename, headers = self.tempcache[fullurl]
@@ -232,9 +232,9 @@
try:
fp = self.open_local_file(url1)
hdrs = fp.info()
- del fp
+ fp.close()
return url2pathname(splithost(url1)[1]), hdrs
- except IOError, msg:
+ except IOError:
pass
fp = self.open(url, data)
try:
@@ -259,9 +259,9 @@
size = -1
read = 0
blocknum = 0
+ if "content-length" in headers:
+ size = int(headers["Content-Length"])
if reporthook:
- if "content-length" in headers:
- size = int(headers["Content-Length"])
reporthook(blocknum, bs, size)
while 1:
block = fp.read(bs)
@@ -276,8 +276,6 @@
tfp.close()
finally:
fp.close()
- del fp
- del tfp
# raise exception if actual size does not match content-length header
if size >= 0 and read < size:
@@ -322,13 +320,13 @@
if not host: raise IOError, ('http error', 'no host given')
if proxy_passwd:
- import base64
+ proxy_passwd = unquote(proxy_passwd)
proxy_auth = base64.b64encode(proxy_passwd).strip()
else:
proxy_auth = None
if user_passwd:
- import base64
+ user_passwd = unquote(user_passwd)
auth = base64.b64encode(user_passwd).strip()
else:
auth = None
@@ -343,9 +341,7 @@
if auth: h.putheader('Authorization', 'Basic %s' % auth)
if realhost: h.putheader('Host', realhost)
for args in self.addheaders: h.putheader(*args)
- h.endheaders()
- if data is not None:
- h.send(data)
+ h.endheaders(data)
errcode, errmsg, headers = h.getreply()
fp = h.getfile()
if errcode == -1:
@@ -380,7 +376,6 @@
def http_error_default(self, url, fp, errcode, errmsg, headers):
"""Default error handler: close the connection and raise IOError."""
- void = fp.read()
fp.close()
raise IOError, ('http error', errcode, errmsg, headers)
@@ -415,12 +410,12 @@
#print "proxy via https:", host, selector
if not host: raise IOError, ('https error', 'no host given')
if proxy_passwd:
- import base64
+ proxy_passwd = unquote(proxy_passwd)
proxy_auth = base64.b64encode(proxy_passwd).strip()
else:
proxy_auth = None
if user_passwd:
- import base64
+ user_passwd = unquote(user_passwd)
auth = base64.b64encode(user_passwd).strip()
else:
auth = None
@@ -438,9 +433,7 @@
if auth: h.putheader('Authorization', 'Basic %s' % auth)
if realhost: h.putheader('Host', realhost)
for args in self.addheaders: h.putheader(*args)
- h.endheaders()
- if data is not None:
- h.send(data)
+ h.endheaders(data)
errcode, errmsg, headers = h.getreply()
fp = h.getfile()
if errcode == -1:
@@ -491,6 +484,8 @@
urlfile = file
if file[:1] == '/':
urlfile = 'file://' + file
+ elif file[:2] == './':
+ raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
return addinfourl(open(localname, 'rb'),
headers, urlfile)
host, port = splitport(host)
@@ -519,8 +514,8 @@
if user: user, passwd = splitpasswd(user)
else: passwd = None
host = unquote(host)
- user = unquote(user or '')
- passwd = unquote(passwd or '')
+ user = user or ''
+ passwd = passwd or ''
host = socket.gethostbyname(host)
if not port:
import ftplib
@@ -598,7 +593,6 @@
time.gmtime(time.time())))
msg.append('Content-type: %s' % type)
if encoding == 'base64':
- import base64
data = base64.decodestring(data)
else:
data = unquote(data)
@@ -648,7 +642,6 @@
newurl = headers['uri']
else:
return
- void = fp.read()
fp.close()
# In case the server sent a relative URL, join with original:
newurl = basejoin(self.type + ":" + url, newurl)
@@ -785,7 +778,7 @@
else:
return self.open(newurl, data)
- def get_user_passwd(self, host, realm, clear_cache = 0):
+ def get_user_passwd(self, host, realm, clear_cache=0):
key = realm + '@' + host.lower()
if key in self.auth_cache:
if clear_cache:
@@ -858,13 +851,16 @@
"""Class used by open_ftp() for cache of open FTP connections."""
def __init__(self, user, passwd, host, port, dirs,
- timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
+ timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
+ persistent=True):
self.user = user
self.passwd = passwd
self.host = host
self.port = port
self.dirs = dirs
self.timeout = timeout
+ self.refcount = 0
+ self.keepalive = persistent
self.init()
def init(self):
@@ -891,7 +887,7 @@
# Try to retrieve as a file
try:
cmd = 'RETR ' + file
- conn = self.ftp.ntransfercmd(cmd)
+ conn, retrlen = self.ftp.ntransfercmd(cmd)
except ftplib.error_perm, reason:
if str(reason)[:3] != '550':
raise IOError, ('ftp error', reason), sys.exc_info()[2]
@@ -911,11 +907,14 @@
cmd = 'LIST ' + file
else:
cmd = 'LIST'
- conn = self.ftp.ntransfercmd(cmd)
+ conn, retrlen = self.ftp.ntransfercmd(cmd)
self.busy = 1
+ ftpobj = addclosehook(conn.makefile('rb'), self.file_close)
+ self.refcount += 1
+ conn.close()
# Pass back both a suitably decorated object and a retrieval length
- return (addclosehook(conn[0].makefile('rb'),
- self.endtransfer), conn[1])
+ return (ftpobj, retrlen)
+
def endtransfer(self):
if not self.busy:
return
@@ -926,6 +925,17 @@
pass
def close(self):
+ self.keepalive = False
+ if self.refcount <= 0:
+ self.real_close()
+
+ def file_close(self):
+ self.endtransfer()
+ self.refcount -= 1
+ if self.refcount <= 0 and not self.keepalive:
+ self.real_close()
+
+ def real_close(self):
self.endtransfer()
try:
self.ftp.close()
@@ -970,11 +980,11 @@
self.hookargs = hookargs
def close(self):
- addbase.close(self)
if self.closehook:
self.closehook(*self.hookargs)
self.closehook = None
self.hookargs = None
+ addbase.close(self)
class addinfo(addbase):
"""class to add an info() method to an open file."""
@@ -1072,7 +1082,12 @@
_hostprog = re.compile('^//([^/?]*)(.*)$')
match = _hostprog.match(url)
- if match: return match.group(1, 2)
+ if match:
+ host_port = match.group(1)
+ path = match.group(2)
+ if path and not path.startswith('/'):
+ path = '/' + path
+ return host_port, path
return None, url
_userprog = None
@@ -1084,7 +1099,7 @@
_userprog = re.compile('^(.*)@(.*)$')
match = _userprog.match(host)
- if match: return map(unquote, match.group(1, 2))
+ if match: return match.group(1, 2)
return None, host
_passwdprog = None
@@ -1093,7 +1108,7 @@
global _passwdprog
if _passwdprog is None:
import re
- _passwdprog = re.compile('^([^:]*):(.*)$')
+ _passwdprog = re.compile('^([^:]*):(.*)$',re.S)
match = _passwdprog.match(user)
if match: return match.group(1, 2)
@@ -1176,21 +1191,29 @@
if match: return match.group(1, 2)
return attr, None
+# urlparse contains a duplicate of this method to avoid a circular import. If
+# you update this method, also update the copy in urlparse. This code
+# duplication does not exist in Python3.
+
_hexdig = '0123456789ABCDEFabcdef'
-_hextochr = dict((a+b, chr(int(a+b,16))) for a in _hexdig for b in _hexdig)
+_hextochr = dict((a + b, chr(int(a + b, 16)))
+ for a in _hexdig for b in _hexdig)
def unquote(s):
"""unquote('abc%20def') -> 'abc def'."""
res = s.split('%')
- for i in xrange(1, len(res)):
- item = res[i]
+ # fastpath
+ if len(res) == 1:
+ return s
+ s = res[0]
+ for item in res[1:]:
try:
- res[i] = _hextochr[item[:2]] + item[2:]
+ s += _hextochr[item[:2]] + item[2:]
except KeyError:
- res[i] = '%' + item
+ s += '%' + item
except UnicodeDecodeError:
- res[i] = unichr(int(item[:2], 16)) + item[2:]
- return "".join(res)
+ s += unichr(int(item[:2], 16)) + item[2:]
+ return s
def unquote_plus(s):
"""unquote('%7e/abc+def') -> '~/abc def'"""
@@ -1200,9 +1223,12 @@
always_safe = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
'abcdefghijklmnopqrstuvwxyz'
'0123456789' '_.-')
-_safemaps = {}
+_safe_map = {}
+for i, c in zip(xrange(256), str(bytearray(xrange(256)))):
+ _safe_map[c] = c if (i < 128 and c in always_safe) else '%{:02X}'.format(i)
+_safe_quoters = {}
-def quote(s, safe = '/'):
+def quote(s, safe='/'):
"""quote('abc def') -> 'abc%20def'
Each part of a URL, e.g. the path info, the query, etc., has a
@@ -1223,27 +1249,32 @@
called on a path where the existing slash characters are used as
reserved characters.
"""
+ # fastpath
+ if not s:
+ if s is None:
+ raise TypeError('None object cannot be quoted')
+ return s
cachekey = (safe, always_safe)
try:
- safe_map = _safemaps[cachekey]
+ (quoter, safe) = _safe_quoters[cachekey]
except KeyError:
- safe += always_safe
- safe_map = {}
- for i in range(256):
- c = chr(i)
- safe_map[c] = (c in safe) and c or ('%%%02X' % i)
- _safemaps[cachekey] = safe_map
- res = map(safe_map.__getitem__, s)
- return ''.join(res)
+ safe_map = _safe_map.copy()
+ safe_map.update([(c, c) for c in safe])
+ quoter = safe_map.__getitem__
+ safe = always_safe + safe
+ _safe_quoters[cachekey] = (quoter, safe)
+ if not s.rstrip(safe):
+ return s
+ return ''.join(map(quoter, s))
-def quote_plus(s, safe = ''):
+def quote_plus(s, safe=''):
"""Quote the query fragment of a URL; replacing ' ' with '+'"""
if ' ' in s:
s = quote(s, safe + ' ')
return s.replace(' ', '+')
return quote(s, safe)
-def urlencode(query,doseq=0):
+def urlencode(query, doseq=0):
"""Encode a sequence of two-element tuples or dictionary into a URL query string.
If any values in the query arg are sequences and doseq is true, each
@@ -1295,7 +1326,7 @@
else:
try:
# is this a sufficient test for sequence-ness?
- x = len(v)
+ len(v)
except TypeError:
# not a sequence
v = quote_plus(str(v))
@@ -1336,7 +1367,8 @@
# strip port off host
hostonly, port = splitport(host)
# check if the host ends with any of the DNS suffixes
- for name in no_proxy.split(','):
+ no_proxy_list = [proxy.strip() for proxy in no_proxy.split(',')]
+ for name in no_proxy_list:
if name and (hostonly.endswith(name) or host.endswith(name)):
return 1
# otherwise, don't bypass
@@ -1395,7 +1427,7 @@
else:
mask = int(mask[1:])
- mask = 32 - mask
+ mask = 32 - mask
if (hostIP >> mask) == (base >> mask):
return True
@@ -1405,7 +1437,6 @@
return False
-
def getproxies_macosx_sysconf():
"""Return a dictionary of scheme -> proxy server URL mappings.
@@ -1414,8 +1445,6 @@
"""
return _get_proxies()
-
-
def proxy_bypass(host):
if getproxies_environment():
return proxy_bypass_environment(host)
@@ -1519,18 +1548,11 @@
# '<local>' string by the localhost entry and the corresponding
# canonical entry.
proxyOverride = proxyOverride.split(';')
- i = 0
- while i < len(proxyOverride):
- if proxyOverride[i] == '<local>':
- proxyOverride[i:i+1] = ['localhost',
- '127.0.0.1',
- socket.gethostname(),
- socket.gethostbyname(
- socket.gethostname())]
- i += 1
- # print proxyOverride
# now check if we match one of the registry values.
for test in proxyOverride:
+ if test == '<local>':
+ if '.' not in rawHost:
+ return 1
test = test.replace(".", r"\.") # mask dots
test = test.replace("*", r".*") # change glob sequence
test = test.replace("?", r".") # change glob char
@@ -1578,67 +1600,3 @@
# Report during remote transfers
print "Block number: %d, Block size: %d, Total size: %d" % (
blocknum, blocksize, totalsize)
-
-# Test program
-def test(args=[]):
- if not args:
- args = [
- '/etc/passwd',
- 'file:/etc/passwd',
- 'file://localhost/etc/passwd',
- 'ftp://ftp.gnu.org/pub/README',
- 'http://www.python.org/index.html',
- ]
- if hasattr(URLopener, "open_https"):
- args.append('https://synergy.as.cmu.edu/~geek/')
- try:
- for url in args:
- print '-'*10, url, '-'*10
- fn, h = urlretrieve(url, None, reporthook)
- print fn
- if h:
- print '======'
- for k in h.keys(): print k + ':', h[k]
- print '======'
- fp = open(fn, 'rb')
- data = fp.read()
- del fp
- if '\r' in data:
- table = string.maketrans("", "")
- data = data.translate(table, "\r")
- print data
- fn, h = None, None
- print '-'*40
- finally:
- urlcleanup()
-
-def main():
- import getopt, sys
- try:
- opts, args = getopt.getopt(sys.argv[1:], "th")
- except getopt.error, msg:
- print msg
- print "Use -h for help"
- return
- t = 0
- for o, a in opts:
- if o == '-t':
- t = t + 1
- if o == '-h':
- print "Usage: python urllib.py [-t] [url ...]"
- print "-t runs self-test;",
- print "otherwise, contents of urls are printed"
- return
- if t:
- if t > 1:
- test1()
- test(args)
- else:
- if not args:
- print "Use -h for help"
- for url in args:
- print urlopen(url).read(),
-
-# Run test program when run as a script
-if __name__ == '__main__':
- main()
--
Repository URL: http://hg.python.org/jython
More information about the Jython-checkins
mailing list