[Web-SIG] My WSGIHTTPServer implementation

Peter Hunt floydophone at gmail.com
Sat Aug 28 04:42:34 CEST 2004


As the PEAK one is horribly out of date, I decided to implement a new
one. I don't know if this is exactly the interface you want, but it's
a start.

The way it works is you write a .py script, and include a module-level
"application" callable, which is your WSGI application. It will
execute it from there.

It's horribly insecure, but should help with people testing their WSGI
apps. The docs are scarce.

Attached is the implementation as well as the example app from the PEP.
-------------- next part --------------
#!/usr/bin/env python

def application(environ, start_response):
	 """Simplest possible application object"""
	 status = '200 OK'
	 headers = [('Content-type','text/plain')]
	 write = start_response(status, headers)
	 write('Hello world!\n')
-------------- next part --------------
"""WSGI-savvy HTTP Server.


SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
-- it may execute arbitrary Python code or external programs.

"""


__version__ = "0.4"

__all__ = ["WSGIHTTPRequestHandler"]

import os
import sys
import urllib
import BaseHTTPServer
import SimpleHTTPServer
import select
import traceback

class WSGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):

    """Complete HTTP server with GET, HEAD and POST commands.

    GET and HEAD also support running CGI scripts.

    The POST command is *only* implemented for CGI scripts.

    """

    # Make rfile unbuffered -- we need to read one line and then pass
    # the rest to a subprocess, so we can't use buffered input.
    rbufsize = 0

    def do_POST(self):
        """Serve a POST request.

        This is only implemented for CGI scripts.

        """

        if self.is_cgi():
            self.run_cgi()
        else:
            self.send_error(501, "Can only POST to CGI scripts")

    def do_GET(self):
        if self.is_cgi():
            self.run_cgi()
        else:
            SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)

    def send_head(self):
        """Version of send_head that support CGI scripts"""
        if self.is_cgi():
            return self.run_cgi()
        else:
            return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

    def is_cgi(self):
        """Test whether self.path corresponds to a Python script
        """

        path = self.path

        if "?" in path:
            path = path[:path.rfind("?")]

        if self.is_python(path):
            i = path.rfind("/")
            if i == -1:
                self.cgi_info = "/",path
            else:
                self.cgi_info = path[:i],path[i+1:]
            #self.cgi_info = os.path.split(path)#path[:-1], path[-1]
            return True
        else:
            return False

    def is_python(self, path):
        """Test whether argument path is a Python script."""
        head, tail = os.path.splitext(path)
        return tail.lower() in (".py", ".pyw")

    def run_cgi(self):
        """Execute a CGI script."""
        dir, rest = self.cgi_info
        i = rest.rfind('?')
        if i >= 0:
            rest, query = rest[:i], rest[i+1:]
        else:
            query = ''
        i = rest.find('/')
        if i >= 0:
            script, rest = rest[:i], rest[i:]
        else:
            script, rest = rest, ''
            
        scriptname = dir + '/' + script
        scriptfile = self.translate_path(scriptname)
        if not os.path.exists(scriptfile):
            self.send_error(404, "No such CGI script (%s)" % `scriptname`)
            return
        if not os.path.isfile(scriptfile):
            self.send_error(403, "CGI script is not a plain file (%s)" %
                            `scriptname`)
            return
        ispy = self.is_python(scriptname)
        if not ispy:
            self.send_error(403, "CGI script is not a Python script (%s)" %
                            `scriptname`)
            return
        # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
        # XXX Much of the following could be prepared ahead of time!
        env = {}
        env['SERVER_SOFTWARE'] = self.version_string()
        env['SERVER_NAME'] = self.server.server_name
        env['GATEWAY_INTERFACE'] = 'CGI/1.1'
        env['SERVER_PROTOCOL'] = self.protocol_version
        env['SERVER_PORT'] = str(self.server.server_port)
        env['REQUEST_METHOD'] = self.command
        uqrest = urllib.unquote(rest)
        env['PATH_INFO'] = uqrest
        env['PATH_TRANSLATED'] = self.translate_path(uqrest)
        env['SCRIPT_NAME'] = scriptname
        if query:
            env['QUERY_STRING'] = query
        host = self.address_string()
        if host != self.client_address[0]:
            env['REMOTE_HOST'] = host
        env['REMOTE_ADDR'] = self.client_address[0]
        # XXX AUTH_TYPE
        # XXX REMOTE_USER
        # XXX REMOTE_IDENT
        if self.headers.typeheader is None:
            env['CONTENT_TYPE'] = self.headers.type
        else:
            env['CONTENT_TYPE'] = self.headers.typeheader
        length = self.headers.getheader('content-length')
        if length:
            env['CONTENT_LENGTH'] = length
        accept = []
        for line in self.headers.getallmatchingheaders('accept'):
            if line[:1] in "\t\n\r ":
                accept.append(line.strip())
            else:
                accept = accept + line[7:].split(',')
        env['HTTP_ACCEPT'] = ','.join(accept)
        ua = self.headers.getheader('user-agent')
        if ua:
            env['HTTP_USER_AGENT'] = ua
        co = filter(None, self.headers.getheaders('cookie'))
        if co:
            env['HTTP_COOKIE'] = ', '.join(co)
        # XXX Other HTTP_* headers
        # Since we're setting the env in the parent, provide empty
        # values to override previously set values
        for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
                  'HTTP_USER_AGENT', 'HTTP_COOKIE'):
            env.setdefault(k, "")
        env.update(os.environ)

        # now, set WSGI vars
        env['wsgi.input']        = self.rfile
        env['wsgi.errors']       = sys.stderr
        env['wsgi.version']      = '1.0'
        env['wsgi.multithread']  = False
        env['wsgi.multiprocess'] = True
        decoded_query = query.replace('+', ' ')

        try:
            ns = {}
            execfile(scriptfile,ns,ns)
            ns["application"](env, self.start_response)
        except:
            traceback.print_exc(file=sys.stderr)
            self.log_error("WSGI script could not be executed.")
    def start_response(self, status, headers):
        code,desc = status.split(" ",1)
        self.send_response(int(code), desc)
        for k,v in headers:
            self.wfile.write("%s: %s\r\n" % (k,v))
        self.wfile.write("\r\n")
        return self.wfile.write


def test(HandlerClass = WSGIHTTPRequestHandler,
         ServerClass = BaseHTTPServer.HTTPServer):
    SimpleHTTPServer.test(HandlerClass, ServerClass)


if __name__ == '__main__':
    test()


More information about the Web-SIG mailing list