[Python-checkins] python/nondist/sandbox/digestauth .cvsignore, NONE, 1.1 digestauth.py, NONE, 1.1 httpclient.py, NONE, 1.1 httpserver.py, NONE, 1.1 urllib2.py, NONE, 1.1

anthonybaxter at users.sourceforge.net anthonybaxter at users.sourceforge.net
Wed Oct 15 11:06:43 EDT 2003


Update of /cvsroot/python/python/nondist/sandbox/digestauth
In directory sc8-pr-cvs1:/tmp/cvs-serv9670

Added Files:
	.cvsignore digestauth.py httpclient.py httpserver.py 
	urllib2.py 
Log Message:
sandbox for digest auth support

--- NEW FILE: .cvsignore ---
*.pyc

--- NEW FILE: digestauth.py ---
import md5, sha, time, random


def generate_nonce(bits, randomness=None):
    """Return a hex-encoded nonce carrying `bits` bits (bits/4 hex digits).

    bits -- requested size; must be a non-negative multiple of 8.
    randomness -- optional extra value mixed into the hashed material.

    Raises ValueError if bits is negative or not a multiple of 8.

    This could be stronger: the hashed material is only `randomness`,
    the current time and random.random().
    """
    if bits < 0:
        raise ValueError("bits must not be negative")
    if bits % 8 != 0:
        raise ValueError("bits must be a multiple of 8")
    hexlen = bits // 4
    nonce = ''
    # One SHA digest is only 40 hex digits (160 bits).  Keep hashing,
    # feeding the digits produced so far back in, until there are enough
    # digits to honour requests for more than 160 bits.
    while len(nonce) < hexlen:
        nonce += sha.new(str(randomness) + str(time.time()) +
                         str(random.random()) + nonce).hexdigest()
    return nonce[:hexlen]

def parse_keqv_list(l):
    """Parse list of key=value strings where keys are not duplicated.

    Each element is split on its first '='.  A value enclosed in double
    quotes has the quotes removed.  Returns a dict mapping key to value;
    if a key does occur more than once the last value wins.

    Raises ValueError if an element contains no '='.
    """
    parsed = {}
    for elt in l:
        k, v = elt.split('=', 1)
        # Slices instead of v[0] / v[-1] so that an empty value ('key=')
        # is accepted rather than raising IndexError.
        if v[:1] == '"' and v[-1:] == '"':
            v = v[1:-1]
        parsed[k] = v
    return parsed

