[Python-checkins] python/nondist/sandbox/setuptools/setuptools package_index.py, NONE, 1.1
pje@users.sourceforge.net
pje at users.sourceforge.net
Sun Jun 12 05:44:09 CEST 2005
- Previous message: [Python-checkins] python/nondist/sandbox/setuptools EasyInstall.txt, 1.7, 1.8 easy_install.py, 1.15, 1.16
- Next message: [Python-checkins] python/dist/src/Lib/idlelib CREDITS.txt, 1.10, 1.11 ColorDelegator.py, 1.14, 1.15 EditorWindow.py, 1.67, 1.68 NEWS.txt, 1.58, 1.59 help.txt, 1.12, 1.13
- Messages sorted by:
[ date ]
[ thread ]
[ subject ]
[ author ]
Update of /cvsroot/python/python/nondist/sandbox/setuptools/setuptools
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv15558/setuptools
Added Files:
package_index.py
Log Message:
Move package index/downloading stuff to setuptools.package_index module.
--- NEW FILE: package_index.py ---
"""PyPI and direct package downloading"""
import sys, os.path, re, urlparse, urllib2
from pkg_resources import *
# Case-insensitive pattern for an ``href=`` attribute; group 1 captures the
# link target, which may be quoted or bare and ends at a quote, '>' or space.
HREF = re.compile(r"""href\s*=\s*['"]?([^'"> ]+)""", re.I)
# Bound ``match`` method: succeeds when a string starts with a URL scheme of
# at least two characters followed by ':'; group 1 is the scheme itself.
# NOTE(review): the two-character minimum presumably keeps Windows drive
# letters such as "C:" from being treated as schemes -- confirm.
URL_SCHEME = re.compile('([-+.a-z0-9]{2,}):',re.I).match
# Source archive filename suffixes, tried in this order by distros_for_url().
EXTENSIONS = ".tar.gz .tar.bz2 .tar .zip .tgz".split()
# Names exported by ``from setuptools.package_index import *``.
__all__ = [
'PackageIndex', 'distros_for_url',
]
def distros_for_url(url, metadata=None):
    """Yield egg or source distribution objects that might be found at a URL

    The last path segment of `url` is unquoted and treated as a filename.
    An ``.egg`` filename yields exactly one distribution; a filename ending
    in one of ``EXTENSIONS`` yields one source distribution per possible
    name/version split; anything else yields nothing.
    """
    url_path = urlparse.urlparse(url)[2]
    filename = urllib2.unquote(url_path.split('/')[-1])

    if filename.endswith('.egg'):
        # only one, unambiguous interpretation
        egg = Distribution.from_filename(filename, metadata)
        egg.path = url
        yield egg
        return

    # The first suffix in EXTENSIONS order wins, so ".tar.gz" is stripped
    # as a whole rather than as ".gz"-less ".tar".
    suffixes = [ext for ext in EXTENSIONS if filename.endswith(ext)]
    if not suffixes:
        return  # no extension matched
    stem = filename[:-len(suffixes[0])]

    # Generate alternative interpretations of a source distro name,
    # because some packages are ambiguous as to the name/version split,
    # e.g. "adns-python-1.1.0", "egenix-mx-commercial", etc.
    # So, we generate each possible interpretation (e.g. "adns, python-1.1.0",
    # "adns-python, 1.1.0", and "adns-python-1.1.0, no version").  In practice,
    # the spurious interpretations should be ignored, because in the event
    # there's also an "adns" package, the spurious "python-1.1.0" version will
    # compare lower than any numeric version number, and is therefore unlikely
    # to match a request for it.  It's still a potential problem, though, and
    # in the long run PyPI and the distutils should go for "safe" names and
    # versions in distribution archive names (sdist and bdist).
    pieces = stem.split('-')
    for split_at in range(1, len(pieces) + 1):
        project = '-'.join(pieces[:split_at])
        version = '-'.join(pieces[split_at:])
        yield Distribution(
            url, metadata, project, version,
            distro_type = SOURCE_DIST
        )
