[Python-checkins] r82994 - tracker/instances/python-dev/lib/openid2rp.py

martin.v.loewis python-checkins at python.org
Tue Jul 20 17:09:01 CEST 2010


Author: martin.v.loewis
Date: Tue Jul 20 17:09:01 2010
New Revision: 82994

Log:
Import 6d3ad02c7b41 from the openid2rp package.


Modified:
   tracker/instances/python-dev/lib/openid2rp.py

Modified: tracker/instances/python-dev/lib/openid2rp.py
==============================================================================
--- tracker/instances/python-dev/lib/openid2rp.py	(original)
+++ tracker/instances/python-dev/lib/openid2rp.py	Tue Jul 20 17:09:01 2010
@@ -5,32 +5,31 @@
 
 # This library implements OpenID Authentication 2.0,
 # in the role of a relying party
-# It has the following assumptions and limitations:
-# - service discovery requires YADIS (HTML discovery not implemented)
-# - only provider-directed mode (identifier_select) is supported
-# - direct requests require https
-# - as a signature algorithm, HMAC-SHA1 is requested
-
-import urlparse, urllib, httplib, BeautifulSoup, time
-import cStringIO, base64, hmac, sha, datetime, re, binascii, struct
-import itertools
+
+import urlparse, urllib, httplib, time, cgi, htmllib, formatter
+import cStringIO, base64, hmac, hashlib, datetime, re, random
+import itertools, cPickle, sys
 
 try:
     from xml.etree import ElementTree
 except ImportError:
     from elementtree import ElementTree
 
-# Importing M2Crypto patches urllib; don't let them do that
-orig = urllib.URLopener.open_https.im_func
-from M2Crypto import DH
-urllib.URLopener.open_https = orig
-
 # Don't use urllib2, since it breaks in 2.5
 # for https://login.launchpad.net//+xrds
 
 # Don't use urllib, since it sometimes selects HTTP/1.1 (e.g. in PyPI)
 # and then fails to parse chunked responses.
 
+# 3.x portability
+
+if sys.version_info < (3,):
+    def b(s):  # 2.x: the argument is returned unchanged
+        return s
+else:
+    def b(s):  # 3.x: encode the str argument to ASCII bytes
+        return s.encode('ascii')
+
 def normalize_uri(uri):
     """Normalize an uri according to OpenID section 7.2. Return a pair
     type,value, where type can be either 'xri' or 'uri'."""
@@ -134,11 +133,27 @@
 def parse_response(s):
     '''Parse a key-value form (OpenID section 4.1.1) into a dictionary'''
     res = {}
-    for line in s.splitlines():
+    for line in s.decode('utf-8').splitlines():
         k,v = line.split(':', 1)
         res[k] = v
     return res
 
