[Python-checkins] r86128 - in python/branches/py3k/Lib: test/test_wsgiref.py wsgiref/handlers.py wsgiref/headers.py wsgiref/validate.py

phillip.eby python-checkins at python.org
Tue Nov 2 23:28:59 CET 2010


Author: phillip.eby
Date: Tue Nov  2 23:28:59 2010
New Revision: 86128

Log:
Update wsgiref for PEP 3333, and fix errors introduced into the test suite by converting type() checks to isinstance().
(When WSGI specifies a built-in type, it does NOT mean "this type or a subclass" -- it means 'type(x) is SpecifiedType'.)


Modified:
   python/branches/py3k/Lib/test/test_wsgiref.py
   python/branches/py3k/Lib/wsgiref/handlers.py
   python/branches/py3k/Lib/wsgiref/headers.py
   python/branches/py3k/Lib/wsgiref/validate.py

Modified: python/branches/py3k/Lib/test/test_wsgiref.py
==============================================================================
--- python/branches/py3k/Lib/test/test_wsgiref.py	(original)
+++ python/branches/py3k/Lib/test/test_wsgiref.py	Tue Nov  2 23:28:59 2010
@@ -47,7 +47,7 @@
         ('Content-Type','text/plain'),
         ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
     ])
-    return ["Hello, world!"]
+    return [b"Hello, world!"]
 
 def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
     server = make_server("", 80, app, MockServer, MockHandler)
@@ -165,7 +165,7 @@
     def test_wsgi_input(self):
         def bad_app(e,s):
             e["wsgi.input"].read()
-            s(b"200 OK", [(b"Content-Type", b"text/plain; charset=utf-8")])
+            s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
             return [b"data"]
         out, err = run_amock(validator(bad_app))
         self.assertTrue(out.endswith(
@@ -177,8 +177,8 @@
 
     def test_bytes_validation(self):
         def app(e, s):
-            s(b"200 OK", [
-                (b"Content-Type", b"text/plain; charset=utf-8"),
+            s("200 OK", [
+                ("Content-Type", "text/plain; charset=utf-8"),
                 ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
                 ])
             return [b"data"]
@@ -420,29 +420,6 @@
             '\r\n'
         )
 
-    def testBytes(self):
-        h = Headers([
-            (b"Content-Type", b"text/plain; charset=utf-8"),
-            ])
-        self.assertEqual("text/plain; charset=utf-8", h.get("Content-Type"))
-
-        h[b"Foo"] = bytes(b"bar")
-        self.assertEqual("bar", h.get("Foo"))
-        self.assertEqual("bar", h.get(b"Foo"))
-
-        h.setdefault(b"Bar", b"foo")
-        self.assertEqual("foo", h.get("Bar"))
-        self.assertEqual("foo", h.get(b"Bar"))
-
-        h.add_header(b'content-disposition', b'attachment',
-            filename=b'bud.gif')
-        self.assertEqual('attachment; filename="bud.gif"',
-            h.get("content-disposition"))
-
-        del h['content-disposition']
-        self.assertNotIn(b'content-disposition', h)
-
-
 class ErrorHandler(BaseCGIHandler):
     """Simple handler subclass for testing BaseHandler"""
 
@@ -529,10 +506,10 @@
 
         def trivial_app1(e,s):
             s('200 OK',[])
-            return [e['wsgi.url_scheme']]
+            return [e['wsgi.url_scheme'].encode('iso-8859-1')]
 
         def trivial_app2(e,s):
-            s('200 OK',[])(e['wsgi.url_scheme'])
+            s('200 OK',[])(e['wsgi.url_scheme'].encode('iso-8859-1'))
             return []
 
         def trivial_app3(e,s):
@@ -590,13 +567,13 @@
             ("Status: %s\r\n"
             "Content-Type: text/plain\r\n"
             "Content-Length: %d\r\n"
-            "\r\n%s" % (h.error_status,len(h.error_body),h.error_body)
-            ).encode("iso-8859-1"))
+            "\r\n" % (h.error_status,len(h.error_body))).encode('iso-8859-1')
+            + h.error_body)
 
         self.assertIn("AssertionError", h.stderr.getvalue())
 
     def testErrorAfterOutput(self):