class PackageIndex(AvailableDistributions):
    """A distribution index that scans web pages for download URLs

    Bookkeeping attributes:

    * ``index_url`` -- base URL of the package index, always ending in '/'.
    * ``scanned_urls`` -- URLs already examined by ``process_url()``.
    * ``fetched_urls`` -- URLs whose page contents were actually retrieved.
    * ``package_pages`` -- lower-cased safe project name -> dict whose keys
      are the index page URLs found for that project.
    """

    # Number of bytes read per iteration while downloading a file.
    dl_blocksize = 8192

    def __init__(self,index_url="http://www.python.org/pypi",*args,**kw):
        """Set up empty scan state; extra arguments go to the base class."""
        AvailableDistributions.__init__(self,*args,**kw)
        if not index_url.endswith('/'):
            index_url += '/'
        self.index_url = index_url
        self.scanned_urls = {}
        self.fetched_urls = {}
        self.package_pages = {}

    def scan_url(self, url):
        """Process `url`, retrieving its page if that turns out to be needed."""
        self.process_url(url, True)

    def process_url(self, url, retrieve=False):
        """Register distributions named by `url`; optionally crawl its page.

        If the URL's filename already identifies one or more distributions
        they are added and the page is not fetched.  Otherwise, when
        `retrieve` is true and the page has not been fetched before, it is
        downloaded; index pages go to ``process_index()`` and the links of
        any other HTML page are processed without retrieval.
        """
        if url in self.scanned_urls and not retrieve:
            return

        self.scanned_urls[url] = True
        dists = list(distros_for_url(url))
        for dist in dists:
            self.add(dist)

        if dists or not retrieve or url in self.fetched_urls:
            # don't need the actual page
            return

        f = self.open_url(url)
        try:
            # Record both the requested and the final (post-redirect) URL.
            self.fetched_urls[url] = self.fetched_urls[f.url] = True
            # A response may carry no Content-Type header at all; treat
            # that as "not html" instead of failing with KeyError.
            if 'html' not in f.headers.get('content-type', '').lower():
                return  # not html, we can't process it
            base = f.url    # handle redirects
            page = f.read()
        finally:
            f.close()

        if url.startswith(self.index_url):
            self.process_index(url, page)
        else:
            for match in HREF.finditer(page):
                link = urlparse.urljoin(base, match.group(1))
                self.process_url(link)

    def find_packages(self,requirement):
        """Scan the index pages that may describe `requirement`."""
        self.scan_url(self.index_url + requirement.distname)
        if not self.package_pages.get(requirement.key):
            # We couldn't find the target package, so search the index page too
            self.scan_url(self.index_url)
        # Iterate over a snapshot: scanning a page can add new entries to
        # this very dict through process_index().
        for url in list(self.package_pages.get(requirement.key,())):
            # scan each page that might be related to the desired package
            self.scan_url(url)

    def process_index(self,url,page):
        """Extract package-page links and download links from an index page.

        `url` is the page's address and `page` its HTML text.
        """
        def scan(link):
            # Record `link` in package_pages if it looks like
            # <index_url><project>/<version>.
            if link.startswith(self.index_url):
                parts = [
                    urllib2.unquote(part)
                    for part in link[len(self.index_url):].split('/')
                ]
                if len(parts)==2:
                    # it's a package page, sanitize and index it
                    pkg = safe_name(parts[0])
                    self.package_pages.setdefault(pkg.lower(),{})[link] = True

        if url==self.index_url or 'Index of Packages</title>' in page:
            # process an index page into the package-page index
            for match in HREF.finditer(page):
                scan( urlparse.urljoin(url, match.group(1)) )
        else:
            scan(url)   # ensure this page is in the page index
            # process individual package page
            for tag in ("<th>Home Page", "<th>Download URL"):
                pos = page.find(tag)
                if pos!=-1:
                    match = HREF.search(page,pos)
                    if match:
                        # Process the found URL
                        self.scan_url(urlparse.urljoin(url, match.group(1)))

    def obtain(self,requirement):
        """Scan the index for `requirement`; return a matching dist or None."""
        self.find_packages(requirement)
        for dist in self.get(requirement.key, ()):
            if dist in requirement:
                return dist

    def download(self, spec, tmpdir):
        """Locate and/or download `spec`, returning a local filename

        `spec` may be a ``Requirement`` object, or a string containing a URL,
        an existing local filename, or a package/version requirement spec
        (i.e. the string form of a ``Requirement`` object).

        If necessary, the requirement is searched for in the package index.
        If the download is successful, the return value is a local file path,
        and it is a subpath of `tmpdir` if the distribution had to be
        downloaded.  If no matching distribution is found, return ``None``.
        Various errors may be raised if a problem occurs during downloading.
        """
        if not isinstance(spec,Requirement):
            scheme = URL_SCHEME(spec)
            if scheme:
                # It's a url, download it to tmpdir
                return self._download_url(scheme.group(1), spec, tmpdir)
            elif os.path.exists(spec):
                # Existing file or directory, just return it
                return spec
            else:
                try:
                    spec = Requirement.parse(spec)
                except ValueError:
                    raise RuntimeError(
                        "Not a URL, existing file, or requirement spec: %r" %
                        (spec,)
                    )

        # process a Requirement
        dist = self.best_match(spec,[])
        if dist is not None:
            return self.download(dist.path, tmpdir)
        return None

    def _download_to(self, url, filename):
        """Copy the resource at `url` into `filename`; return its headers.

        ``reporthook()`` is called once before the first block and once
        after every block written.  Raises ``RuntimeError`` if the server
        answered with an HTTP error.
        """
        fp, tfp = None, None
        try:
            fp = self.open_url(url)
            if isinstance(fp, urllib2.HTTPError):
                raise RuntimeError(
                    "Can't download %s: %s %s" % (url, fp.code,fp.msg)
                )

            headers = fp.info()
            blocknum = 0
            bs = self.dl_blocksize
            size = -1   # reported when the server sends no Content-Length
            if "content-length" in headers:
                size = int(headers["Content-Length"])
            self.reporthook(url, filename, blocknum, bs, size)

            tfp = open(filename,'wb')
            while True:
                block = fp.read(bs)
                if not block:
                    break
                tfp.write(block)
                blocknum += 1
                self.reporthook(url, filename, blocknum, bs, size)
            return headers
        finally:
            if fp is not None:
                fp.close()
            if tfp is not None:
                tfp.close()

    def reporthook(self, url, filename, blocknum, blksize, size):
        """Download progress callback; does nothing unless overridden."""
        pass    # no-op

    def open_url(self, url):
        """Open `url` with urllib2.

        An ``HTTPError`` is *returned* rather than raised so callers can
        inspect it; any other ``URLError`` becomes a ``RuntimeError``.
        """
        try:
            return urllib2.urlopen(url)
        except urllib2.HTTPError as v:
            return v
        except urllib2.URLError as v:
            raise RuntimeError("Download error: %s" % v.reason)

    def _download_url(self, scheme, url, tmpdir):
        """Fetch `url` into `tmpdir` and return the resulting local path.

        `scheme` is the URL's scheme as captured by ``URL_SCHEME``.
        Subversion schemes are checked out; everything else is downloaded,
        and an HTML response is handed to ``_download_html()``.
        """
        # Determine download filename
        #
        segments = [
            seg for seg in urlparse.urlparse(url)[2].split('/') if seg
        ]
        if segments:
            # Always neutralise backslashes (they are path separators on
            # Windows), then collapse every ".." so the name cannot point
            # outside tmpdir.
            name = segments[-1].replace('\\','_')
            while '..' in name:
                name = name.replace('..','.')
        else:
            name = "__downloaded__"    # default if URL has no path contents

        filename = os.path.join(tmpdir,name)

        # Download the file
        #
        if scheme=='svn' or scheme.startswith('svn+'):
            return self._download_svn(url, filename)

        headers = self._download_to(url, filename)
        # The Content-Type header may be missing; treat that as "not html".
        if 'html' in headers.get('content-type', '').lower():
            return self._download_html(url, headers, filename, tmpdir)
        return filename

    def _download_html(self, url, headers, filename, tmpdir):
        """Handle a download that turned out to be an HTML page.

        The first non-blank line decides: a Subversion index page triggers
        a checkout, a SourceForge mirror-selection page triggers a mirror
        download, and anything else raises ``RuntimeError``.
        """
        # Check for a sourceforge URL
        sf_url = url.startswith('http://prdownloads.')

        # Read the whole page up front: mixing line iteration over a file
        # object with a later read() is unreliable because of the iterator's
        # read-ahead buffer.
        fp = open(filename)
        try:
            page = fp.read()
        finally:
            fp.close()

        for line in page.splitlines():
            if not line.strip():
                continue
            # Check for a subversion index page
            if re.search(r'<title>Revision \d+:', line):
                # it's a subversion index page:
                os.unlink(filename)
                return self._download_svn(url, filename)
            # Check for a SourceForge header
            if sf_url:
                if re.search(r'^<HTML><HEAD>', line, re.I):
                    continue    # skip first line
                if re.search(r'<TITLE>Select a Mirror for File:',line):
                    # Sourceforge mirror page
                    os.unlink(filename)
                    return self._download_sourceforge(url, page, tmpdir)
            break   # not an index page

        raise RuntimeError("Unexpected HTML page found at "+url)

    def _download_svn(self, url, filename):
        """Check out `url` into `filename` with the ``svn`` command line tool.

        Raises ``RuntimeError`` if the ``svn`` program cannot be started.
        """
        import subprocess
        # Pass an argument list instead of a shell string: `url` may come
        # from a scraped web page and must never be interpreted by a shell.
        # NOTE(review): svn's exit status is ignored, as before -- confirm
        # whether a failed checkout should raise.
        try:
            subprocess.call(["svn", "checkout", "-q", url, filename])
        except OSError as e:
            raise RuntimeError(
                "Can't run svn to check out %s: %s" % (url, e)
            )
        return filename

    def _download_sourceforge(self, source_url, sf_page, tmpdir):
        """Download package from randomly-selected SourceForge mirror"""
        import random

        mirror_regex = re.compile(r'HREF=(/.*?\?use_mirror=[^>]*)')
        urls = [m.group(1) for m in mirror_regex.finditer(sf_page)]
        if not urls:
            raise RuntimeError(
                "URL looks like a Sourceforge mirror page, but no URLs found"
            )

        url = urlparse.urljoin(source_url, random.choice(urls))
        f = self.open_url(url)
        try:
            mirror_page = f.read()
        finally:
            f.close()

        match = re.search(
            r'<META HTTP-EQUIV="refresh" content=".*?URL=(.*?)"',
            mirror_page
        )
        if not match:
            raise RuntimeError(
                'No META HTTP-EQUIV="refresh" found in Sourceforge page at %s'
                % url
            )

        # Resolve a relative refresh target against the mirror page's URL;
        # an absolute target is returned unchanged.
        download_url = urlparse.urljoin(url, match.group(1))
        scheme = URL_SCHEME(download_url)
        if not scheme:
            raise RuntimeError(
                "Can't determine URL scheme of Sourceforge download link %s"
                % download_url
            )
        return self._download_url(scheme.group(1), download_url, tmpdir)
- Previous message: [Python-checkins] python/nondist/sandbox/setuptools EasyInstall.txt, 1.7, 1.8 easy_install.py, 1.15, 1.16
- Next message: [Python-checkins] python/dist/src/Lib/idlelib CREDITS.txt, 1.10, 1.11 ColorDelegator.py, 1.14, 1.15 EditorWindow.py, 1.67, 1.68 NEWS.txt, 1.58, 1.59 help.txt, 1.12, 1.13
- Messages sorted by:
[ date ]
[ thread ]
[ subject ]
[ author ]
More information about the Python-checkins
mailing list