+class OpenIDParser(htmllib.HTMLParser):  # gathers <link> rel/href pairs and the Yadis <meta> location
+    def __init__(self):
+        htmllib.HTMLParser.__init__(self, formatter.NullFormatter())
+        self.links = {}            # rel attribute -> href attribute
+        self.xrds_location = None  # content of <meta http-equiv="X-XRDS-Location">, if seen
+
+    def do_link(self, attrs):
+        attrs = dict(attrs)
+        if 'rel' in attrs and 'href' in attrs:  # ignore incomplete <link> tags instead of raising KeyError
+            self.links[attrs['rel']] = attrs['href']
+
+    def do_meta(self, attrs):  # Yadis 6.2.5 option 1: meta tag
+        attrs = dict(attrs)
+        if attrs.get('http-equiv', '').lower() == 'x-xrds-location':  # a <meta> without http-equiv is skipped
+            self.xrds_location = attrs.get('content')
+
 def discover(url):
     '''Perform service discovery on the OP URL.
     Return list of service types, and the auth/2.0 URL,
@@ -175,7 +190,10 @@
     if res.status in (301, 302, 303, 307):
         return discover(res.msg.get('location'))
 
-    content_type = res.msg.gettype()
+    if sys.version_info < (3,0):
+        content_type = res.msg.gettype()
+    else:
+        content_type = res.msg.get_content_type()
 
     # Yadis 6.2.5 option 2 and 3: header includes x-xrds-location
     xrds_loc = res.msg.get('x-xrds-location')
@@ -183,31 +201,21 @@
         return discover(xrds_loc)
 
     if content_type in ('text/html', 'application/xhtml+xml'):
-        soup = BeautifulSoup.BeautifulSoup(data)
+        parser = OpenIDParser()
+        parser.feed(data)
+        parser.close()
         # Yadis 6.2.5 option 1: meta tag
-        meta = soup.find('meta', {'http-equiv':lambda v:v and v.lower()=='x-xrds-location'})
-        if meta:
-            xrds_loc = meta['content']
-            return discover(xrds_loc)
+        if parser.xrds_location:
+            return discover(parser.xrds_location)
         # OpenID 7.3.3: attempt html based discovery
-        op_endpoint = soup.find('link', {'rel':lambda v:v and 'openid2.provider' in v.lower()})
+        op_endpoint = parser.links.get('openid2.provider')
         if op_endpoint:
-            op_endpoint = op_endpoint['href']
-            op_local = soup.find('link', {'rel':lambda v:v and 'openid2.local_id' in v.lower()})
-            if op_local:
-                op_local = op_local['href']
-            else:
-                op_local = None
+            op_local = parser.links.get('openid2.local_id')
             return ['http://specs.openid.net/auth/2.0/signon'], op_endpoint, op_local
         # 14.2.1: 1.1 compatibility
-        op_endpoint = soup.find('link', {'rel':lambda v:v and 'openid.server' in v.lower()})
+        op_endpoint = parser.links.get('openid.server')
         if op_endpoint:
-            op_endpoint = op_endpoint['href']
-            op_local = soup.find('link', {'rel':lambda v:v and 'openid.delegate' in v.lower()})
-            if op_local:
-                op_local = op_local['href']
-            else:
-                op_local = None
+            op_local = parser.links.get('openid.delegate')
             return ['http://openid.net/signon/1.1'], op_endpoint, op_local
         # Discovery failed
         return None
@@ -273,16 +281,20 @@
         if uri in services:
             return True
     return False
+is_op_identifier = is_op_endpoint
+
+# 4.1.3: Binary two's complement
+def btwoc(l):  # encode the integer l as a byte string, most significant byte first
+    res = cPickle.dumps(l, 2)
+    # Pickle result: proto 2, long1 (integer < 256 bytes)
+    # number of bytes, little-endian integer, stop
+    assert res[:3] == '\x80\x02\x8a'  # NOTE(review): holds only when l pickles with the long1 opcode - confirm for plain ints
+    # btwoc ought to produce the shortest representation in two's
+    # complement. Fortunately, pickle already does that.
+    return res[3+ord(res[3]):3:-1]  # res[3] is the byte count; walk the payload backwards, dropping header and stop
 
-# OpenSSL MPI integer representation
-def bin2mpi(bin):
-    if ord(bin[0]) >= 128:
-        # avoid interpretation as a negative number
-        bin = "\x00" + bin
-    return struct.pack(">i", len(bin))+bin
-def mpi2bin(mpi):
-    assert len(mpi)-4 == struct.unpack(">i", mpi[:4])[0]
-    return mpi[4:]
+def unbtwoc(b):  # inverse of btwoc: wrap the reversed bytes in a protocol-2 long1 pickle and load it
+    return cPickle.loads('\x80\x02\x8a'+chr(len(b))+b[::-1]+'.')  # NOTE(review): chr() caps len(b) at 255; the parameter name shadows the module-level b()
 
 # Appendix B; DH default prime
 dh_prime = """
@@ -291,11 +303,7 @@
 7D45C2E7E52DC81C7A171876E5CEA74B1448BFDFAF18828EFD2519F14E45E382
 6634AF1949E5B535CC829A483B8A76223E5D490A257F05BDFF16F2FB22C583AB
 """
-dh_prime = binascii.unhexlify("".join(dh_prime.split()))
-# OpenSSL MPI representation: dh_prime, 2
-dh = DH.set_params(bin2mpi(dh_prime), '\x00\x00\x00\x01\x02')
-dh.gen_key()
-dh_public_base64 = base64.b64encode(mpi2bin(dh.pub))
+dh_prime = long("".join(dh_prime.split()), 16)
 
 def string_xor(s1, s2):
     res = []
@@ -315,6 +323,11 @@
     if url.startswith('http:'):
         # Use DH exchange
         data['openid.session_type'] = "DH-SHA1"
+        # Private key: random number between 1 and dh_prime-1
+        priv = random.randrange(1, dh_prime-1)
+        # Public key: 2^priv mod prime
+        pubkey = pow(2L, priv, dh_prime)
+        dh_public_base64 = base64.b64encode(btwoc(pubkey))
         # No need to send key and generator
         data['openid.dh_consumer_public'] = dh_public_base64
     if is_compat_1x(services):
@@ -328,21 +341,45 @@
         raise ValueError, "associate failed: "+data['error']
     if url.startswith('http:'):
         enc_mac_key = base64.b64decode(data['enc_mac_key'])
-        dh_server_public = base64.b64decode(data['dh_server_public'])
-        # compute_key does not return an MPI
-        shared_secret = dh.compute_key(bin2mpi(dh_server_public))
-        if ord(shared_secret[0]) >= 128:
-            # btwoc: add leading zero if number would otherwise be negative
-            shared_secret = '\x00' + shared_secret
-        shared_secret = sha.new(shared_secret).digest()
+        dh_server_public = unbtwoc(base64.b64decode(data['dh_server_public']))
+        # shared secret: sha1(2^(server_priv*priv) mod prime) xor enc_mac_key
+        shared_secret = btwoc(pow(dh_server_public, priv, dh_prime))
+        shared_secret = hashlib.sha1(shared_secret).digest()
         if len(shared_secret) != len(enc_mac_key):
             raise ValueError, "incorrect DH key size"
         # Fake mac_key result
         data['mac_key'] = base64.b64encode(string_xor(enc_mac_key, shared_secret))
     return data
 
+class _AX:  # attribute container that also keeps a reverse map from assigned value to attribute name
+    def __init__(self):
+        self.__dict__['_reverse'] = {}  # stored via __dict__ so the __setattr__ hook below is not triggered
+    def __setattr__(self, name, value):
+        self.__dict__[name] = value
+        self._reverse[value] = name  # remember which attribute name stands for this value
+    def lookup(self, value):  # return the attribute name registered for value, else a synthesized alias
+        try:
+            return self._reverse[value]
+        except KeyError:
+            return 'a%d' % (hash(value) % 1000000000)  # unregistered value: derive 'a<number>' from its hash
+
+AX = _AX()  # each assignment below also records value -> attribute name for AX.lookup()
+AX.nickname  = "http://axschema.org/namePerson/friendly"
+AX.email     = "http://axschema.org/contact/email"
+AX.fullname  = "http://axschema.org/namePerson"
+AX.dob       = "http://axschema.org/birthDate"
+AX.gender    = "http://axschema.org/person/gender"
+AX.postcode  = "http://axschema.org/contact/postalCode/home"
+AX.country   = "http://axschema.org/contact/country/home"
+AX.language  = "http://axschema.org/pref/language"
+AX.timezone  = "http://axschema.org/pref/timezone"
+AX.first     = "http://axschema.org/namePerson/first"
+AX.last      = "http://axschema.org/namePerson/last"
+
 def request_authentication(services, url, assoc_handle, return_to,
-                           claimed=None, op_local=None, realm=None):
+                           claimed=None, op_local=None, realm=None,
+                           sreg = (('nickname', 'email'), ()),
+                           ax = ((AX.email, AX.first, AX.last), ())):
     '''Request authentication (OpenID section 9).
     services is the list of discovered service types,
     url the OP service URL, assoc_handle the established session