class DigestAuthServer:
    """Server-side helper for HTTP Digest authentication with qop="auth".

    Keeps one H(A1) hash per (user, realm) pair, builds the parameter
    string for a Digest challenge and verifies the parameters of a
    client's Digest response.

    Issued nonces are not recorded, so a response carrying a nonce this
    server never offered, or a replayed response, is not detected.
    """

    def __init__(self, default_realm, algorithm="MD5"):
        """Create a server whose methods default to `default_realm`.

        Raises ValueError if algorithm is anything other than 'MD5'.
        """
        self.default_realm = default_realm
        if algorithm != 'MD5':
            raise ValueError("Don't know about algorithm %s" % (algorithm,))
        self.algorithm = algorithm
        # Maps (user, realm) -> H(A1) hex digest.
        self._user_hashes = {}

    def get_algorithm_impls(self, algorithm=None):
        """Return the (H, KD) functions for `algorithm`.

        algorithm defaults to self.algorithm.  H(x) is the hex digest of
        x; KD(s, d) is H("s:d").

        Raises ValueError for an algorithm other than 'MD5' or 'SHA'.
        """
        # lambdas assume digest modules are imported at the top level
        if algorithm is None:
            algorithm = self.algorithm
        if algorithm == 'MD5':
            H = lambda x: md5.new(x).hexdigest()
        elif algorithm == 'SHA':
            H = lambda x: sha.new(x).hexdigest()
        else:
            # Without this branch H would be unbound below.
            raise ValueError("Don't know about algorithm %s" % (algorithm,))
        # XXX MD5-sess
        KD = lambda s, d, H=H: H("%s:%s" % (s, d))
        return H, KD

    def add_user(self, user, password, realm=None):
        "add the given user and password (realm defaults to default_realm)"
        H, KD = self.get_algorithm_impls()
        if realm is None:
            realm = self.default_realm
        A1 = H('%s:%s:%s' % (user, realm, password))
        self._user_hashes[(user, realm)] = A1

    def add_user_hash(self, user, A1, realm=None):
        "add the given user with the stated hash (realm defaults to default_realm)"
        if realm is None:
            realm = self.default_realm
        self._user_hashes[(user, realm)] = A1

    def parse_apache_digest_authfile(self, filename):
        """Parse a password file, as generated by htdigest.

        Every non-blank line must have the form user:realm:hash; each one
        is registered through add_user_hash().  Raises ValueError for a
        line that does not split into exactly three fields.
        """
        authfile = open(filename, 'rU')
        try:
            for line in authfile:
                line = line.strip()
                if not line:
                    # tolerate blank lines, e.g. a trailing newline
                    continue
                user, realm, a1 = line.split(':')
                self.add_user_hash(user, a1, realm)
        finally:
            authfile.close()

    def generate_challenge(self, realm=None):
        """Return the parameter string of a Digest challenge.

        The result is what follows 'Digest ' in a WWW-Authenticate
        header.  realm defaults to default_realm.
        """
        if realm is None:
            realm = self.default_realm

        # We should save off the nonce to make sure it's one we've
        # offered already. And check for replay attacks :-(
        chal = 'realm="%s", nonce="%s", algorithm=%s, qop="auth"' % (
            realm, generate_nonce(bits=208), self.algorithm)
        return chal

    # Sample Authorization parameters sent by real browsers:
    #
    # Firebird
    # username="anthony", realm="TestAuth",
    # nonce="9da7db19648f95bd71f26a07b3423d91917b5205", uri="/test/foo",
    # algorithm=MD5, response="f61ca0cb8a85e9bd985b7ab808978f1e",
    # qop=auth, nc=00000001, cnonce="424a1ed1ddaa76ca"
    #
    # Konqi
    # username="anthony", realm="TestAuth",
    # nonce="7c8bdda0ed44db7de74bee97cec8dfd4fb59af0f", uri="/test/foo",
    # algorithm="MD5", qop="auth", cnonce="ODQwMTk=", nc=00000001,
    # response="1bebadb47d2aa5eab53cb419b94599f3"

    def check_auth(self, header, method='GET'):
        """Check a response to our auth challenge.

        header -- the Authorization header value with the leading
                  'Digest ' scheme removed.
        method -- HTTP method of the request being authenticated.

        Returns a (ok, reason) tuple: (True, "OK") when the response
        digest matches, otherwise (False, <explanation>).
        """
        from urllib2 import parse_http_list
        H, KD = self.get_algorithm_impls()
        try:
            resp = parse_keqv_list(parse_http_list(header))
        except (ValueError, IndexError):
            # Parse errors on malformed parameters are an authentication
            # failure, not a reason to blow up the request handler.
            return False, "malformed authorization header"
        algorithm = resp.get('algorithm', 'MD5')
        if algorithm.upper() != self.algorithm:
            return False, "unknown algo %s" % (algorithm,)
        # The header comes from the client, so every field may be absent.
        for field in ('username', 'realm', 'nonce', 'uri', 'response'):
            if field not in resp:
                return False, "failed to provide %s" % (field,)
        user = resp['username']
        realm = resp['realm']
        nonce = resp['nonce']
        # XXX Check the nonce is something we've issued
        HA1 = self._user_hashes.get((user, realm))
        if not HA1:
            return False, "unknown user/realm %s/%s" % (user, realm)
        qop = resp.get('qop')
        if qop != 'auth':
            return False, "unknown qop %r" % (qop,)
        cnonce, ncvalue = resp.get('cnonce'), resp.get('nc')
        if not cnonce or not ncvalue:
            return False, "failed to provide cnonce"
        # XXX Check the URI is correct!  It is only fed into the digest,
        # not compared with the URI that was actually requested.
        A2 = '%s:%s' % (method, resp['uri'])
        noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, H(A2))
        respdig = KD(HA1, noncebit)
        if respdig != resp['response']:
            return False, "response incorrect"
        return True, "OK"


--- NEW FILE: httpclient.py ---



import urllib2

# Test client for the digest-auth sandbox server.
#URL='http://localhost/test/'
URL = 'http://localhost:8000/test'

