[Jython-checkins] jython: Import latest test_urllib, test_urllib2 and test_urllib2_localnet from cpython
alan.kennedy
jython-checkins at python.org
Sat Feb 2 16:20:10 CET 2013
http://hg.python.org/jython/rev/a720ee1162d6
changeset: 6979:a720ee1162d6
user: Alan Kennedy <alan at xhaus.com>
date: Sat Feb 02 14:18:15 2013 +0000
summary:
Import latest test_urllib, test_urllib2 and test_urllib2_localnet from cpython 2.7: http://hg.python.org/cpython/file/b6b707063991/
files:
Lib/test/test_urllib.py | 253 +++++++++++--
Lib/test/test_urllib2.py | 99 ++++-
Lib/test/test_urllib2_localnet.py | 331 +++++++++++++++--
3 files changed, 590 insertions(+), 93 deletions(-)
diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py
--- a/Lib/test/test_urllib.py
+++ b/Lib/test/test_urllib.py
@@ -3,12 +3,16 @@
import urllib
import httplib
import unittest
-from test import test_support
import os
+import sys
import mimetools
import tempfile
import StringIO
+from test import test_support
+from base64 import b64encode
+
+
def hexescape(char):
"""Escape char as RFC 2396 specifies"""
hex_repr = hex(ord(char))[2:].upper()
@@ -16,6 +20,43 @@
hex_repr = "0%s" % hex_repr
return "%" + hex_repr
+
+class FakeHTTPMixin(object):
+ def fakehttp(self, fakedata):
+ class FakeSocket(StringIO.StringIO):
+
+ def sendall(self, data):
+ FakeHTTPConnection.buf = data
+
+ def makefile(self, *args, **kwds):
+ return self
+
+ def read(self, amt=None):
+ if self.closed:
+ return ""
+ return StringIO.StringIO.read(self, amt)
+
+ def readline(self, length=None):
+ if self.closed:
+ return ""
+ return StringIO.StringIO.readline(self, length)
+
+ class FakeHTTPConnection(httplib.HTTPConnection):
+
+ # buffer to store data for verification in urlopen tests.
+ buf = ""
+
+ def connect(self):
+ self.sock = FakeSocket(fakedata)
+
+ assert httplib.HTTP._connection_class == httplib.HTTPConnection
+
+ httplib.HTTP._connection_class = FakeHTTPConnection
+
+ def unfakehttp(self):
+ httplib.HTTP._connection_class = httplib.HTTPConnection
+
+
class urlopen_FileTests(unittest.TestCase):
"""Test urlopen() opening a temporary file.
@@ -44,7 +85,7 @@
# Make sure object returned by urlopen() has the specified methods
for attr in ("read", "readline", "readlines", "fileno",
"close", "info", "geturl", "getcode", "__iter__"):
- self.assert_(hasattr(self.returned_obj, attr),
+ self.assertTrue(hasattr(self.returned_obj, attr),
"object returned by urlopen() lacks %s attribute" %
attr)
@@ -66,9 +107,7 @@
def test_fileno(self):
file_num = self.returned_obj.fileno()
- if not test_support.is_jython:
- self.assert_(isinstance(file_num, int),
- "fileno() did not return an int")
+ self.assertIsInstance(file_num, int, "fileno() did not return an int")
self.assertEqual(os.read(file_num, len(self.text)), self.text,
"Reading on the file descriptor returned by fileno() "
"did not return the expected text")
@@ -79,7 +118,7 @@
self.returned_obj.close()
def test_info(self):
- self.assert_(isinstance(self.returned_obj.info(), mimetools.Message))
+ self.assertIsInstance(self.returned_obj.info(), mimetools.Message)
def test_geturl(self):
self.assertEqual(self.returned_obj.geturl(), self.pathname)
@@ -95,6 +134,9 @@
for line in self.returned_obj.__iter__():
self.assertEqual(line, self.text)
+ def test_relativelocalfile(self):
+ self.assertRaises(ValueError,urllib.urlopen,'./' + self.pathname)
+
class ProxyTests(unittest.TestCase):
def setUp(self):
@@ -114,31 +156,15 @@
self.env.set('NO_PROXY', 'localhost')
proxies = urllib.getproxies_environment()
# getproxies_environment use lowered case truncated (no '_proxy') keys
- self.assertEquals('localhost', proxies['no'])
+ self.assertEqual('localhost', proxies['no'])
+ # List of no_proxies with space.
+ self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com')
+ self.assertTrue(urllib.proxy_bypass_environment('anotherdomain.com'))
-class urlopen_HttpTests(unittest.TestCase):
+class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
"""Test urlopen() opening a fake http connection."""
- def fakehttp(self, fakedata):
- class FakeSocket(StringIO.StringIO):
- def sendall(self, str): pass
- def makefile(self, mode, name): return self
- def read(self, amt=None):
- if self.closed: return ''
- return StringIO.StringIO.read(self, amt)
- def readline(self, length=None):
- if self.closed: return ''
- return StringIO.StringIO.readline(self, length)
- class FakeHTTPConnection(httplib.HTTPConnection):
- def connect(self):
- self.sock = FakeSocket(fakedata)
- assert httplib.HTTP._connection_class == httplib.HTTPConnection
- httplib.HTTP._connection_class = FakeHTTPConnection
-
- def unfakehttp(self):
- httplib.HTTP._connection_class = httplib.HTTPConnection
-
def test_read(self):
self.fakehttp('Hello!')
try:
@@ -150,6 +176,16 @@
finally:
self.unfakehttp()
+ def test_url_fragment(self):
+ # Issue #11703: geturl() omits fragments in the original URL.
+ url = 'http://docs.python.org/library/urllib.html#OK'
+ self.fakehttp('Hello!')
+ try:
+ fp = urllib.urlopen(url)
+ self.assertEqual(fp.geturl(), url)
+ finally:
+ self.unfakehttp()
+
def test_read_bogus(self):
# urlopen() should raise IOError for many error codes.
self.fakehttp('''HTTP/1.1 401 Authentication Required
@@ -186,6 +222,62 @@
finally:
self.unfakehttp()
+ def test_missing_localfile(self):
+ self.assertRaises(IOError, urllib.urlopen,
+ 'file://localhost/a/missing/file.py')
+ fd, tmp_file = tempfile.mkstemp()
+ tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/')
+ try:
+ self.assertTrue(os.path.exists(tmp_file))
+ fp = urllib.urlopen(tmp_fileurl)
+ finally:
+ os.close(fd)
+ fp.close()
+ os.unlink(tmp_file)
+
+ self.assertFalse(os.path.exists(tmp_file))
+ self.assertRaises(IOError, urllib.urlopen, tmp_fileurl)
+
+ def test_ftp_nonexisting(self):
+ self.assertRaises(IOError, urllib.urlopen,
+ 'ftp://localhost/not/existing/file.py')
+
+
+ def test_userpass_inurl(self):
+ self.fakehttp('Hello!')
+ try:
+ fakehttp_wrapper = httplib.HTTP._connection_class
+ fp = urllib.urlopen("http://user:pass@python.org/")
+ authorization = ("Authorization: Basic %s\r\n" %
+ b64encode('user:pass'))
+ # The authorization header must be in place
+ self.assertIn(authorization, fakehttp_wrapper.buf)
+ self.assertEqual(fp.readline(), "Hello!")
+ self.assertEqual(fp.readline(), "")
+ self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
+ self.assertEqual(fp.getcode(), 200)
+ finally:
+ self.unfakehttp()
+
+ def test_userpass_with_spaces_inurl(self):
+ self.fakehttp('Hello!')
+ try:
+ url = "http://a b:c d@python.org/"
+ fakehttp_wrapper = httplib.HTTP._connection_class
+ authorization = ("Authorization: Basic %s\r\n" %
+ b64encode('a b:c d'))
+ fp = urllib.urlopen(url)
+ # The authorization header must be in place
+ self.assertIn(authorization, fakehttp_wrapper.buf)
+ self.assertEqual(fp.readline(), "Hello!")
+ self.assertEqual(fp.readline(), "")
+ # the spaces are quoted in URL so no match
+ self.assertNotEqual(fp.geturl(), url)
+ self.assertEqual(fp.getcode(), 200)
+ finally:
+ self.unfakehttp()
+
+
class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
@@ -243,9 +335,9 @@
# a headers value is returned.
result = urllib.urlretrieve("file:%s" % test_support.TESTFN)
self.assertEqual(result[0], test_support.TESTFN)
- self.assert_(isinstance(result[1], mimetools.Message),
- "did not get a mimetools.Message instance as second "
- "returned value")
+ self.assertIsInstance(result[1], mimetools.Message,
+ "did not get a mimetools.Message instance as "
+ "second returned value")
def test_copy(self):
# Test that setting the filename argument works.
@@ -254,7 +346,7 @@
result = urllib.urlretrieve(self.constructLocalFileUrl(
test_support.TESTFN), second_temp)
self.assertEqual(second_temp, result[0])
- self.assert_(os.path.exists(second_temp), "copy of the file was not "
+ self.assertTrue(os.path.exists(second_temp), "copy of the file was not "
"made")
FILE = file(second_temp, 'rb')
try:
@@ -268,9 +360,9 @@
def test_reporthook(self):
# Make sure that the reporthook works.
def hooktester(count, block_size, total_size, count_holder=[0]):
- self.assert_(isinstance(count, int))
- self.assert_(isinstance(block_size, int))
- self.assert_(isinstance(total_size, int))
+ self.assertIsInstance(count, int)
+ self.assertIsInstance(block_size, int)
+ self.assertIsInstance(total_size, int)
self.assertEqual(count, count_holder[0])
count_holder[0] = count_holder[0] + 1
second_temp = "%s.2" % test_support.TESTFN
@@ -318,6 +410,45 @@
self.assertEqual(report[0][1], 8192)
self.assertEqual(report[0][2], 8193)
+
+class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin):
+ """Test urllib.urlretrieve() using fake http connections"""
+
+ def test_short_content_raises_ContentTooShortError(self):
+ self.fakehttp('''HTTP/1.1 200 OK
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Connection: close
+Content-Length: 100
+Content-Type: text/html; charset=iso-8859-1
+
+FF
+''')
+
+ def _reporthook(par1, par2, par3):
+ pass
+
+ try:
+ self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve,
+ 'http://example.com', reporthook=_reporthook)
+ finally:
+ self.unfakehttp()
+
+ def test_short_content_raises_ContentTooShortError_without_reporthook(self):
+ self.fakehttp('''HTTP/1.1 200 OK
+Date: Wed, 02 Jan 2008 03:03:54 GMT
+Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e
+Connection: close
+Content-Length: 100
+Content-Type: text/html; charset=iso-8859-1
+
+FF
+''')
+ try:
+ self.assertRaises(urllib.ContentTooShortError, urllib.urlretrieve, 'http://example.com/')
+ finally:
+ self.unfakehttp()
+
class QuotingTests(unittest.TestCase):
"""Tests for urllib.quote() and urllib.quote_plus()
@@ -395,8 +526,10 @@
result = urllib.quote(partial_quote)
self.assertEqual(expected, result,
"using quote(): %s != %s" % (expected, result))
+ result = urllib.quote_plus(partial_quote)
self.assertEqual(expected, result,
"using quote_plus(): %s != %s" % (expected, result))
+ self.assertRaises(TypeError, urllib.quote, None)
def test_quoting_space(self):
# Make sure quote() and quote_plus() handle spaces as specified in
@@ -527,7 +660,7 @@
expect_somewhere = ["1st=1", "2nd=2", "3rd=3"]
result = urllib.urlencode(given)
for expected in expect_somewhere:
- self.assert_(expected in result,
+ self.assertIn(expected, result,
"testing %s: %s not found in %s" %
(test_type, expected, result))
self.assertEqual(result.count('&'), 2,
@@ -536,7 +669,7 @@
amp_location = result.index('&')
on_amp_left = result[amp_location - 1]
on_amp_right = result[amp_location + 1]
- self.assert_(on_amp_left.isdigit() and on_amp_right.isdigit(),
+ self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(),
"testing %s: '&' not located in proper place in %s" %
(test_type, result))
self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps
@@ -574,8 +707,7 @@
result = urllib.urlencode(given, True)
for value in given["sequence"]:
expect = "sequence=%s" % value
- self.assert_(expect in result,
- "%s not found in %s" % (expect, result))
+ self.assertIn(expect, result)
self.assertEqual(result.count('&'), 2,
"Expected 2 '&'s, got %s" % result.count('&'))
@@ -622,8 +754,45 @@
"url2pathname() failed; %s != %s" %
(expect, result))
+ @unittest.skipUnless(sys.platform == 'win32',
+ 'test specific to the nturl2path library')
+ def test_ntpath(self):
+ given = ('/C:/', '///C:/', '/C|//')
+ expect = 'C:\\'
+ for url in given:
+ result = urllib.url2pathname(url)
+ self.assertEqual(expect, result,
+ 'nturl2path.url2pathname() failed; %s != %s' %
+ (expect, result))
+ given = '///C|/path'
+ expect = 'C:\\path'
+ result = urllib.url2pathname(given)
+ self.assertEqual(expect, result,
+ 'nturl2path.url2pathname() failed; %s != %s' %
+ (expect, result))
+
+class Utility_Tests(unittest.TestCase):
+ """Testcase to test the various utility functions in the urllib."""
+
+ def test_splitpasswd(self):
+ """Some of the password examples are not sensible, but it is added to
+ conform to RFC2617 and address issue4675.
+ """
+ self.assertEqual(('user', 'ab'),urllib.splitpasswd('user:ab'))
+ self.assertEqual(('user', 'a\nb'),urllib.splitpasswd('user:a\nb'))
+ self.assertEqual(('user', 'a\tb'),urllib.splitpasswd('user:a\tb'))
+ self.assertEqual(('user', 'a\rb'),urllib.splitpasswd('user:a\rb'))
+ self.assertEqual(('user', 'a\fb'),urllib.splitpasswd('user:a\fb'))
+ self.assertEqual(('user', 'a\vb'),urllib.splitpasswd('user:a\vb'))
+ self.assertEqual(('user', 'a:b'),urllib.splitpasswd('user:a:b'))
+ self.assertEqual(('user', 'a b'),urllib.splitpasswd('user:a b'))
+ self.assertEqual(('user 2', 'ab'),urllib.splitpasswd('user 2:ab'))
+ self.assertEqual(('user+1', 'a+b'),urllib.splitpasswd('user+1:a+b'))
+
+
class URLopener_Tests(unittest.TestCase):
"""Testcase to test the open method of URLopener class."""
+
def test_quoted_open(self):
class DummyURLopener(urllib.URLopener):
def open_spam(self, url):
@@ -640,7 +809,7 @@
# Just commented them out.
# Can't really tell why keep failing in windows and sparc.
-# Everywhere else they work ok, but on those machines, someteimes
+# Everywhere else they work ok, but on those machines, sometimes
# fail in one of the tests, sometimes in other. I have a linux, and
# the tests go ok.
# If anybody has one of the problematic enviroments, please help!
@@ -689,7 +858,7 @@
# def testTimeoutNone(self):
# # global default timeout is ignored
# import socket
-# self.assert_(socket.getdefaulttimeout() is None)
+# self.assertTrue(socket.getdefaulttimeout() is None)
# socket.setdefaulttimeout(30)
# try:
# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
@@ -701,7 +870,7 @@
# def testTimeoutDefault(self):
# # global default timeout is used
# import socket
-# self.assert_(socket.getdefaulttimeout() is None)
+# self.assertTrue(socket.getdefaulttimeout() is None)
# socket.setdefaulttimeout(30)
# try:
# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
@@ -727,11 +896,13 @@
urlopen_FileTests,
urlopen_HttpTests,
urlretrieve_FileTests,
+ urlretrieve_HttpTests,
ProxyTests,
QuotingTests,
UnquotingTests,
urlencode_Tests,
Pathname_Tests,
+ Utility_Tests,
URLopener_Tests,
#FTPWrapperTests,
)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -293,6 +293,7 @@
self._tunnel_headers = headers
else:
self._tunnel_headers.clear()
+
def request(self, method, url, body=None, headers=None):
self.method = method
self.selector = url
@@ -304,10 +305,12 @@
if self.raise_on_endheaders:
import socket
raise socket.error()
+
def getresponse(self):
return MockHTTPResponse(MockFile(), {}, 200, "OK")
- def close(self): pass
+ def close(self):
+ pass
class MockHandler:
# useful for testing handler machinery
@@ -595,21 +598,20 @@
def sanepathname2url(path):
import urllib
urlpath = urllib.pathname2url(path)
- if ((os._name if test_support.is_jython else os.name) == 'nt'
- and urlpath.startswith("///")):
+ if os.name == "nt" and urlpath.startswith("///"):
urlpath = urlpath[2:]
# XXX don't ask me about the mac...
return urlpath
class HandlerTests(unittest.TestCase):
- @unittest.skipIf(test_support.is_jython, "Required SSL support not yet available on jython")
def test_ftp(self):
class MockFTPWrapper:
def __init__(self, data): self.data = data
def retrfile(self, filename, filetype):
self.filename, self.filetype = filename, filetype
return StringIO.StringIO(self.data), len(self.data)
+ def close(self): pass
class NullFTPHandler(urllib2.FTPHandler):
def __init__(self, data): self.data = data
@@ -661,7 +663,6 @@
self.assertEqual(headers.get("Content-type"), mimetype)
self.assertEqual(int(headers["Content-length"]), len(data))
- @unittest.skip("FIXME: not working")
def test_file(self):
import rfc822, socket
h = urllib2.FileHandler()
@@ -974,6 +975,28 @@
self.assertEqual(count,
urllib2.HTTPRedirectHandler.max_redirections)
+ def test_invalid_redirect(self):
+ from_url = "http://example.com/a.html"
+ valid_schemes = ['http', 'https', 'ftp']
+ invalid_schemes = ['file', 'imap', 'ldap']
+ schemeless_url = "example.com/b.html"
+ h = urllib2.HTTPRedirectHandler()
+ o = h.parent = MockOpener()
+ req = Request(from_url)
+ req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
+
+ for scheme in invalid_schemes:
+ invalid_url = scheme + '://' + schemeless_url
+ self.assertRaises(urllib2.HTTPError, h.http_error_302,
+ req, MockFile(), 302, "Security Loophole",
+ MockHeaders({"location": invalid_url}))
+
+ for scheme in valid_schemes:
+ valid_url = scheme + '://' + schemeless_url
+ h.http_error_302(req, MockFile(), 302, "That's fine",
+ MockHeaders({"location": valid_url}))
+ self.assertEqual(o.req.get_full_url(), valid_url)
+
def test_cookie_redirect(self):
# cookies shouldn't leak into redirected requests
from cookielib import CookieJar
@@ -990,6 +1013,15 @@
o.open("http://www.example.com/")
self.assertTrue(not hh.req.has_header("Cookie"))
+ def test_redirect_fragment(self):
+ redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
+ hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
+ hdeh = urllib2.HTTPDefaultErrorHandler()
+ hrh = urllib2.HTTPRedirectHandler()
+ o = build_test_opener(hh, hdeh, hrh)
+ fp = o.open('http://www.example.com')
+ self.assertEqual(fp.geturl(), redirected_url.strip())
+
def test_proxy(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
@@ -1074,12 +1106,30 @@
self._test_basic_auth(opener, auth_handler, "Authorization",
realm, http_handler, password_manager,
"http://acme.example.com/protected",
- "http://acme.example.com/protected",
- )
+ "http://acme.example.com/protected"
+ )
def test_basic_auth_with_single_quoted_realm(self):
self.test_basic_auth(quote_char="'")
+ def test_basic_auth_with_unquoted_realm(self):
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
+ realm = "ACME Widget Store"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
+ opener.add_handler(auth_handler)
+ opener.add_handler(http_handler)
+ msg = "Basic Auth Realm was unquoted"
+ with test_support.check_warnings((msg, UserWarning)):
+ self._test_basic_auth(opener, auth_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected"
+ )
+
+
def test_proxy_basic_auth(self):
opener = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
@@ -1098,7 +1148,7 @@
)
def test_basic_and_digest_auth_handlers(self):
- # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
+ # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
# response (http://python.org/sf/1479302), where it should instead
# return None to allow another handler (especially
# HTTPBasicAuthHandler) to handle the response.
@@ -1275,12 +1325,43 @@
req = Request("<URL:http://www.python.org>")
self.assertEqual("www.python.org", req.get_host())
- def test_urlwith_fragment(self):
+ def test_url_fragment(self):
req = Request("http://www.python.org/?qs=query#fragment=true")
self.assertEqual("/?qs=query", req.get_selector())
req = Request("http://www.python.org/#fun=true")
self.assertEqual("/", req.get_selector())
+ # Issue 11703: geturl() omits fragment in the original URL.
+ url = 'http://docs.python.org/library/urllib2.html#OK'
+ req = Request(url)
+ self.assertEqual(req.get_full_url(), url)
+
+ def test_HTTPError_interface(self):
+ """
+ Issue 13211 reveals that HTTPError didn't implement the URLError
+ interface even though HTTPError is a subclass of URLError.
+
+ >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
+ >>> assert hasattr(err, 'reason')
+ >>> err.reason
+ 'something bad happened'
+ """
+
+ def test_HTTPError_interface_call(self):
+ """
+ Issue 15701 - HTTPError interface has info method available from URLError.
+ """
+ err = urllib2.HTTPError(msg='something bad happened', url=None,
+ code=None, hdrs='Content-Length:42', fp=None)
+ self.assertTrue(hasattr(err, 'reason'))
+ assert hasattr(err, 'reason')
+ assert hasattr(err, 'info')
+ assert callable(err.info)
+ try:
+ err.info()
+ except AttributeError:
+ self.fail("err.info() failed")
+ self.assertEqual(err.info(), "Content-Length:42")
def test_main(verbose=None):
from test import test_urllib2
diff --git a/Lib/test/test_urllib2_localnet.py b/Lib/test/test_urllib2_localnet.py
--- a/Lib/test/test_urllib2_localnet.py
+++ b/Lib/test/test_urllib2_localnet.py
@@ -1,14 +1,16 @@
#!/usr/bin/env python
-import sys
-import threading
import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib
+
from test import test_support
+mimetools = test_support.import_module('mimetools', deprecated=True)
+threading = test_support.import_module('threading')
+
# Loopback http server infrastructure
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
@@ -40,13 +42,16 @@
class LoopbackHttpServerThread(threading.Thread):
"""Stoppable thread that runs a loopback http server."""
- def __init__(self, port, RequestHandlerClass):
+ def __init__(self, request_handler):
threading.Thread.__init__(self)
- self._RequestHandlerClass = RequestHandlerClass
self._stop = False
- self._port = port
- self._server_address = ('127.0.0.1', self._port)
self.ready = threading.Event()
+ request_handler.protocol_version = "HTTP/1.0"
+ self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
+ request_handler)
+ #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
+ # self.httpd.server_port)
+ self.port = self.httpd.server_port
def stop(self):
"""Stops the webserver if it's currently running."""
@@ -57,19 +62,9 @@
self.join()
def run(self):
- protocol = "HTTP/1.0"
-
- self._RequestHandlerClass.protocol_version = protocol
- httpd = LoopbackHttpServer(self._server_address,
- self._RequestHandlerClass)
-
- sa = httpd.socket.getsockname()
- #print "Serving HTTP on", sa[0], "port", sa[1], "..."
-
self.ready.set()
while not self._stop:
- httpd.handle_request()
- httpd.server_close()
+ self.httpd.handle_request()
# Authentication infrastructure
@@ -161,13 +156,13 @@
if len(self._users) == 0:
return True
- if not request_handler.headers.has_key('Proxy-Authorization'):
+ if 'Proxy-Authorization' not in request_handler.headers:
return self._return_auth_challenge(request_handler)
else:
auth_dict = self._create_auth_dict(
request_handler.headers['Proxy-Authorization']
)
- if self._users.has_key(auth_dict["username"]):
+ if auth_dict["username"] in self._users:
password = self._users[ auth_dict["username"] ]
else:
return self._return_auth_challenge(request_handler)
@@ -202,7 +197,11 @@
testing.
"""
- digest_auth_handler = DigestAuthHandler()
+ def __init__(self, digest_auth_handler, *args, **kwargs):
+ # This has to be set before calling our parent's __init__(), which will
+ # try to call do_GET().
+ self.digest_auth_handler = digest_auth_handler
+ BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def log_message(self, format, *args):
# Uncomment the next line for debugging.
@@ -223,60 +222,68 @@
# Test cases
-class ProxyAuthTests(unittest.TestCase):
- URL = "http://www.foo.com"
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._threads = test_support.threading_setup()
- PORT = 58080
+ def tearDown(self):
+ test_support.threading_cleanup(*self._threads)
+
+
+class ProxyAuthTests(BaseTestCase):
+ URL = "http://localhost"
+
USER = "tester"
PASSWD = "test123"
REALM = "TestRealm"
- PROXY_URL = "http://127.0.0.1:%d" % PORT
+ def setUp(self):
+ super(ProxyAuthTests, self).setUp()
+ self.digest_auth_handler = DigestAuthHandler()
+ self.digest_auth_handler.set_users({self.USER: self.PASSWD})
+ self.digest_auth_handler.set_realm(self.REALM)
+ def create_fake_proxy_handler(*args, **kwargs):
+ return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
- def setUp(self):
- FakeProxyHandler.digest_auth_handler.set_users({
- self.USER : self.PASSWD
- })
- FakeProxyHandler.digest_auth_handler.set_realm(self.REALM)
-
- self.server = LoopbackHttpServerThread(self.PORT, FakeProxyHandler)
+ self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
self.server.start()
self.server.ready.wait()
-
- handler = urllib2.ProxyHandler({"http" : self.PROXY_URL})
- self._digest_auth_handler = urllib2.ProxyDigestAuthHandler()
- self.opener = urllib2.build_opener(handler, self._digest_auth_handler)
+ proxy_url = "http://127.0.0.1:%d" % self.server.port
+ handler = urllib2.ProxyHandler({"http" : proxy_url})
+ self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
+ self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
def tearDown(self):
self.server.stop()
+ super(ProxyAuthTests, self).tearDown()
def test_proxy_with_bad_password_raises_httperror(self):
- self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD+"bad")
- FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.digest_auth_handler.set_qop("auth")
self.assertRaises(urllib2.HTTPError,
self.opener.open,
self.URL)
def test_proxy_with_no_password_raises_httperror(self):
- FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.digest_auth_handler.set_qop("auth")
self.assertRaises(urllib2.HTTPError,
self.opener.open,
self.URL)
def test_proxy_qop_auth_works(self):
- self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
- FakeProxyHandler.digest_auth_handler.set_qop("auth")
+ self.digest_auth_handler.set_qop("auth")
result = self.opener.open(self.URL)
while result.read():
pass
result.close()
def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
- self._digest_auth_handler.add_password(self.REALM, self.URL,
+ self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
- FakeProxyHandler.digest_auth_handler.set_qop("auth-int")
+ self.digest_auth_handler.set_qop("auth-int")
try:
result = self.opener.open(self.URL)
except urllib2.URLError:
@@ -289,6 +296,244 @@
pass
result.close()
+
+def GetRequestHandler(responses):
+
+ class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+
+ server_version = "TestHTTP/"
+ requests = []
+ headers_received = []
+ port = 80
+
+ def do_GET(self):
+ body = self.send_head()
+ if body:
+ self.wfile.write(body)
+
+ def do_POST(self):
+ content_length = self.headers['Content-Length']
+ post_data = self.rfile.read(int(content_length))
+ self.do_GET()
+ self.requests.append(post_data)
+
+ def send_head(self):
+ FakeHTTPRequestHandler.headers_received = self.headers
+ self.requests.append(self.path)
+ response_code, headers, body = responses.pop(0)
+
+ self.send_response(response_code)
+
+ for (header, value) in headers:
+ self.send_header(header, value % self.port)
+ if body:
+ self.send_header('Content-type', 'text/plain')
+ self.end_headers()
+ return body
+ self.end_headers()
+
+ def log_message(self, *args):
+ pass
+
+
+ return FakeHTTPRequestHandler
+
+
+class TestUrlopen(BaseTestCase):
+ """Tests urllib2.urlopen using the network.
+
+ These tests are not exhaustive. Assuming that testing using files does a
+ good job overall of some of the basic interface features. There are no
+ tests exercising the optional 'data' and 'proxies' arguments. No tests
+ for transparent redirection have been written.
+ """
+
+ def setUp(self):
+ proxy_handler = urllib2.ProxyHandler({})
+ opener = urllib2.build_opener(proxy_handler)
+ urllib2.install_opener(opener)
+ super(TestUrlopen, self).setUp()
+
+ def start_server(self, responses):
+ handler = GetRequestHandler(responses)
+
+ self.server = LoopbackHttpServerThread(handler)
+ self.server.start()
+ self.server.ready.wait()
+ port = self.server.port
+ handler.port = port
+ return handler
+
+
+ def test_redirection(self):
+ expected_response = 'We got here...'
+ responses = [
+ (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
+ (200, [], expected_response)
+ ]
+
+ handler = self.start_server(responses)
+
+ try:
+ f = urllib2.urlopen('http://localhost:%s/' % handler.port)
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/', '/somewhere_else'])
+ finally:
+ self.server.stop()
+
+
+ def test_404(self):
+ expected_response = 'Bad bad bad...'
+ handler = self.start_server([(404, [], expected_response)])
+
+ try:
+ try:
+ urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
+ except urllib2.URLError, f:
+ pass
+ else:
+ self.fail('404 should raise URLError')
+
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/weeble'])
+ finally:
+ self.server.stop()
+
+
+ def test_200(self):
+ expected_response = 'pycon 2008...'
+ handler = self.start_server([(200, [], expected_response)])
+
+ try:
+ f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/bizarre'])
+ finally:
+ self.server.stop()
+
+ def test_200_with_parameters(self):
+ expected_response = 'pycon 2008...'
+ handler = self.start_server([(200, [], expected_response)])
+
+ try:
+ f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
+ data = f.read()
+ f.close()
+
+ self.assertEqual(data, expected_response)
+ self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
+ finally:
+ self.server.stop()
+
+
+ def test_sending_headers(self):
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ req = urllib2.Request("http://localhost:%s/" % handler.port,
+ headers={'Range': 'bytes=20-39'})
+ urllib2.urlopen(req)
+ self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
+ finally:
+ self.server.stop()
+
+ def test_basic(self):
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+ for attr in ("read", "close", "info", "geturl"):
+ self.assertTrue(hasattr(open_url, attr), "object returned from "
+ "urlopen lacks the %s attribute" % attr)
+ try:
+ self.assertTrue(open_url.read(), "calling 'read' failed")
+ finally:
+ open_url.close()
+ finally:
+ self.server.stop()
+
+ def test_info(self):
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+ info_obj = open_url.info()
+ self.assertIsInstance(info_obj, mimetools.Message,
+ "object returned by 'info' is not an "
+ "instance of mimetools.Message")
+ self.assertEqual(info_obj.getsubtype(), "plain")
+ finally:
+ self.server.stop()
+
+ def test_geturl(self):
+ # Make sure same URL as opened is returned by geturl.
+ handler = self.start_server([(200, [], "we don't care")])
+
+ try:
+ open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
+ url = open_url.geturl()
+ self.assertEqual(url, "http://localhost:%s" % handler.port)
+ finally:
+ self.server.stop()
+
+
+ def test_bad_address(self):
+ # Make sure proper exception is raised when connecting to a bogus
+ # address.
+ self.assertRaises(IOError,
+ # Given that both VeriSign and various ISPs have in
+ # the past or are presently hijacking various invalid
+ # domain name requests in an attempt to boost traffic
+ # to their own sites, finding a domain name to use
+ # for this test is difficult. RFC2606 leads one to
+ # believe that '.invalid' should work, but experience
+ # seemed to indicate otherwise. Single character
+ # TLDs are likely to remain invalid, so this seems to
+ # be the best choice. The trailing '.' prevents a
+ # related problem: The normal DNS resolver appends
+ # the domain names from the search path if there is
+ # no '.' at the end, and if one of those domains
+ # implements a '*' rule a result is returned.
+ # However, none of this will prevent the test from
+ # failing if the ISP hijacks all invalid domain
+ # requests. The real solution would be to be able to
+ # parameterize the framework with a mock resolver.
+ urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
+
+ def test_iteration(self):
+ expected_response = "pycon 2008..."
+ handler = self.start_server([(200, [], expected_response)])
+ try:
+ data = urllib2.urlopen("http://localhost:%s" % handler.port)
+ for line in data:
+ self.assertEqual(line, expected_response)
+ finally:
+ self.server.stop()
+
+ def ztest_line_iteration(self):
+ lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
+ expected_response = "".join(lines)
+ handler = self.start_server([(200, [], expected_response)])
+ try:
+ data = urllib2.urlopen("http://localhost:%s" % handler.port)
+ for index, line in enumerate(data):
+ self.assertEqual(line, lines[index],
+ "Fetched line number %s doesn't match expected:\n"
+ " Expected length was %s, got %s" %
+ (index, len(lines[index]), len(line)))
+ finally:
+ self.server.stop()
+ self.assertEqual(index + 1, len(lines))
+
def test_main():
# We will NOT depend on the network resource flag
# (Lib/test/regrtest.py -u network) since all tests here are only
@@ -296,7 +541,7 @@
# the next line.
#test_support.requires("network")
- test_support.run_unittest(ProxyAuthTests)
+ test_support.run_unittest(ProxyAuthTests, TestUrlopen)
if __name__ == "__main__":
test_main()
--
Repository URL: http://hg.python.org/jython
More information about the Jython-checkins
mailing list