[Jython-checkins] jython: Import latest test_urllib, test_urllib2 and test_urllib2_localnet from cpython

alan.kennedy jython-checkins at python.org
Sat Feb 2 16:20:10 CET 2013


http://hg.python.org/jython/rev/a720ee1162d6
changeset:   6979:a720ee1162d6
user:        Alan Kennedy <alan at xhaus.com>
date:        Sat Feb 02 14:18:15 2013 +0000
summary:
  Import latest test_urllib, test_urllib2 and test_urllib2_localnet from cpython 2.7: http://hg.python.org/cpython/file/b6b707063991/

files:
  Lib/test/test_urllib.py           |  253 +++++++++++--
  Lib/test/test_urllib2.py          |   99 ++++-
  Lib/test/test_urllib2_localnet.py |  331 +++++++++++++++--
  3 files changed, 590 insertions(+), 93 deletions(-)
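
For readers skimming the patch, the new FakeHTTPMixin (hoisted to module level in test_urllib.py below) swaps httplib.HTTP's connection class for an in-memory fake so urlopen() can be exercised without any network access. The following minimal sketch shows the intended usage pattern, mirroring the test_read case in the diff; the class name ExampleFakeHTTPTest and the direct "from test.test_urllib import FakeHTTPMixin" import are illustrative assumptions on my part, not part of the change itself.

    import unittest
    import urllib

    # Assumed import path: the mixin is defined in Lib/test/test_urllib.py.
    from test.test_urllib import FakeHTTPMixin

    class ExampleFakeHTTPTest(unittest.TestCase, FakeHTTPMixin):
        """Hypothetical test illustrating the fakehttp()/unfakehttp() pattern."""

        def test_read(self):
            # Install the fake connection class; no sockets are opened.
            self.fakehttp('Hello!')
            try:
                fp = urllib.urlopen('http://python.org/')
                self.assertEqual(fp.read(), 'Hello!')
            finally:
                # Always restore httplib.HTTP._connection_class.
                self.unfakehttp()

    if __name__ == '__main__':
        unittest.main()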


diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -3,12 +3,16 @@
 import urllib
 import httplib
 import unittest
-from test import test_support
 import os
+import sys
 import mimetools
 import tempfile
 import StringIO
 
+from test import test_support
+from base64 import b64encode
+
+
 def hexescape(char):
     """Escape char as RFC 2396 specifies"""
     hex_repr = hex(ord(char))[2:].upper()
@@ -16,6 +20,43 @@
         hex_repr = "0%s" % hex_repr
     return "%" + hex_repr
 
+
+class FakeHTTPMixin(object):
+    def fakehttp(self, fakedata):
+        class FakeSocket(StringIO.StringIO):
+
+            def sendall(self, data):
+                FakeHTTPConnection.buf = data
+
+            def makefile(self, *args, **kwds):
+                return self
+
+            def read(self, amt=None):
+                if self.closed:
+                    return ""
+                return StringIO.StringIO.read(self, amt)
+
+            def readline(self, length=None):
+                if self.closed:
+                    return ""
+                return StringIO.StringIO.readline(self, length)
+
+        class FakeHTTPConnection(httplib.HTTPConnection):
+
+            # buffer to store data for verification in urlopen tests.
+            buf = ""
+
+            def connect(self):
+                self.sock = FakeSocket(fakedata)
+
+        assert httplib.HTTP._connection_class == httplib.HTTPConnection
+
+        httplib.HTTP._connection_class = FakeHTTPConnection
+
+    def unfakehttp(self):
+        httplib.HTTP._connection_class = httplib.HTTPConnection
+
+
 class urlopen_FileTests(unittest.TestCase):
     """Test urlopen() opening a temporary file.
 
@@ -44,7 +85,7 @@
         # Make sure object returned by urlopen() has the specified methods
         for attr in ("read", "readline", "readlines", "fileno",
                      "close", "info", "geturl", "getcode", "__iter__"):
-            self.assert_(hasattr(self.returned_obj, attr),
+            self.assertTrue(hasattr(self.returned_obj, attr),
                          "object returned by urlopen() lacks %s attribute" %
                          attr)
 
@@ -66,9 +107,7 @@
 
     def test_fileno(self):
         file_num = self.returned_obj.fileno()
-        if not test_support.is_jython:
-            self.assert_(isinstance(file_num, int),
-                         "fileno() did not return an int")
+        self.assertIsInstance(file_num, int, "fileno() did not return an int")
         self.assertEqual(os.read(file_num, len(self.text)), self.text,
                          "Reading on the file descriptor returned by fileno() "
                          "did not return the expected text")
@@ -79,7 +118,7 @@
         self.returned_obj.close()
 
     def test_info(self):
-        self.assert_(isinstance(self.returned_obj.info(), mimetools.Message))
+        self.assertIsInstance(self.returned_obj.info(), mimetools.Message)
 
     def test_geturl(self):
         self.assertEqual(self.returned_obj.geturl(), self.pathname)
@@ -95,6 +134,9 @@
         for line in self.returned_obj.__iter__():
             self.assertEqual(line, self.text)
 
+    def test_relativelocalfile(self):
+        self.assertRaises(ValueError,urllib.urlopen,'./' + self.pathname)
+
 class ProxyTests(unittest.TestCase):
 
     def setUp(self):
@@ -114,31 +156,15 @@
         self.env.set('NO_PROXY', 'localhost')
         proxies = urllib.getproxies_environment()
         # getproxies_environment use lowered case truncated (no '_proxy') keys
-        self.assertEquals('localhost', proxies['no'])
+        self.assertEqual('localhost', proxies['no'])
+        # List of no_proxies with space.
+        self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com')
+        self.assertTrue(urllib.proxy_bypass_environment('anotherdomain.com'))
 
 
-class urlopen_HttpTests(unittest.TestCase):
+class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
     """Test urlopen() opening a fake http connection."""
 
-    def fakehttp(self, fakedata):
-        class FakeSocket(StringIO.StringIO):
-            def sendall(self, str): pass
-            def makefile(self, mode, name): return self
-            def read(self, amt=None):
-                if self.closed: return ''
-                return StringIO.StringIO.read(self, amt)
-            def readline(self, length=None):
-                if self.closed: return ''
-                return StringIO.StringIO.readline(self, length)
-        class FakeHTTPConnection(httplib.HTTPConnection):
-            def connect(self):
-                self.sock = FakeSocket(fakedata)
-        assert httplib.HTTP._connection_class == httplib.HTTPConnection
-        httplib.HTTP._connection_class = FakeHTTPConnection
-
-    def unfakehttp(self):
-        httplib.HTTP._connection_class = httplib.HTTPConnection
-
     def test_read(self):
         self.fakehttp('Hello!')
         try:
@@ -150,6 +176,16 @@
         finally:
             self.unfakehttp()
 
+    def test_url_fragment(self):
+        # Issue #11703: geturl() omits fragments in the original URL.
+        url = 'http://docs.python.org/library/urllib.html#OK'
+        self.fakehttp('Hello!')
+        try:
+            fp = urllib.urlopen(url)
+            self.assertEqual(fp.geturl(), url)
+        finally:
+            self.unfakehttp()
+
     def test_read_bogus(self):
         # urlopen() should raise IOError for many error codes.
         self.fakehttp('''HTTP/1.1 401 Authentication Required
@@ -186,6 +222,62 @@
         finally:
             self.unfakehttp()
 
+    def test_missing_localfile(self):
+        self.assertRaises(IOError, urllib.urlopen,
+                'file://localhost/a/missing/file.py')
+        fd, tmp_file = tempfile.mkstemp()
+        tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/')
+        try:
+            self.assertTrue(os.path.exists(tmp_file))
+            fp = urllib.urlopen(tmp_fileurl)
+        finally:
+            os.close(fd)
+            fp.close()
+        os.unlink(tmp_file)
+
+        self.assertFalse(os.path.exists(tmp_file))
+        self.assertRaises(IOError, urllib.urlopen, tmp_fileurl)
+
+    def test_ftp_nonexisting(self):
+        self.assertRaises(IOError, urllib.urlopen,
+                'ftp://localhost/not/existing/file.py')
+
+
+    def test_userpass_inurl(self):
+        self.fakehttp('Hello!')
+        try:
+            fakehttp_wrapper = httplib.HTTP._connection_class
+            fp = urllib.urlopen("http://user:pass@python.org/")
+            authorization = ("Authorization: Basic %s\r\n" %
+                            b64encode('user:pass'))
+            # The authorization header must be in place
+            self.assertIn(authorization, fakehttp_wrapper.buf)
+            self.assertEqual(fp.readline(), "Hello!")
+            self.assertEqual(fp.readline(), "")
+            self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
+            self.assertEqual(fp.getcode(), 200)
+        finally:
+            self.unfakehttp()
+
+    def test_userpass_with_spaces_inurl(self):
+        self.fakehttp('Hello!')
+        try:
+            url = "http://a b:c d at python.org/"
+            fakehttp_wrapper = httplib.HTTP._connection_class
+            authorization = ("Authorization: Basic %s\r\n" %
+                             b64encode('a b:c d'))
+            fp = urllib.urlopen(url)
+            # The authorization header must be in place
+            self.assertIn(authorization, fakehttp_wrapper.buf)
+            self.assertEqual(fp.readline(), "Hello!")
+            self.assertEqual(fp.readline(), "")
+            # the spaces are quoted in URL so no match
+            self.assertNotEqual(fp.geturl(), url)
+            self.assertEqual(fp.getcode(), 200)
+        finally:
+            self.unfakehttp()
+
+
 class urlretrieve_FileTests(unittest.TestCase):
     """Test urllib.urlretrieve() on local files"""
 
@@ -243,9 +335,9 @@
         # a headers value is returned.
         result = urllib.urlretrieve("file:%s" % test_support.TESTFN)
         self.assertEqual(result[0], test_support.TESTFN)
-        self.assert_(isinstance(result[1], mimetools.Message),
-                     "did not get a mimetools.Message instance as second "
-                     "returned value")
+        self.assertIsInstance(result[1], mimetools.Message,
+                              "did not get a mimetools.Message instance as "
+                              "second returned value")
 
     def test_copy(self):
         # Test that setting the filename argument works.
@@ -254,7 +346,7 @@
         result = urllib.urlretrieve(self.constructLocalFileUrl(
             test_support.TESTFN), second_temp)
         self.assertEqual(second_temp, result[0])
-        self.assert_(os.path.exists(second_temp), "copy of the file was not "
+        self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
                                                   "made")
         FILE = file(second_temp, 'rb')
         try:
@@ -268,9 +360,9 @@
     def test_reporthook(self):
         # Make sure that the reporthook works.
         def hooktester(count, block_size, total_size, count_holder=[0]):
-            self.assert_(isinstance(count, int))
-            self.assert_(isinstance(block_size, int))
-            self.assert_(isinstance(total_size, int))
+            self.assertIsInstance(count, int)
+            self.assertIsInstance(block_size, int)
+            self.assertIsInstance(total_size, int)
             self.assertEqual(count, count_holder[0])
             count_holder[0] = count_holder[0] + 1
         second_temp = "%s.2" % test_support.TESTFN
@@ -318,6 +410,45 @@
         self.assertEqual(report[0][1], 8192)
         self.assertEqual(report[0][2], 8193)
 
+
+class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin):
+    """Test urllib.urlretrieve() using fake http connections"""
+
+    def test_short_content_raises_ContentTooShortError(self):
+        self.fakehttp('''HTTP/1.1 200 OK
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Connection: close
+Content-Length: 100
+Content-Type: text/html; charset=iso-8859-1
+
+FF
+''')
+
+        def _reporthook(par1, par2, par3):
+            pass
+
+        try:
+            self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve,
+                    'http://example.com', reporthook=_reporthook)
+        finally:
+            self.unfakehttp()
+
+    def test_short_content_raises_ContentTooShortError_without_reporthook(self):
+        self.fakehttp('''HTTP/1.1 200 OK
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Connection: close
+Content-Length: 100
+Content-Type: text/html; charset=iso-8859-1
+
+FF
+''')
+        try:
+            self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve, 'http://example.com/')
+        finally:
+            self.unfakehttp()
+
 class QuotingTests(unittest.TestCase):
     """Tests for urllib.quote() and urllib.quote_plus()
 
@@ -395,8 +526,10 @@
         result = urllib.quote(partial_quote)
         self.assertEqual(expected, result,
                          "using quote(): %s != %s" % (expected, result))
+        result = urllib.quote_plus(partial_quote)
         self.assertEqual(expected, result,
                          "using quote_plus(): %s != %s" % (expected, result))
+        self.assertRaises(TypeError, urllib.quote, None)
 
     def test_quoting_space(self):
         # Make sure quote() and quote_plus() handle spaces as specified in
@@ -527,7 +660,7 @@
         expect_somewhere = ["1st=1", "2nd=2", "3rd=3"]
         result = urllib.urlencode(given)
         for expected in expect_somewhere:
-            self.assert_(expected in result,
+            self.assertIn(expected, result,
                          "testing %s: %s not found in %s" %
                          (test_type, expected, result))
         self.assertEqual(result.count('&'), 2,
@@ -536,7 +669,7 @@
         amp_location = result.index('&')
         on_amp_left = result[amp_location - 1]
         on_amp_right = result[amp_location + 1]
-        self.assert_(on_amp_left.isdigit() and on_amp_right.isdigit(),
+        self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(),
                      "testing %s: '&' not located in proper place in %s" %
                      (test_type, result))
         self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps
@@ -574,8 +707,7 @@
         result = urllib.urlencode(given, True)
         for value in given["sequence"]:
             expect = "sequence=%s" % value
-            self.assert_(expect in result,
-                         "%s not found in %s" % (expect, result))
+            self.assertIn(expect, result)
         self.assertEqual(result.count('&'), 2,
                          "Expected 2 '&'s, got %s" % result.count('&'))
 
@@ -622,8 +754,45 @@
                          "url2pathname() failed; %s != %s" %
                          (expect, result))
 
+    @unittest.skipUnless(sys.platform == 'win32',
+                         'test specific to the nturl2path library')
+    def test_ntpath(self):
+        given = ('/C:/', '///C:/', '/C|//')
+        expect = 'C:\\'
+        for url in given:
+            result = urllib.url2pathname(url)
+            self.assertEqual(expect, result,
+                             'nturl2path.url2pathname() failed; %s != %s' %
+                             (expect, result))
+        given = '///C|/path'
+        expect = 'C:\\path'
+        result = urllib.url2pathname(given)
+        self.assertEqual(expect, result,
+                         'nturl2path.url2pathname() failed; %s != %s' %
+                         (expect, result))
+
+class Utility_Tests(unittest.TestCase):
+    """Testcase to test the various utility functions in the urllib."""
+
+    def test_splitpasswd(self):
+        """Some of the password examples are not sensible, but it is added to
+        confirming to RFC2617 and addressing issue4675.
+        """
+        self.assertEqual(('user', 'ab'),urllib.splitpasswd('user:ab'))
+        self.assertEqual(('user', 'a\nb'),urllib.splitpasswd('user:a\nb'))
+        self.assertEqual(('user', 'a\tb'),urllib.splitpasswd('user:a\tb'))
+        self.assertEqual(('user', 'a\rb'),urllib.splitpasswd('user:a\rb'))
+        self.assertEqual(('user', 'a\fb'),urllib.splitpasswd('user:a\fb'))
+        self.assertEqual(('user', 'a\vb'),urllib.splitpasswd('user:a\vb'))
+        self.assertEqual(('user', 'a:b'),urllib.splitpasswd('user:a:b'))
+        self.assertEqual(('user', 'a b'),urllib.splitpasswd('user:a b'))
+        self.assertEqual(('user 2', 'ab'),urllib.splitpasswd('user 2:ab'))
+        self.assertEqual(('user+1', 'a+b'),urllib.splitpasswd('user+1:a+b'))
+
+
 class URLopener_Tests(unittest.TestCase):
     """Testcase to test the open method of URLopener class."""
+
     def test_quoted_open(self):
         class DummyURLopener(urllib.URLopener):
             def open_spam(self, url):
@@ -640,7 +809,7 @@
 
 # Just commented them out.
 # Can't really tell why keep failing in windows and sparc.
-# Everywhere else they work ok, but on those machines, someteimes
+# Everywhere else they work ok, but on those machines, sometimes
 # fail in one of the tests, sometimes in other. I have a linux, and
 # the tests go ok.
 # If anybody has one of the problematic enviroments, please help!
@@ -689,7 +858,7 @@
 #     def testTimeoutNone(self):
 #         # global default timeout is ignored
 #         import socket
-#         self.assert_(socket.getdefaulttimeout() is None)
+#         self.assertTrue(socket.getdefaulttimeout() is None)
 #         socket.setdefaulttimeout(30)
 #         try:
 #             ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
@@ -701,7 +870,7 @@
 #     def testTimeoutDefault(self):
 #         # global default timeout is used
 #         import socket
-#         self.assert_(socket.getdefaulttimeout() is None)
+#         self.assertTrue(socket.getdefaulttimeout() is None)
 #         socket.setdefaulttimeout(30)
 #         try:
 #             ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
@@ -727,11 +896,13 @@
             urlopen_FileTests,
             urlopen_HttpTests,
             urlretrieve_FileTests,
+            urlretrieve_HttpTests,
             ProxyTests,
             QuotingTests,
             UnquotingTests,
             urlencode_Tests,
             Pathname_Tests,
+            Utility_Tests,
             URLopener_Tests,
             #FTPWrapperTests,
         )
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -293,6 +293,7 @@
             self._tunnel_headers = headers
         else:
             self._tunnel_headers.clear()
+
     def request(self, method, url, body=None, headers=None):
         self.method = method
         self.selector = url
@@ -304,10 +305,12 @@
         if self.raise_on_endheaders:
             import socket
             raise socket.error()
+
     def getresponse(self):
         return MockHTTPResponse(MockFile(), {}, 200, "OK")
 
-    def close(self): pass
+    def close(self):
+        pass
 
 class MockHandler:
     # useful for testing handler machinery
@@ -595,21 +598,20 @@
 def sanepathname2url(path):
     import urllib
     urlpath = urllib.pathname2url(path)
-    if ((os._name if test_support.is_jython else os.name) == 'nt'
-        and urlpath.startswith("///")):
+    if os.name == "nt" and urlpath.startswith("///"):
         urlpath = urlpath[2:]
     # XXX don't ask me about the mac...
     return urlpath
 
 class HandlerTests(unittest.TestCase):
 
-    @unittest.skipIf(test_support.is_jython, "Required SSL support not yet available on jython")
     def test_ftp(self):
         class MockFTPWrapper:
             def __init__(self, data): self.data = data
             def retrfile(self, filename, filetype):
                 self.filename, self.filetype = filename, filetype
                 return StringIO.StringIO(self.data), len(self.data)
+            def close(self): pass
 
         class NullFTPHandler(urllib2.FTPHandler):
             def __init__(self, data): self.data = data
@@ -661,7 +663,6 @@
             self.assertEqual(headers.get("Content-type"), mimetype)
             self.assertEqual(int(headers["Content-length"]), len(data))
 
-    @unittest.skip("FIXME: not working")
     def test_file(self):
         import rfc822, socket
         h = urllib2.FileHandler()
@@ -974,6 +975,28 @@
             self.assertEqual(count,
                              urllib2.HTTPRedirectHandler.max_redirections)
 
+    def test_invalid_redirect(self):
+        from_url = "http://example.com/a.html"
+        valid_schemes = ['http', 'https', 'ftp']
+        invalid_schemes = ['file', 'imap', 'ldap']
+        schemeless_url = "example.com/b.html"
+        h = urllib2.HTTPRedirectHandler()
+        o = h.parent = MockOpener()
+        req = Request(from_url)
+        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+
+        for scheme in invalid_schemes:
+            invalid_url = scheme + '://' + schemeless_url
+            self.assertRaises(urllib2.HTTPError, h.http_error_302,
+                              req, MockFile(), 302, "Security Loophole",
+                              MockHeaders({"location": invalid_url}))
+
+        for scheme in valid_schemes:
+            valid_url = scheme + '://' + schemeless_url
+            h.http_error_302(req, MockFile(), 302, "That's fine",
+                MockHeaders({"location": valid_url}))
+            self.assertEqual(o.req.get_full_url(), valid_url)
+
     def test_cookie_redirect(self):
         # cookies shouldn't leak into redirected requests
         from cookielib import CookieJar
@@ -990,6 +1013,15 @@
         o.open("http://www.example.com/")
         self.assertTrue(not hh.req.has_header("Cookie"))
 
+    def test_redirect_fragment(self):
+        redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
+        hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
+        hdeh = urllib2.HTTPDefaultErrorHandler()
+        hrh = urllib2.HTTPRedirectHandler()
+        o = build_test_opener(hh, hdeh, hrh)
+        fp = o.open('http://www.example.com')
+        self.assertEqual(fp.geturl(), redirected_url.strip())
+
     def test_proxy(self):
         o = OpenerDirector()
         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
@@ -1074,12 +1106,30 @@
         self._test_basic_auth(opener, auth_handler, "Authorization",
                               realm, http_handler, password_manager,
                               "http://acme.example.com/protected",
-                              "http://acme.example.com/protected",
-                              )
+                              "http://acme.example.com/protected"
+                             )
 
     def test_basic_auth_with_single_quoted_realm(self):
         self.test_basic_auth(quote_char="'")
 
+    def test_basic_auth_with_unquoted_realm(self):
+        opener = OpenerDirector()
+        password_manager = MockPasswordManager()
+        auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+        realm = "ACME Widget Store"
+        http_handler = MockHTTPHandler(
+            401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
+        opener.add_handler(auth_handler)
+        opener.add_handler(http_handler)
+        msg = "Basic Auth Realm was unquoted"
+        with test_support.check_warnings((msg, UserWarning)):
+            self._test_basic_auth(opener, auth_handler, "Authorization",
+                                  realm, http_handler, password_manager,
+                                  "http://acme.example.com/protected",
+                                  "http://acme.example.com/protected"
+                                 )
+
+
     def test_proxy_basic_auth(self):
         opener = OpenerDirector()
         ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
@@ -1098,7 +1148,7 @@
                               )
 
     def test_basic_and_digest_auth_handlers(self):
-        # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
+        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
         # response (http://python.org/sf/1479302), where it should instead
         # return None to allow another handler (especially
         # HTTPBasicAuthHandler) to handle the response.
@@ -1275,12 +1325,43 @@
         req = Request("<URL:http://www.python.org>")
         self.assertEqual("www.python.org", req.get_host())
 
-    def test_urlwith_fragment(self):
+    def test_url_fragment(self):
         req = Request("http://www.python.org/?qs=query#fragment=true")
         self.assertEqual("/?qs=query", req.get_selector())
         req = Request("http://www.python.org/#fun=true")
         self.assertEqual("/", req.get_selector())
 
+        # Issue 11703: geturl() omits fragment in the original URL.
+        url = 'http://docs.python.org/library/urllib2.html#OK'
+        req = Request(url)
+        self.assertEqual(req.get_full_url(), url)
+
+    def test_HTTPError_interface(self):
+        """
+        Issue 13211 reveals that HTTPError didn't implement the URLError
+        interface even though HTTPError is a subclass of URLError.
+
+        >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
+        >>> assert hasattr(err, 'reason')
+        >>> err.reason
+        'something bad happened'
+        """
+
+    def test_HTTPError_interface_call(self):
+        """
+        Issue 15701= - HTTPError interface has info method available from URLError.
+        """
+        err = urllib2.HTTPError(msg='something bad happened', url=None,
+                                code=None, hdrs='Content-Length:42', fp=None)
+        self.assertTrue(hasattr(err, 'reason'))
+        assert hasattr(err, 'reason')
+        assert hasattr(err, 'info')
+        assert callable(err.info)
+        try:
+            err.info()
+        except AttributeError:
+            self.fail("err.info() failed")
+        self.assertEqual(err.info(), "Content-Length:42")
 
 def test_main(verbose=None):
     from test import test_urllib2
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
--- a/Lib/test/test_urllib2_localnet.py
+++ b/Lib/test/test_urllib2_localnet.py
@@ -1,14 +1,16 @@
 #!/usr/bin/env python
 
-import sys
-import threading
 import urlparse
 import urllib2
 import BaseHTTPServer
 import unittest
 import hashlib
+
 from test import test_support
 
+mimetools = test_support.import_module('mimetools', deprecated=True)
+threading = test_support.import_module('threading')
+
 # Loopback http server infrastructure
 
 class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
@@ -40,13 +42,16 @@
 class LoopbackHttpServerThread(threading.Thread):
     """Stoppable thread that runs a loopback http server."""
 
-    def __init__(self, port, RequestHandlerClass):
+    def __init__(self, request_handler):
         threading.Thread.__init__(self)
-        self._RequestHandlerClass = RequestHandlerClass
         self._stop = False
-        self._port = port
-        self._server_address = ('127.0.0.1', self._port)
         self.ready = threading.Event()
+        request_handler.protocol_version = "HTTP/1.0"
+        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
+                                        request_handler)
+        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
+        #                                      self.httpd.server_port)
+        self.port = self.httpd.server_port
 
     def stop(self):
         """Stops the webserver if it's currently running."""
@@ -57,19 +62,9 @@
         self.join()
 
     def run(self):
-        protocol = "HTTP/1.0"
-
-        self._RequestHandlerClass.protocol_version = protocol
-        httpd = LoopbackHttpServer(self._server_address,
-                                   self._RequestHandlerClass)
-
-        sa = httpd.socket.getsockname()
-        #print "Serving HTTP on", sa[0], "port", sa[1], "..."
-
         self.ready.set()
         while not self._stop:
-            httpd.handle_request()
-        httpd.server_close()
+            self.httpd.handle_request()
 
 # Authentication infrastructure
 
@@ -161,13 +156,13 @@
         if len(self._users) == 0:
             return True
 
-        if not request_handler.headers.has_key('Proxy-Authorization'):
+        if 'Proxy-Authorization' not in request_handler.headers:
             return self._return_auth_challenge(request_handler)
         else:
             auth_dict = self._create_auth_dict(
                 request_handler.headers['Proxy-Authorization']
                 )
-            if self._users.has_key(auth_dict["username"]):
+            if auth_dict["username"] in self._users:
                 password = self._users[ auth_dict["username"] ]
             else:
                 return self._return_auth_challenge(request_handler)
@@ -202,7 +197,11 @@
     testing.
     """
 
-    digest_auth_handler = DigestAuthHandler()
+    def __init__(self, digest_auth_handler, *args, **kwargs):
+        # This has to be set before calling our parent's __init__(), which will
+        # try to call do_GET().
+        self.digest_auth_handler = digest_auth_handler
+        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
 
     def log_message(self, format, *args):
         # Uncomment the next line for debugging.
@@ -223,60 +222,68 @@
 
 # Test cases
 
-class ProxyAuthTests(unittest.TestCase):
-    URL = "http://www.foo.com"
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self._threads = test_support.threading_setup()
 
-    PORT = 58080
+    def tearDown(self):
+        test_support.threading_cleanup(*self._threads)
+
+
+class ProxyAuthTests(BaseTestCase):
+    URL = "http://localhost"
+
     USER = "tester"
     PASSWD = "test123"
     REALM = "TestRealm"
 
-    PROXY_URL = "http://127.0.0.1:%d" % PORT
+    def setUp(self):
+        super(ProxyAuthTests, self).setUp()
+        self.digest_auth_handler = DigestAuthHandler()
+        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
+        self.digest_auth_handler.set_realm(self.REALM)
+        def create_fake_proxy_handler(*args, **kwargs):
+            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
 
-    def setUp(self):
-        FakeProxyHandler.digest_auth_handler.set_users({
-            self.USER : self.PASSWD
-            })
-        FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
-
-        self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
+        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
         self.server.start()
         self.server.ready.wait()
-
-        handler = urllib2.ProxyHandler({"http" : self.PROXY_URL})
-        self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
-        self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
+        proxy_url = "http://127.0.0.1:%d" % self.server.port
+        handler = urllib2.ProxyHandler({"http" : proxy_url})
+        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
+        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
 
     def tearDown(self):
         self.server.stop()
+        super(ProxyAuthTests, self).tearDown()
 
     def test_proxy_with_bad_password_raises_httperror(self):
-        self._digest_auth_handler.add_password(self.REALM, self.URL,
+        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                                self.USER, self.PASSWD+"bad")
-        FakeProxyHandler.digest_auth_handler.set_qop("auth")
+        self.digest_auth_handler.set_qop("auth")
         self.assertRaises(urllib2.HTTPError,
                           self.opener.open,
                           self.URL)
 
     def test_proxy_with_no_password_raises_httperror(self):
-        FakeProxyHandler.digest_auth_handler.set_qop("auth")
+        self.digest_auth_handler.set_qop("auth")
         self.assertRaises(urllib2.HTTPError,
                           self.opener.open,
                           self.URL)
 
     def test_proxy_qop_auth_works(self):
-        self._digest_auth_handler.add_password(self.REALM, self.URL,
+        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                                self.USER, self.PASSWD)
-        FakeProxyHandler.digest_auth_handler.set_qop("auth")
+        self.digest_auth_handler.set_qop("auth")
         result = self.opener.open(self.URL)
         while result.read():
             pass
         result.close()
 
     def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
-        self._digest_auth_handler.add_password(self.REALM, self.URL,
+        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                                self.USER, self.PASSWD)
-        FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
+        self.digest_auth_handler.set_qop("auth-int")
         try:
             result = self.opener.open(self.URL)
         except urllib2.URLError:
@@ -289,6 +296,244 @@
                 pass
             result.close()
 
+
+def GetRequestHandler(responses):
+
+    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+
+        server_version = "TestHTTP/"
+        requests = []
+        headers_received = []
+        port = 80
+
+        def do_GET(self):
+            body = self.send_head()
+            if body:
+                self.wfile.write(body)
+
+        def do_POST(self):
+            content_length = self.headers['Content-Length']
+            post_data = self.rfile.read(int(content_length))
+            self.do_GET()
+            self.requests.append(post_data)
+
+        def send_head(self):
+            FakeHTTPRequestHandler.headers_received = self.headers
+            self.requests.append(self.path)
+            response_code, headers, body = responses.pop(0)
+
+            self.send_response(response_code)
+
+            for (header, value) in headers:
+                self.send_header(header, value % self.port)
+            if body:
+                self.send_header('Content-type', 'text/plain')
+                self.end_headers()
+                return body
+            self.end_headers()
+
+        def log_message(self, *args):
+            pass
+
+
+    return FakeHTTPRequestHandler
+
+
+class TestUrlopen(BaseTestCase):
+    """Tests urllib2.urlopen using the network.
+
+    These tests are not exhaustive.  Assuming that testing using files does a
+    good job overall of some of the basic interface features.  There are no
+    tests exercising the optional 'data' and 'proxies' arguments.  No tests
+    for transparent redirection have been written.
+    """
+
+    def setUp(self):
+        proxy_handler = urllib2.ProxyHandler({})
+        opener = urllib2.build_opener(proxy_handler)
+        urllib2.install_opener(opener)
+        super(TestUrlopen, self).setUp()
+
+    def start_server(self, responses):
+        handler = GetRequestHandler(responses)
+
+        self.server = LoopbackHttpServerThread(handler)
+        self.server.start()
+        self.server.ready.wait()
+        port = self.server.port
+        handler.port = port
+        return handler
+
+
+    def test_redirection(self):
+        expected_response = 'We got here...'
+        responses = [
+            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
+            (200, [], expected_response)
+        ]
+
+        handler = self.start_server(responses)
+
+        try:
+            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
+            data = f.read()
+            f.close()
+
+            self.assertEqual(data, expected_response)
+            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
+        finally:
+            self.server.stop()
+
+
+    def test_404(self):
+        expected_response = 'Bad bad bad...'
+        handler = self.start_server([(404, [], expected_response)])
+
+        try:
+            try:
+                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
+            except urllib2.URLError, f:
+                pass
+            else:
+                self.fail('404 should raise URLError')
+
+            data = f.read()
+            f.close()
+
+            self.assertEqual(data, expected_response)
+            self.assertEqual(handler.requests, ['/weeble'])
+        finally:
+            self.server.stop()
+
+
+    def test_200(self):
+        expected_response = 'pycon 2008...'
+        handler = self.start_server([(200, [], expected_response)])
+
+        try:
+            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
+            data = f.read()
+            f.close()
+
+            self.assertEqual(data, expected_response)
+            self.assertEqual(handler.requests, ['/bizarre'])
+        finally:
+            self.server.stop()
+
+    def test_200_with_parameters(self):
+        expected_response = 'pycon 2008...'
+        handler = self.start_server([(200, [], expected_response)])
+
+        try:
+            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
+            data = f.read()
+            f.close()
+
+            self.assertEqual(data, expected_response)
+            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
+        finally:
+            self.server.stop()
+
+
+    def test_sending_headers(self):
+        handler = self.start_server([(200, [], "we don't care")])
+
+        try:
+            req = urllib2.Request("http://localhost:%s/" % handler.port,
+                                  headers={'Range': 'bytes=20-39'})
+            urllib2.urlopen(req)
+            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
+        finally:
+            self.server.stop()
+
+    def test_basic(self):
+        handler = self.start_server([(200, [], "we don't care")])
+
+        try:
+            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+            for attr in ("read", "close", "info", "geturl"):
+                self.assertTrue(hasattr(open_url, attr), "object returned from "
+                             "urlopen lacks the %s attribute" % attr)
+            try:
+                self.assertTrue(open_url.read(), "calling 'read' failed")
+            finally:
+                open_url.close()
+        finally:
+            self.server.stop()
+
+    def test_info(self):
+        handler = self.start_server([(200, [], "we don't care")])
+
+        try:
+            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+            info_obj = open_url.info()
+            self.assertIsInstance(info_obj, mimetools.Message,
+                                  "object returned by 'info' is not an "
+                                  "instance of mimetools.Message")
+            self.assertEqual(info_obj.getsubtype(), "plain")
+        finally:
+            self.server.stop()
+
+    def test_geturl(self):
+        # Make sure same URL as opened is returned by geturl.
+        handler = self.start_server([(200, [], "we don't care")])
+
+        try:
+            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+            url = open_url.geturl()
+            self.assertEqual(url, "http://localhost:%s" % handler.port)
+        finally:
+            self.server.stop()
+
+
+    def test_bad_address(self):
+        # Make sure proper exception is raised when connecting to a bogus
+        # address.
+        self.assertRaises(IOError,
+                          # Given that both VeriSign and various ISPs have in
+                          # the past or are presently hijacking various invalid
+                          # domain name requests in an attempt to boost traffic
+                          # to their own sites, finding a domain name to use
+                          # for this test is difficult.  RFC2606 leads one to
+                          # believe that '.invalid' should work, but experience
+                          # seemed to indicate otherwise.  Single character
+                          # TLDs are likely to remain invalid, so this seems to
+                          # be the best choice. The trailing '.' prevents a
+                          # related problem: The normal DNS resolver appends
+                          # the domain names from the search path if there is
+                          # no '.' the end and, and if one of those domains
+                          # implements a '*' rule a result is returned.
+                          # However, none of this will prevent the test from
+                          # failing if the ISP hijacks all invalid domain
+                          # requests.  The real solution would be to be able to
+                          # parameterize the framework with a mock resolver.
+                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
+
+    def test_iteration(self):
+        expected_response = "pycon 2008..."
+        handler = self.start_server([(200, [], expected_response)])
+        try:
+            data = urllib2.urlopen("http://localhost:%s" % handler.port)
+            for line in data:
+                self.assertEqual(line, expected_response)
+        finally:
+            self.server.stop()
+
+    def ztest_line_iteration(self):
+        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
+        expected_response = "".join(lines)
+        handler = self.start_server([(200, [], expected_response)])
+        try:
+            data = urllib2.urlopen("http://localhost:%s" % handler.port)
+            for index, line in enumerate(data):
+                self.assertEqual(line, lines[index],
+                                 "Fetched line number %s doesn't match expected:\n"
+                                 "    Expected length was %s, got %s" %
+                                 (index, len(lines[index]), len(line)))
+        finally:
+            self.server.stop()
+        self.assertEqual(index + 1, len(lines))
+
 def test_main():
     # We will NOT depend on the network resource flag
     # (Lib/test/regrtest.py -u network) since all tests here are only
@@ -296,7 +541,7 @@
     # the next line.
     #test_support.requires("network")
 
-    test_support.run_unittest(ProxyAuthTests)
+    test_support.run_unittest(ProxyAuthTests, TestUrlopen)
 
 if __name__ == "__main__":
     test_main()

-- 
Repository URL: http://hg.python.org/jython