# Credentials offered for the 'TestAuth' realm at URL.
auth = urllib2.HTTPPasswordMgr()
auth.add_password('TestAuth', URL, 'anthony', 'arbadmin2')

digest_handler = urllib2.HTTPDigestAuthHandler(auth)
opener = urllib2.build_opener(digest_handler)

# Open the URL three times in a row (presumably to exercise repeated
# authenticated requests); only the last response is read and printed.
for attempt in range(3):
    u = opener.open(URL)
print(u.readlines())

--- NEW FILE: httpserver.py ---
import SimpleHTTPServer, BaseHTTPServer

import digestauth

# Module-wide authenticator used by Handler: its default realm is
# 'TestAuth' and its users are loaded from an htdigest-style file.
_REALM = 'TestAuth'
_PASSWORD_FILE = '/var/www/passwords'

digester = digestauth.DigestAuthServer(default_realm=_REALM)
digester.parse_apache_digest_authfile(_PASSWORD_FILE)

class Handler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    """Request handler that protects paths under /test with Digest auth.

    Every GET is answered with a small text/plain body; requests whose
    path starts with "/test" must first pass digester.check_auth().
    """

    def do_GET(self, *args):
        """Answer a GET, demanding Digest credentials for /test paths."""
        path = self.path

        if path.startswith("/test"):
            auth = self.headers.get('Authorization')
            if auth is None:
                self.send_auth('auth required')
                return
            # Split "<scheme> <parameters>"; anything that is not a
            # Digest header with parameters is refused rather than
            # being let through unauthenticated.
            parts = auth.split(None, 1)
            if len(parts) != 2 or parts[0].lower() != 'digest':
                self.send_auth('auth failed: digest authorization required')
                return
            ok, reason = digester.check_auth(parts[1])
            if not ok:
                self.send_auth('auth failed: %s' % reason)
                return

        # Reached for unprotected paths and for authenticated /test
        # requests alike, so both get a proper status line.
        self.send_response(200)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write("all good: %s\n" % path)

    def send_auth(self, text):
        """Send a 401 with a fresh Digest challenge and `text` as the body."""
        self.send_response(401)
        chal = digester.generate_challenge()
        self.send_header('WWW-Authenticate', 'Digest %s' % (chal))
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        # text may echo values taken from the request header, so escape
        # the HTML metacharacters before embedding it in the page.
        text = text.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
        self.wfile.write('<html><body><p>' + text + '\n\n</p></body></html>')
        return


def run(server_class=BaseHTTPServer.HTTPServer,
        handler_class=Handler, port=8000):
    """Serve requests forever on all interfaces.

    server_class -- server type, instantiated with (address, handler_class).
    handler_class -- request handler class passed to the server.
    port -- TCP port to listen on (default 8000).
    """
    server_address = ('', port)
    httpd = server_class(server_address, handler_class)
    httpd.serve_forever()


# Only start serving when executed as a script, not when imported.
if __name__ == '__main__':
    run()

--- NEW FILE: urllib2.py ---
"""An extensible library for opening URLs using a variety of protocols

The simplest way to use this module is to call the urlopen function,
which accepts a string containing a URL or a Request object (described
below).  It opens the URL and returns the results as a file-like
object; the returned object has some extra methods described below.

The OpenerDirector manages a collection of Handler objects that do
all the actual work.  Each Handler implements a particular protocol or
option.  The OpenerDirector is a composite object that invokes the
Handlers needed to open the requested URL.  For example, the
HTTPHandler performs HTTP GET and POST requests and deals with
non-error returns.  The HTTPRedirectHandler automatically deals with
HTTP 301, 302, 303 and 307 redirect errors, and the HTTPDigestAuthHandler
deals with digest authentication.

urlopen(url, data=None) -- basic usage is the same as the original
urllib.  Pass the URL and optionally data to post to an HTTP URL, and
get a file-like object back.  One difference is that you can also pass
[...1143 lines suppressed...]
    install_opener(build_opener(cfh, GopherHandler))

    for url in urls:
        if isinstance(url, tuple):
            url, req = url
        else:
            req = None
        print url
        try:
            f = urlopen(url, req)
        except IOError, err:
            print "IOError:", err
        except socket.error, err:
            print "socket.error:", err
        else:
            buf = f.read()
            f.close()
            print "read %d bytes" % len(buf)
        print
        time.sleep(0.1)





More information about the Python-checkins mailing list