[Web-SIG] My WSGIHTTPServer implementation
Peter Hunt
floydophone at gmail.com
Sat Aug 28 04:42:34 CEST 2004
As the PEAK one is horribly out of date, I decided to implement a new
one. I don't know if this is exactly the interface you want, but it's
a start.
The way it works is you write a .py script, and include a module-level
"application" callable, which is your WSGI application. It will
execute it from there.
It's horribly insecure, but should help with people testing their WSGI
apps. The docs are scarce.
Attached is the implementation as well as the example app from the PEP.
-------------- next part --------------
#!/usr/bin/env python
def application(environ, start_response):
"""Simplest possible application object"""
status = '200 OK'
headers = [('Content-type','text/plain')]
write = start_response(status, headers)
write('Hello world!\n')
-------------- next part --------------
"""WSGI-savvy HTTP Server.
SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
-- it may execute arbitrary Python code or external programs.
"""
__version__ = "0.4"
__all__ = ["WSGIHTTPRequestHandler"]
import os
import sys
import urllib
import BaseHTTPServer
import SimpleHTTPServer
import select
import traceback
class WSGIHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
"""Complete HTTP server with GET, HEAD and POST commands.
GET and HEAD also support running CGI scripts.
The POST command is *only* implemented for CGI scripts.
"""
# Make rfile unbuffered -- we need to read one line and then pass
# the rest to a subprocess, so we can't use buffered input.
rbufsize = 0
def do_POST(self):
"""Serve a POST request.
This is only implemented for CGI scripts.
"""
if self.is_cgi():
self.run_cgi()
else:
self.send_error(501, "Can only POST to CGI scripts")
def do_GET(self):
if self.is_cgi():
self.run_cgi()
else:
SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
def send_head(self):
"""Version of send_head that support CGI scripts"""
if self.is_cgi():
return self.run_cgi()
else:
return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
def is_cgi(self):
"""Test whether self.path corresponds to a Python script
"""
path = self.path
if "?" in path:
path = path[:path.rfind("?")]
if self.is_python(path):
i = path.rfind("/")
if i == -1:
self.cgi_info = "/",path
else:
self.cgi_info = path[:i],path[i+1:]
#self.cgi_info = os.path.split(path)#path[:-1], path[-1]
return True
else:
return False
def is_python(self, path):
"""Test whether argument path is a Python script."""
head, tail = os.path.splitext(path)
return tail.lower() in (".py", ".pyw")
def run_cgi(self):
"""Execute a CGI script."""
dir, rest = self.cgi_info
i = rest.rfind('?')
if i >= 0:
rest, query = rest[:i], rest[i+1:]
else:
query = ''
i = rest.find('/')
if i >= 0:
script, rest = rest[:i], rest[i:]
else:
script, rest = rest, ''
scriptname = dir + '/' + script
scriptfile = self.translate_path(scriptname)
if not os.path.exists(scriptfile):
self.send_error(404, "No such CGI script (%s)" % `scriptname`)
return
if not os.path.isfile(scriptfile):
self.send_error(403, "CGI script is not a plain file (%s)" %
`scriptname`)
return
ispy = self.is_python(scriptname)
if not ispy:
self.send_error(403, "CGI script is not a Python script (%s)" %
`scriptname`)
return
# Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
# XXX Much of the following could be prepared ahead of time!
env = {}
env['SERVER_SOFTWARE'] = self.version_string()
env['SERVER_NAME'] = self.server.server_name
env['GATEWAY_INTERFACE'] = 'CGI/1.1'
env['SERVER_PROTOCOL'] = self.protocol_version
env['SERVER_PORT'] = str(self.server.server_port)
env['REQUEST_METHOD'] = self.command
uqrest = urllib.unquote(rest)
env['PATH_INFO'] = uqrest
env['PATH_TRANSLATED'] = self.translate_path(uqrest)
env['SCRIPT_NAME'] = scriptname
if query:
env['QUERY_STRING'] = query
host = self.address_string()
if host != self.client_address[0]:
env['REMOTE_HOST'] = host
env['REMOTE_ADDR'] = self.client_address[0]
# XXX AUTH_TYPE
# XXX REMOTE_USER
# XXX REMOTE_IDENT
if self.headers.typeheader is None:
env['CONTENT_TYPE'] = self.headers.type
else:
env['CONTENT_TYPE'] = self.headers.typeheader
length = self.headers.getheader('content-length')
if length:
env['CONTENT_LENGTH'] = length
accept = []
for line in self.headers.getallmatchingheaders('accept'):
if line[:1] in "\t\n\r ":
accept.append(line.strip())
else:
accept = accept + line[7:].split(',')
env['HTTP_ACCEPT'] = ','.join(accept)
ua = self.headers.getheader('user-agent')
if ua:
env['HTTP_USER_AGENT'] = ua
co = filter(None, self.headers.getheaders('cookie'))
if co:
env['HTTP_COOKIE'] = ', '.join(co)
# XXX Other HTTP_* headers
# Since we're setting the env in the parent, provide empty
# values to override previously set values
for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
'HTTP_USER_AGENT', 'HTTP_COOKIE'):
env.setdefault(k, "")
env.update(os.environ)
# now, set WSGI vars
env['wsgi.input'] = self.rfile
env['wsgi.errors'] = sys.stderr
env['wsgi.version'] = '1.0'
env['wsgi.multithread'] = False
env['wsgi.multiprocess'] = True
decoded_query = query.replace('+', ' ')
try:
ns = {}
execfile(scriptfile,ns,ns)
ns["application"](env, self.start_response)
except:
traceback.print_exc(file=sys.stderr)
self.log_error("WSGI script could not be executed.")
def start_response(self, status, headers):
code,desc = status.split(" ",1)
self.send_response(int(code), desc)
for k,v in headers:
self.wfile.write("%s: %s\r\n" % (k,v))
self.wfile.write("\r\n")
return self.wfile.write
def test(HandlerClass = WSGIHTTPRequestHandler,
ServerClass = BaseHTTPServer.HTTPServer):
SimpleHTTPServer.test(HandlerClass, ServerClass)
if __name__ == '__main__':
test()
More information about the Web-SIG
mailing list