@@ -375,28 +412,38 @@
         'openid.claimed_id':claimed,
         'openid.identity':op_local,
         'openid.realm':realm,
-        'openid.ns.sreg':"http://openid.net/sreg/1.0",
         'openid.sreg.required':'nickname,email',
         }
+    sreg_req, sreg_opt = sreg
+    sreg11 = {}
+    if sreg_req or sreg_opt:
+        data['openid.ns.sreg'] = "http://openid.net/sreg/1.0"
+        if sreg_req:
+            data['openid.sreg.required'] = sreg11['openid.sreg11.required'] = ','.join(sreg_req)
+        if sreg_opt:
+            data['openid.sreg.optional'] =  sreg11['openid.sreg11.optional'] =','.join(sreg_opt)
     if is_compat_1x(services):
         del data['openid.ns']
         del data['openid.claimed_id']
         del data['openid.realm']
         data['openid.trust_root'] = return_to
-    if "http://openid.net/srv/ax/1.0" in services:
+    ax_req, ax_opt = ax
+    if "http://openid.net/srv/ax/1.0" in services and (ax_req or ax_opt):
         data.update({
             'openid.ns.ax':"http://openid.net/srv/ax/1.0",
             'openid.ax.mode':'fetch_request',
-            'openid.ax.required':'email,first,last',
-            'openid.ax.type.email':'http://axschema.org/contact/email',
-            'openid.ax.type.first':"http://axschema.org/namePerson/first",
-            'openid.ax.type.last':"http://axschema.org/namePerson/last",
             })
