[Python-checkins] python/nondist/sandbox/setuptools/setuptools package_index.py, NONE, 1.1

pje at users.sourceforge.net
Sun Jun 12 05:44:09 CEST 2005


Update of /cvsroot/python/python/nondist/sandbox/setuptools/setuptools
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv15558/setuptools

Added Files:
	package_index.py 
Log Message:
Move package index/downloading stuff to setuptools.package_index module.


--- NEW FILE: package_index.py ---
"""PyPI and direct package downloading"""

import sys, os.path, re, urlparse, urllib2
from pkg_resources import *

HREF = re.compile(r"""href\s*=\s*['"]?([^'"> ]+)""", re.I)
URL_SCHEME = re.compile('([-+.a-z0-9]{2,}):',re.I).match
EXTENSIONS = ".tar.gz .tar.bz2 .tar .zip .tgz".split()

__all__ = [
    'PackageIndex', 'distros_for_url', 
]
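
# For example (an illustrative sketch, not part of the module's API):
# HREF.findall('<a href="foo-1.0.tar.gz">') yields ['foo-1.0.tar.gz'];
# URL_SCHEME("http://example.com/x") matches with group(1) == 'http',
# while URL_SCHEME("setup.py") returns None (no scheme present).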


def distros_for_url(url, metadata=None):
    """Yield egg or source distribution objects that might be found at a URL"""

    path = urlparse.urlparse(url)[2]
    base = urllib2.unquote(path.split('/')[-1])

    if base.endswith('.egg'):
        dist = Distribution.from_filename(base, metadata)
        dist.path = url
        yield dist
        return  # only one, unambiguous interpretation

    for ext in EXTENSIONS:
        if base.endswith(ext):
            base = base[:-len(ext)]
            break
    else:
        return  # no extension matched

    # Generate alternative interpretations of a source distro name.
    # Some packages are ambiguous as to the name/version split, e.g.
    # "adns-python-1.1.0", "egenix-mx-commercial", etc.  So, we generate
    # each possible interpretation (e.g. "adns, python-1.1.0",
    # "adns-python, 1.1.0", and "adns-python-1.1.0, no version").  In
    # practice, the spurious interpretations should be ignored, because in
    # the event there's also an "adns" package, the spurious "python-1.1.0"
    # version will compare lower than any numeric version number, and is
    # therefore unlikely to match a request for it.  It's still a potential
    # problem, though, and in the long run PyPI and the distutils should go
    # for "safe" names and versions in distribution archive names (sdist
    # and bdist).

    parts = base.split('-')
    for p in range(1,len(parts)+1):
        yield Distribution(
            url, metadata, '-'.join(parts[:p]), '-'.join(parts[p:]),
            distro_type = SOURCE_DIST
        )
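
# A worked example (hypothetical URL): given
# "http://example.com/adns-python-1.1.0.tar.gz", the generator above yields
# three interpretations of the base name "adns-python-1.1.0":
#
#     name                 version
#     -------------------  ------------
#     adns                 python-1.1.0
#     adns-python          1.1.0
#     adns-python-1.1.0    (empty)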


class PackageIndex(AvailableDistributions):
    """A distribution index that scans web pages for download URLs"""

    def __init__(self, index_url="http://www.python.org/pypi", *args, **kw):
        AvailableDistributions.__init__(self, *args, **kw)
        # normalize the index URL so it always ends with a single '/'
        if not index_url.endswith('/'):
            index_url += '/'
        self.index_url = index_url
        self.scanned_urls = {}
        self.fetched_urls = {}
        self.package_pages = {}

    def scan_url(self, url):
        self.process_url(url, True)

    def process_url(self, url, retrieve=False):
        if url in self.scanned_urls and not retrieve:
            return

        self.scanned_urls[url] = True
        dists = list(distros_for_url(url))
        for dist in dists:
            self.add(dist)

        if dists or not retrieve or url in self.fetched_urls:
            # don't need the actual page
            return

        f = self.open_url(url)
        self.fetched_urls[url] = self.fetched_urls[f.url] = True
        if 'html' not in f.headers.get('content-type', '').lower():
            f.close()   # not html, we can't process it
            return

        base = f.url     # handle redirects
        page = f.read()
        f.close()
        if url.startswith(self.index_url):
            self.process_index(url, page)
        else:
            for match in HREF.finditer(page):
                link = urlparse.urljoin(base, match.group(1))
                self.process_url(link)

    def find_packages(self,requirement):
        self.scan_url(self.index_url + requirement.distname)
        if not self.package_pages.get(requirement.key):
            # We couldn't find the target package, so search the index page too
            self.scan_url(self.index_url)
        for url in self.package_pages.get(requirement.key,()):
            # scan each page that might be related to the desired package
            self.scan_url(url)

    def process_index(self,url,page):
        def scan(link):
            if link.startswith(self.index_url):
                parts = map(
                    urllib2.unquote, link[len(self.index_url):].split('/')
                )
                if len(parts)==2:
                    # it's a package page, sanitize and index it
                    pkg = safe_name(parts[0])
                    ver = safe_version(parts[1])
                    self.package_pages.setdefault(pkg.lower(),{})[link] = True
        if url==self.index_url or 'Index of Packages</title>' in page:
            # process an index page into the package-page index
            for match in HREF.finditer(page):
                scan( urlparse.urljoin(url, match.group(1)) )
        else:
            scan(url)   # ensure this page is in the page index
            # process individual package page
            for tag in ("<th>Home Page", "<th>Download URL"):
                pos = page.find(tag)
                if pos!=-1:
                    match = HREF.search(page,pos)
                    if match:
                        # Process the found URL
                        self.scan_url(urlparse.urljoin(url, match.group(1)))
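
    # For instance (illustrative URL): against the default index_url, a
    # link like "http://www.python.org/pypi/FooBar/1.2" splits into the
    # two parts ('FooBar', '1.2'), so scan() records it in
    # self.package_pages under the lowercased safe name 'foobar'.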

    def obtain(self,requirement):
        self.find_packages(requirement)
        for dist in self.get(requirement.key, ()):
            if dist in requirement:
                return dist

    def download(self, spec, tmpdir):
        """Locate and/or download `spec`, returning a local filename

        `spec` may be a ``Requirement`` object, or a string containing a URL,
        an existing local filename, or a package/version requirement spec
        (i.e. the string form of a ``Requirement`` object).

        If necessary, the requirement is searched for in the package index.
        If the download is successful, the return value is a local file path,
        and it is a subpath of `tmpdir` if the distribution had to be
        downloaded.  If no matching distribution is found, return ``None``.
        Various errors may be raised if a problem occurs during downloading.
        """

        if not isinstance(spec,Requirement):
            scheme = URL_SCHEME(spec)
            if scheme:
                # It's a url, download it to tmpdir
                return self._download_url(scheme.group(1), spec, tmpdir)

            elif os.path.exists(spec):
                # Existing file or directory, just return it
                return spec
            else:
                try:
                    spec = Requirement.parse(spec)
                except ValueError:
                    raise RuntimeError(
                        "Not a URL, existing file, or requirement spec: %r" %
                        (spec,)
                    )

        # process a Requirement
        dist = self.best_match(spec,[])
        if dist is not None:
            return self.download(dist.path, tmpdir)

        return None
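
    # A minimal usage sketch (hypothetical requirement string; `tmpdir`
    # must already exist and be writable):
    #
    #     index = PackageIndex()
    #     filename = index.download('FooBar>=1.2', '/tmp/build')
    #     # filename is None if no matching distribution was found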

    dl_blocksize = 8192

    def _download_to(self, url, filename):
        # Download the file
        fp, tfp = None, None
        try:
            fp = self.open_url(url)
            if isinstance(fp, urllib2.HTTPError):
                raise RuntimeError(
                    "Can't download %s: %s %s" % (url, fp.code,fp.msg)
                )

            headers = fp.info()
            blocknum = 0
            bs = self.dl_blocksize
            size = -1

            if "content-length" in headers:
                size = int(headers["Content-Length"])
                self.reporthook(url, filename, blocknum, bs, size)

            tfp = open(filename,'wb')
            while True:
                block = fp.read(bs)
                if block:
                    tfp.write(block)
                    blocknum += 1
                    self.reporthook(url, filename, blocknum, bs, size)
                else:
                    break
            return headers

        finally:
            if fp: fp.close()
            if tfp: tfp.close()

    def reporthook(self, url, filename, blocknum, blksize, size):
        pass    # no-op
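
    # Subclasses may override reporthook() for progress output; a minimal
    # sketch (hypothetical subclass name):
    #
    #     class VerbosePackageIndex(PackageIndex):
    #         def reporthook(self, url, filename, blocknum, blksize, size):
    #             if size > 0:
    #                 pct = min(100.0 * blocknum * blksize / size, 100.0)
    #                 print "%s: %3.0f%%" % (filename, pct)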

    def open_url(self, url):
        try:
            return urllib2.urlopen(url)
        except urllib2.HTTPError, v:
            return v
        except urllib2.URLError, v:
            raise RuntimeError("Download error: %s" % v.reason)


    def _download_url(self, scheme, url, tmpdir):

        # Determine download filename
        #
        name = filter(None, urlparse.urlparse(url)[2].split('/'))
        if name:
            name = name[-1]
            # sanitize: neutralize backslashes, then collapse ".." sequences
            name = name.replace('\\', '_')
            while '..' in name:
                name = name.replace('..', '.')
        else:
            name = "__downloaded__"    # default if URL has no path contents

        filename = os.path.join(tmpdir,name)

        # Download the file
        #
        if scheme=='svn' or scheme.startswith('svn+'):
            return self._download_svn(url, filename)
        else:
            headers = self._download_to(url, filename)
            if 'html' in headers.get('content-type', '').lower():
                return self._download_html(url, headers, filename, tmpdir)
            else:
                return filename

    def _download_html(self, url, headers, filename, tmpdir):
        # Check for a SourceForge URL
        sf_url = url.startswith('http://prdownloads.')
        f = open(filename)
        for line in f:
            if line.strip():
                # Check for a subversion index page
                if re.search(r'<title>Revision \d+:', line):
                    # it's a subversion index page:
                    f.close()
                    os.unlink(filename)
                    return self._download_svn(url, filename)
                # Check for a SourceForge header
                elif sf_url:
                    if re.search(r'^<HTML><HEAD>', line, re.I):
                        continue    # skip first line
                    elif re.search(r'<TITLE>Select a Mirror for File:', line):
                        # SourceForge mirror page
                        page = f.read()
                        f.close()
                        os.unlink(filename)
                        return self._download_sourceforge(url, page, tmpdir)
                break   # not an index page
        f.close()
        raise RuntimeError("Unexpected HTML page found at " + url)

    def _download_svn(self, url, filename):
        os.system("svn checkout -q %s %s" % (url, filename))
        return filename

    def _download_sourceforge(self, source_url, sf_page, tmpdir):
        """Download a package from a randomly selected SourceForge mirror"""

        mirror_regex = re.compile(r'HREF=(/.*?\?use_mirror=[^>]*)')
        urls = [m.group(1) for m in mirror_regex.finditer(sf_page)]
        if not urls:
            raise RuntimeError(
                "URL looks like a SourceForge mirror page, but no URLs found"
            )

        import random
        url = urlparse.urljoin(source_url, random.choice(urls))
        f = self.open_url(url)
        match = re.search(
            r'<META HTTP-EQUIV="refresh" content=".*?URL=(.*?)"',
            f.read()
        )
        f.close()

        if match:
            download_url = match.group(1)
            scheme = URL_SCHEME(download_url)
            return self._download_url(scheme.group(1), download_url, tmpdir)
        else:
            raise RuntimeError(
                'No META HTTP-EQUIV="refresh" found in SourceForge page at %s'
                % url
            )
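
    # Illustrative flow: the mirror page carries links of the form
    # HREF=/some/path?use_mirror=<site>; one is chosen at random, and the
    # chosen mirror's page is expected to contain a
    # <META HTTP-EQUIV="refresh" content="...URL=<download-url>"> tag,
    # whose URL is then fetched via _download_url().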