-        MSG = "Some output has been sent"
+        MSG = b"Some output has been sent"
         def error_app(e,s):
             s("200 OK",[])(MSG)
             raise AssertionError("This should be caught by handler")
@@ -605,7 +582,7 @@
         h.run(error_app)
         self.assertEqual(h.stdout.getvalue(),
             ("Status: 200 OK\r\n"
-            "\r\n"+MSG).encode("iso-8859-1"))
+            "\r\n".encode("iso-8859-1")+MSG))
         self.assertIn("AssertionError", h.stderr.getvalue())
 
 
@@ -654,8 +631,8 @@
 
     def testBytesData(self):
         def app(e, s):
-            s(b"200 OK", [
-                (b"Content-Type", b"text/plain; charset=utf-8"),
+            s("200 OK", [
+                ("Content-Type", "text/plain; charset=utf-8"),
                 ])
             return [b"data"]
 

Modified: python/branches/py3k/Lib/wsgiref/handlers.py
==============================================================================
--- python/branches/py3k/Lib/wsgiref/handlers.py	(original)
+++ python/branches/py3k/Lib/wsgiref/handlers.py	Tue Nov  2 23:28:59 2010
@@ -46,7 +46,7 @@
     traceback_limit = None  # Print entire traceback to self.get_stderr()
     error_status = "500 Internal Server Error"
     error_headers = [('Content-Type','text/plain')]
-    error_body = "A server error occurred.  Please contact the administrator."
+    error_body = b"A server error occurred.  Please contact the administrator."
 
     # State variables (don't mess with these)
     status = result = None
@@ -137,7 +137,7 @@
             self.set_content_length()
 
     def start_response(self, status, headers,exc_info=None):
-        """'start_response()' callable as specified by PEP 333"""
+        """'start_response()' callable as specified by PEP 3333"""
 
         if exc_info:
             try:
@@ -149,49 +149,48 @@
         elif self.headers is not None:
             raise AssertionError("Headers already set!")
 
+        self.status = status
+        self.headers = self.headers_class(headers)
         status = self._convert_string_type(status, "Status")
         assert len(status)>=4,"Status must be at least 4 characters"
         assert int(status[:3]),"Status message must begin w/3-digit code"
         assert status[3]==" ", "Status message must have a space after code"
 
-        str_headers = []
-        for name,val in headers:
-            name = self._convert_string_type(name, "Header name")
-            val = self._convert_string_type(val, "Header value")
-            str_headers.append((name, val))
-            assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
+        if __debug__:
+            for name, val in headers:
+                name = self._convert_string_type(name, "Header name")
+                val = self._convert_string_type(val, "Header value")
+                assert not is_hop_by_hop(name),"Hop-by-hop headers not allowed"
 
-        self.status = status
-        self.headers = self.headers_class(str_headers)
         return self.write
 
     def _convert_string_type(self, value, title):
         """Convert/check value type."""
-        if isinstance(value, str):
+        if type(value) is str:
             return value
-        assert isinstance(value, bytes), \
-            "{0} must be a string or bytes object (not {1})".format(title, value)
-        return str(value, "iso-8859-1")
+        raise AssertionError(
+            "{0} must be of type str (got {1})".format(title, repr(value))
+        )
 
     def send_preamble(self):
         """Transmit version/status/date/server, via self._write()"""
         if self.origin_server:
             if self.client_is_modern():
-                self._write('HTTP/%s %s\r\n' % (self.http_version,self.status))
+                self._write(('HTTP/%s %s\r\n' % (self.http_version,self.status)).encode('iso-8859-1'))
                 if 'Date' not in self.headers:
                     self._write(
-                        'Date: %s\r\n' % format_date_time(time.time())
+                        ('Date: %s\r\n' % format_date_time(time.time())).encode('iso-8859-1')
                     )
                 if self.server_software and 'Server' not in self.headers:
-                    self._write('Server: %s\r\n' % self.server_software)
+                    self._write(('Server: %s\r\n' % self.server_software).encode('iso-8859-1'))
         else:
-            self._write('Status: %s\r\n' % self.status)
+            self._write(('Status: %s\r\n' % self.status).encode('iso-8859-1'))
 
     def write(self, data):
-        """'write()' callable as specified by PEP 333"""
+        """'write()' callable as specified by PEP 3333"""
 
-        assert isinstance(data, (str, bytes)), \
-            "write() argument must be a string or bytes"
+        assert type(data) is bytes, \
+            "write() argument must be a bytes instance"
 
         if not self.status:
             raise AssertionError("write() before start_response()")
@@ -256,7 +255,7 @@
         self.headers_sent = True
         if not self.origin_server or self.client_is_modern():
             self.send_preamble()
-            self._write(str(self.headers))
+            self._write(bytes(self.headers))
 
 
     def result_is_file(self):
@@ -376,12 +375,6 @@
         self.environ.update(self.base_env)
 
     def _write(self,data):
-        if isinstance(data, str):
-            try:
-                data = data.encode("iso-8859-1")
-            except UnicodeEncodeError:
-                raise ValueError("Unicode data must contain only code points"
-                    " representable in ISO-8859-1 encoding")
         self.stdout.write(data)
 
     def _flush(self):

Modified: python/branches/py3k/Lib/wsgiref/headers.py
==============================================================================
--- python/branches/py3k/Lib/wsgiref/headers.py	(original)
+++ python/branches/py3k/Lib/wsgiref/headers.py	Tue Nov  2 23:28:59 2010
@@ -30,21 +30,20 @@
     """Manage a collection of HTTP response headers"""
 
     def __init__(self,headers):
-        if not isinstance(headers, list):
+        if type(headers) is not list:
             raise TypeError("Headers must be a list of name/value tuples")
-        self._headers = []
-        for k, v in headers:
-            k = self._convert_string_type(k)
-            v = self._convert_string_type(v)
-            self._headers.append((k, v))
+        self._headers = headers
+        if __debug__:
+            for k, v in headers:
+                self._convert_string_type(k)
+                self._convert_string_type(v)
 
     def _convert_string_type(self, value):
         """Convert/check value type."""
-        if isinstance(value, str):
+        if type(value) is str:
             return value
-        assert isinstance(value, bytes), ("Header names/values must be"
-            " a string or bytes object (not {0})".format(value))
-        return str(value, "iso-8859-1")
+        raise AssertionError("Header names/values must be"
+            " of type str (got {0})".format(repr(value)))
 
     def __len__(self):
         """Return the total number of headers, including duplicates."""
@@ -139,6 +138,9 @@
         suitable for direct HTTP transmission."""
         return '\r\n'.join(["%s: %s" % kv for kv in self._headers]+['',''])
 
+    def __bytes__(self):
+        return str(self).encode('iso-8859-1')
+
     def setdefault(self,name,value):
         """Return first matching header value for 'name', or 'value'
 

Modified: python/branches/py3k/Lib/wsgiref/validate.py
==============================================================================
--- python/branches/py3k/Lib/wsgiref/validate.py	(original)
+++ python/branches/py3k/Lib/wsgiref/validate.py	Tue Nov  2 23:28:59 2010
@@ -128,11 +128,10 @@
         raise AssertionError(*args)
 
 def check_string_type(value, title):
-    if isinstance(value, str):
+    if type (value) is str:
         return value
-    assert isinstance(value, bytes), \
-        "{0} must be a string or bytes object (not {1})".format(title, value)
-    return str(value, "iso-8859-1")
+    raise AssertionError(
+        "{0} must be of type str (got {1})".format(title, repr(value)))
 
 def validator(application):
 
@@ -197,20 +196,21 @@
     def read(self, *args):
         assert_(len(args) == 1)
         v = self.input.read(*args)
-        assert_(isinstance(v, bytes))
+        assert_(type(v) is bytes)
         return v
 
-    def readline(self):
-        v = self.input.readline()
-        assert_(isinstance(v, bytes))
+    def readline(self, *args):
+        assert_(len(args) <= 1)
+        v = self.input.readline(*args)
+        assert_(type(v) is bytes)
         return v
 
     def readlines(self, *args):
         assert_(len(args) <= 1)
         lines = self.input.readlines(*args)
-        assert_(isinstance(lines, list))
+        assert_(type(lines) is list)
         for line in lines:
-            assert_(isinstance(line, bytes))
+            assert_(type(line) is bytes)
         return lines
 
     def __iter__(self):
@@ -229,7 +229,7 @@
         self.errors = wsgi_errors
 
     def write(self, s):
-        assert_(isinstance(s, str))
+        assert_(type(s) is str)
         self.errors.write(s)
 
     def flush(self):
@@ -248,7 +248,7 @@
         self.writer = wsgi_writer
 
     def __call__(self, s):
-        assert_(isinstance(s, (str, bytes)))
+        assert_(type(s) is bytes)
         self.writer(s)
 
 class PartialIteratorWrapper:
@@ -275,6 +275,8 @@
         assert_(not self.closed,
             "Iterator read after closed")
         v = next(self.iterator)
+        if type(v) is not bytes:
+            assert_(False, "Iterator yielded non-bytestring (%r)" % (v,))
         if self.check_start_response is not None:
             assert_(self.check_start_response,
                 "The application returns and we started iterating over its body, but start_response has not yet been called")
@@ -294,7 +296,7 @@
             "Iterator garbage collected without being closed")
 
 def check_environ(environ):
-    assert_(isinstance(environ, dict),
+    assert_(type(environ) is dict,
         "Environment is not of the right type: %r (environment: %r)"
         % (type(environ), environ))
 
@@ -321,11 +323,11 @@
         if '.' in key:
             # Extension, we don't care about its type
             continue
-        assert_(isinstance(environ[key], str),
+        assert_(type(environ[key]) is str,
             "Environmental variable %s is not a string: %r (value: %r)"
             % (key, type(environ[key]), environ[key]))
 
-    assert_(isinstance(environ['wsgi.version'], tuple),
+    assert_(type(environ['wsgi.version']) is tuple,
         "wsgi.version should be a tuple (%r)" % (environ['wsgi.version'],))
     assert_(environ['wsgi.url_scheme'] in ('http', 'https'),
         "wsgi.url_scheme unknown: %r" % environ['wsgi.url_scheme'])
@@ -385,12 +387,12 @@
             % status, WSGIWarning)
 
 def check_headers(headers):
-    assert_(isinstance(headers, list),
+    assert_(type(headers) is list,
         "Headers (%r) must be of type list: %r"
         % (headers, type(headers)))
     header_names = {}
     for item in headers:
-        assert_(isinstance(item, tuple),
+        assert_(type(item) is tuple,
             "Individual headers (%r) must be of type tuple: %r"
             % (item, type(item)))
         assert_(len(item) == 2)
@@ -428,14 +430,14 @@
         assert_(0, "No Content-Type header found in headers (%s)" % headers)
 
 def check_exc_info(exc_info):
-    assert_(exc_info is None or isinstance(exc_info, tuple),
+    assert_(exc_info is None or type(exc_info) is tuple,
         "exc_info (%r) is not a tuple: %r" % (exc_info, type(exc_info)))
     # More exc_info checks?
 
 def check_iterator(iterator):
-    # Technically a string is legal, which is why it's a really bad
+    # Technically a bytestring is legal, which is why it's a really bad
     # idea, because it may cause the response to be returned
     # character-by-character
     assert_(not isinstance(iterator, (str, bytes)),
         "You should not return a string as your application iterator, "
-        "instead return a single-item list containing that string.")
+        "instead return a single-item list containing a bytestring.")


More information about the Python-checkins mailing list