-    if "http://openid.net/extensions/sreg/1.1" in services:
+        for uri in ax_req + ax_opt:
+            data['openid.ax.type.'+AX.lookup(uri)] = uri
+        if ax_req:
+            data['openid.ax.required'] = ','.join(AX.lookup(uri) for uri in ax_req)
+        if ax_opt:
+            data['openid.ax.if_available'] = ','.join(AX.lookup(uri) for uri in ax_req)
+    if "http://openid.net/extensions/sreg/1.1" in services and sreg11:
         data.update({
             'openid.ns.sreg11':"http://openid.net/extensions/sreg/1.1",
-            'openid.sreg11.required':'nickname,email'
             })
+        data.update(sreg11)
     if '?' in url:
         return url+'&'+urllib.urlencode(data)
     else:
@@ -405,20 +452,29 @@
 class NotAuthenticated(Exception):
     pass
 
+def _prepare_response(response):
+    """Return response as a cgi.parse_qs-style dict, parsing it first if it is a query string."""
+    if isinstance(response, str):
+        return cgi.parse_qs(response)
+    # backwards compatibility: allow caller to pass parse_qs result
+    # already
+    return response
+
 def authenticate(session, response):
-    '''Process an authentication response.
-    session must be the established session (minimally including
-    assoc_handle and mac_key), response is the query string as parsed
-    by cgi.parse_qs.
-    If authentication succeeds, return the list of signed fields.
-    If the user was not authenticated, NotAuthenticated is raised.
-    If the HTTP request is invalid (missing parameters, failure to
-    validate signature), different exceptions will be raised, typically
-    ValueError.
+    '''Process an authentication response.  session must be the
+    established session (minimally including assoc_handle and
+    mac_key), response the query string as given in the original URL
+    (i.e. as the CGI variable QUERY_STRING).  If authentication
+    succeeds, return the list of signed fields.  If the user was not
+    authenticated, NotAuthenticated is raised.  If the HTTP request is
+    invalid (missing parameters, failure to validate signature),
+    different exceptions will be raised, typically ValueError.
 
     Callers must check openid.response_nonce for replay attacks.
     '''
 
+    response = _prepare_response(response)
+
     # 1.1 compat: openid.ns may not be sent
     # if response['openid.ns'][0] != 'http://specs.openid.net/auth/2.0':
     #    raise ValueError('missing openid.ns')
@@ -431,22 +487,21 @@
     if  'openid.identity' not in response:
         raise ValueError('missing openid.identity')
 
-    # Won't check nonce value - caller must verify this is not a replay
+    # Will not check nonce value - caller must verify this is not a replay
 
     signed = response['openid.signed'][0].split(',')
     query = []
     for name in signed:
-        if isinstance(name, unicode):
-            name = name.encode('ascii')
         value = response['openid.'+name][0]
-        if isinstance(value, unicode):
+        value = '%s:%s\n' % (name, value)
+        if sys.version_info >= (3,):
             value = value.encode('utf-8')
-        query.append('%s:%s\n' % (name, value))
-    query = ''.join(query)
+        query.append(value)
+    query = b('').join(query)
 
-    mac_key = base64.decodestring(session['mac_key'])
-    transmitted_sig = base64.decodestring(response['openid.sig'][0])
-    computed_sig = hmac.new(mac_key, query, sha).digest()
+    mac_key = base64.decodestring(b(session['mac_key']))
+    transmitted_sig = base64.decodestring(b(response['openid.sig'][0]))
+    computed_sig = hmac.new(mac_key, query, hashlib.sha1).digest()
 
     if transmitted_sig != computed_sig:
         raise ValueError('Invalid signature')
@@ -474,6 +529,7 @@
     return stamp
 
 def get_namespaces(resp):
+    resp = _prepare_response(resp)
     res = {}
     for k, v in resp.items():
         if k.startswith('openid.ns.'):
@@ -487,6 +543,7 @@
     ax = ns["http://openid.net/srv/ax/1.0"]+"."
     oax = "openid."+ax
     res = {}
+    resp = _prepare_response(resp)
     for k, v in resp.items():
         if k.startswith(oax+"type."):
             k = k.rsplit('.',1)[1]
@@ -495,11 +552,23 @@
                 continue
             res[v[0]] = resp[value_name][0]
     return res
-    
+
+def get_sreg(resp, validated):
+    """Return the dictionary of simple registration parameters in resp,
+    with the openid.sreg. prefix stripped."""
+    res = {}
+    resp = _prepare_response(resp)  # accept either a raw query string or an already parsed dict
+    for k, v in resp.items():
+        if k.startswith('openid.sreg.'):
+            k = k[len('openid.sreg.'):]
+            if 'sreg.'+k in validated:  # keep only fields named in validated (presumably the signed-field list - confirm)
+                res[k] = v[0]  # first entry of the value list
+    return res
 
 def get_email(resp):
     "Return the email address embedded response, or None."
 
+    resp = _prepare_response(resp)
     validated = resp['openid.signed'][0]
 
     # SREG 1.0; doesn't require namespace, as the protocol doesn't
@@ -518,8 +587,11 @@
     return None
 
 def get_username(resp):
-    "Return either nickname or (first, last) or None."
+    """Return either nickname or (first, last) or None.
+    This function is deprecated; use get_ax and get_sreg instead.
+    """
 
+    resp = _prepare_response(resp)
     validated = resp['openid.signed'][0]
     if 'openid.sreg.nickname' in resp and \
        'sreg.nickname' in validated:
@@ -535,167 +607,3 @@
 
     # TODO: SREG 1.1
     return
-
-
-################ Test Server #################################
-
-import BaseHTTPServer, cgi
-
-# supported providers
-providers = (
-    ('Google', 'http://www.google.com/favicon.ico', 'https://www.google.com/accounts/o8/id'),
-    ('Yahoo', 'http://www.yahoo.com/favicon.ico', 'http://yahoo.com/'),
-    # Verisigns service URL is not https
-    #('Verisign', 'https://pip.verisignlabs.com/favicon.ico', 'https://pip.verisignlabs.com')
-    ('myOpenID', 'https://www.myopenid.com/favicon.ico', 'https://www.myopenid.com/'),
-    ('Launchpad', 'https://login.launchpad.net/favicon.ico', 'https://login.launchpad.net/')
-    )
-             
-sessions = []
-class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
-
-    def write(self, payload, type):
-        self.send_response(200)
-        self.send_header("Content-type", type)
-        self.send_header("Content-length", str(len(payload)))
-        self.end_headers()
-        self.wfile.write(payload)
-
-    def do_GET(self):
-        if self.path == '/':
-            return self.root()
-        path = self.path
-        i = path.rfind('?')
-        if i >= 0:
-            query = cgi.parse_qs(path[i+1:])
-            path = path[:i]
-        else:
-            query = {}
-        if path == '/':
-            if 'provider' in query:
-                prov = [p for p in providers if p[0]  == query['provider'][0]]
-                if len(prov) != 1:
-                    return self.not_found()
-                prov = prov[0]
-                services, url, op_local = discover(prov[2])
-                session = associate(services, url)
-                sessions.append(session)
-                self.send_response(307) # temporary redirect - do not cache
-                self.send_header("Location", request_authentication
-                                 (services, url, session['assoc_handle'],
-                                  self.base_url+"?returned=1"))
-                self.end_headers()
-                return
-            if 'claimed' in query:
-                kind, claimed = normalize_uri(query['claimed'][0])
-                if kind == 'xri':
-                    return self.error('XRI resolution not supported')
-                res = discover(claimed)
-                if res is None:
-                    return self.error('Discovery failed')
-                services, url, op_local = res
-                session = associate(services, url)
-                sessions.append(session)
-                self.send_response(307)
-                self.send_header("Location", request_authentication
-                                 (services, url, session['assoc_handle'],
-                                  self.base_url+"?returned=1",
-                                  claimed, op_local))
-                self.end_headers()
-                return                
-            if 'returned' in query:
-                if 'openid.identity' not in query:
-                    return self.rp_discovery()
-                handle = query['openid.assoc_handle'][0]
-                for session in sessions:
-                    if session['assoc_handle'] == handle:
-                        break
-                else:
-                    session = None
-                if not session:
-                    return self.error('Not authenticated (no session)')
-                try:
-                    signed = authenticate(session, query)
-                except Exception, e:
-                    self.error("Authentication failed: "+repr(e))
-                    return
-                if 'openid.claimed_id' in query:
-                    if 'claimed_id' not in signed:
-                        return self.error('Incomplete signature')
-                    claimed = query['openid.claimed_id'][0]
-                else:
-                    # OpenID 1, claimed ID not reported - should set cookie
-                    if 'identity' not in signed:
-                        return self.error('Incomplete signature')
-                    claimed = query['openid.identity'][0]
-                payload = "Hello "+claimed+"\n"
-                email = get_email(query)
-                if email:
-                    payload += 'Your email is '+email+"\n"
-                else:
-                    payload += 'No email address is known\n'
-                username = get_username(query)
-                if isinstance(username, tuple):
-                    username = " ".join(username)
-                if username:
-                    payload += 'Your nickname is '+username+'\n'
-                else:
-                    payload += 'No nickname is known\n'
-                return self.write(payload, "text/plain")
-                
-        return self.not_found()
-
-    
-
-    def debug(self, value):
-        payload = repr(value)
-        self.write(payload, "text/plain")
-
-    def error(self, text):
-        self.write(text, "text/plain")
-
-    def root(self):
-        payload = "<html><head><title>OpenID login</title></head><body>\n"
-        
-        for name, icon, provider in providers:
-            payload += "<p><a href='%s?provider=%s'><img src='%s' alt='%s'></a></p>\n" % (
-                self.base_url, name, icon, name)
-        payload += "<form>Type your OpenID:<input name='claimed'/><input type='submit'/></form>\n"
-        payload += "</body></html>"
-        self.write(payload, "text/html")
-
-    def rp_discovery(self):
-        payload = '''<xrds:XRDS  
-                xmlns:xrds="xri://$xrds"  
-                xmlns="xri://$xrd*($v*2.0)">  
-                <XRD>  
-                     <Service priority="1">  
-                              <Type>http://specs.openid.net/auth/2.0/return_to</Type>  
-                              <URI>%s</URI>  
-                     </Service>  
-                </XRD>  
-                </xrds:XRDS>
-        ''' % (self.base_url+"/?returned=1")
-        self.write(payload, 'application/xrds+xml')
-
-    def not_found(self):
-        self.send_response(404)
-        self.end_headers()
-        
-# OpenID providers often attempt relying-party discovery
-# This requires the test server to use a globally valid URL
-# If Python cannot correctly determine the base URL, you
-# can pass it as command line argument
-def test_server():
-    import socket, sys
-    if len(sys.argv) > 1:
-        base_url = sys.argv[1]
-    else:
-        base_url = "http://" + socket.getfqdn() + ":8000/"
-    Handler.base_url = base_url
-    BaseHTTPServer.HTTPServer.address_family = socket.AF_INET6
-    httpd = BaseHTTPServer.HTTPServer(('', 8000), Handler)
-    httpd.serve_forever()
-
-if __name__ == '__main__':
-    test_server()


More information about the Python-checkins mailing list