[Python-checkins] distutils2: moved the upload_docs command

tarek.ziade python-checkins at python.org
Sun Jul 4 11:48:38 CEST 2010


tarek.ziade pushed fa9aa6f78f8f to distutils2:

http://hg.python.org/distutils2/rev/fa9aa6f78f8f
changeset:   260:fa9aa6f78f8f
user:        Konrad Delong <konryd at gmail.com>
date:        Sun May 09 13:37:11 2010 -0400
summary:     moved the upload_docs command
files:       src/distutils2/command/upload_docs.py, src/distutils2/tests/test_upload_docs.py

diff --git a/src/distutils2/command/upload_docs.py b/src/distutils2/command/upload_docs.py
new file mode 100644
--- /dev/null
+++ b/src/distutils2/command/upload_docs.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+"""upload_docs
+
+Implements a Distutils2 'upload_docs' subcommand (upload documentation to
+PyPI's packages.python.org).
+"""
+
+import os
+import socket
+import zipfile
+import httplib
+import base64
+import urlparse
+import tempfile
+import sys
+
+from distutils2 import log
+from distutils2.errors import DistutilsOptionError
+from distutils2.command.upload import upload
+
+_IS_PYTHON3 = sys.version > '3'
+
+try:
+    bytes
+except NameError:
+    bytes = str
+
+def b(str_or_bytes):
+    """Return bytes by either encoding the argument as ASCII or simply return
+    the argument as-is."""
+    if not isinstance(str_or_bytes, bytes):
+        return str_or_bytes.encode('ascii')
+    else:
+        return str_or_bytes
+
+
+class upload_docs(upload):
+
+    description = 'Upload documentation to PyPI'
+
+    user_options = [
+        ('repository=', 'r',
+         "url of repository [default: %s]" % upload.DEFAULT_REPOSITORY),
+        ('show-response', None,
+         'display full response text from server'),
+        ('upload-dir=', None, 'directory to upload'),
+        ]
+    boolean_options = upload.boolean_options
+
+    def initialize_options(self):
+        upload.initialize_options(self)
+        self.upload_dir = None
+
+    def finalize_options(self):
+        upload.finalize_options(self)
+        if self.upload_dir is None:
+            build = self.get_finalized_command('build')
+            self.upload_dir = os.path.join(build.build_base, 'docs')
+            self.mkpath(self.upload_dir)
+        self.ensure_dirname('upload_dir')
+        self.announce('Using upload directory %s' % self.upload_dir)
+
+    def create_zipfile(self):
+        name = self.distribution.metadata['Name']
+        tmp_dir = tempfile.mkdtemp()
+        tmp_file = os.path.join(tmp_dir, "%s.zip" % name)
+        zip_file = zipfile.ZipFile(tmp_file, "w")
+        for root, dirs, files in os.walk(self.upload_dir):
+            if root == self.upload_dir and not files:
+                raise DistutilsOptionError(
+                    "no files found in upload directory '%s'"
+                    % self.upload_dir)
+            for name in files:
+                full = os.path.join(root, name)
+                relative = root[len(self.upload_dir):].lstrip(os.path.sep)
+                dest = os.path.join(relative, name)
+                zip_file.write(full, dest)
+        zip_file.close()
+        return tmp_file
+
+    def run(self):
+        zip_file = self.create_zipfile()
+        self.upload_file(zip_file)
+
+    def upload_file(self, filename):
+        content = open(filename, 'rb').read()
+        meta = self.distribution.metadata
+        data = {
+            ':action': 'doc_upload',
+            'name': meta['Name'],
+            'content': (os.path.basename(filename), content),
+        }
+        # set up the authentication
+        credentials = self.username + ':' + self.password
+        if _IS_PYTHON3:  # base64 only works with bytes in Python 3.
+            encoded_creds = base64.encodebytes(credentials.encode('utf8'))
+            auth = bytes("Basic ")
+        else:
+            encoded_creds = base64.encodestring(credentials)
+            auth = "Basic "
+        auth += encoded_creds.strip()
+
+        # Build up the MIME payload for the POST data
+        boundary = b('--------------GHSKFJDLGDS7543FJKLFHRE75642756743254')
+        sep_boundary = b('\n--') + boundary
+        end_boundary = sep_boundary + b('--')
+        body = []
+        for key, values in data.items():
+            # handle multiple entries for the same name
+            if type(values) != type([]):
+                values = [values]
+            for value in values:
+                if type(value) is tuple:
+                    fn = b(';filename="%s"' % value[0])
+                    value = value[1]
+                else:
+                    fn = b("")
+                body.append(sep_boundary)
+                body.append(b('\nContent-Disposition: form-data; name="%s"'%key))
+                body.append(fn)
+                body.append(b("\n\n"))
+                body.append(b(value))
+                if value and value[-1] == b('\r'):
+                    body.append(b('\n'))  # write an extra newline (lurve Macs)
+        body.append(end_boundary)
+        body.append(b("\n"))
+        body = b('').join(body)
+
+        self.announce("Submitting documentation to %s" % (self.repository),
+                      log.INFO)
+
+        # build the Request
+        # We can't use urllib2 since we need to send the Basic
+        # auth right with the first request
+        schema, netloc, url, params, query, fragments = \
+            urlparse.urlparse(self.repository)
+        assert not params and not query and not fragments
+        if schema == 'http':
+            conn = httplib.HTTPConnection(netloc)
+        elif schema == 'https':
+            conn = httplib.HTTPSConnection(netloc)
+        else:
+            raise AssertionError("unsupported schema "+schema)
+
+        data = ''
+        loglevel = log.INFO
+        try:
+            conn.connect()
+            conn.putrequest("POST", url)
+            conn.putheader('Content-type',
+                           'multipart/form-data; boundary=%s'%boundary)
+            conn.putheader('Content-length', str(len(body)))
+            conn.putheader('Authorization', auth)
+            conn.endheaders()
+            conn.send(body)
+        except socket.error, e:
+            self.announce(str(e), log.ERROR)
+            return
+
+        r = conn.getresponse()
+        if r.status == 200:
+            self.announce('Server response (%s): %s' % (r.status, r.reason),
+                          log.INFO)
+        elif r.status == 301:
+            location = r.getheader('Location')
+            if location is None:
+                location = 'http://packages.python.org/%s/' % meta['Name']
+            self.announce('Upload successful. Visit %s' % location,
+                          log.INFO)
+        else:
+            self.announce('Upload failed (%s): %s' % (r.status, r.reason),
+                          log.ERROR)
+        if self.show_response:
+            print '-'*75, r.read(), '-'*75
diff --git a/src/distutils2/tests/test_upload_docs.py b/src/distutils2/tests/test_upload_docs.py
new file mode 100644
--- /dev/null
+++ b/src/distutils2/tests/test_upload_docs.py
@@ -0,0 +1,69 @@
+"""upload_docs tests
+"""
+import sys, os, shutil, tempfile, unittest2, site, zipfile
+from distutils2.command.upload_docs import upload_docs
+from distutils2.dist import Distribution
+
+SETUP_PY = """\
+from distutils2.core import setup
+
+setup(name='foo')
+"""
+
+class TestUploadDocsTest(unittest2.TestCase):
+    def setUp(self):
+        self.dir = tempfile.mkdtemp()
+        setup = os.path.join(self.dir, 'setup.py')
+        f = open(setup, 'w')
+        f.write(SETUP_PY)
+        f.close()
+        self.old_cwd = os.getcwd()
+        os.chdir(self.dir)
+        
+        self.upload_dir = os.path.join(self.dir, 'build')
+        os.mkdir(self.upload_dir)
+        
+        # A test document.
+        f = open(os.path.join(self.upload_dir, 'index.html'), 'w')
+        f.write("Hello world.")
+        f.close()
+        
+        # An empty folder.
+        os.mkdir(os.path.join(self.upload_dir, 'empty'))
+        
+        if sys.version >= "2.6":
+            self.old_base = site.USER_BASE
+            site.USER_BASE = upload_docs.USER_BASE = tempfile.mkdtemp()
+            self.old_site = site.USER_SITE
+            site.USER_SITE = upload_docs.USER_SITE = tempfile.mkdtemp()
+    
+    def tearDown(self):
+        os.chdir(self.old_cwd)
+        shutil.rmtree(self.dir)
+        if sys.version >= "2.6":
+            shutil.rmtree(site.USER_BASE)
+            shutil.rmtree(site.USER_SITE)
+            site.USER_BASE = self.old_base
+            site.USER_SITE = self.old_site
+
+    def test_create_zipfile(self):
+        # Test to make sure zipfile creation handles common cases.
+        # This explicitly includes a folder containing an empty folder.
+        
+        dist = Distribution()
+        
+        cmd = upload_docs(dist)
+        cmd.upload_dir = self.upload_dir
+        zip_file = cmd.create_zipfile()
+        
+        assert zipfile.is_zipfile(zip_file)
+        
+        zip_f = zipfile.ZipFile(zip_file) # woh...
+        
+        assert zip_f.namelist() == ['index.html']
+
+def test_suite():
+    return unittest2.makeSuite(TestUploadDocsTest)
+
+if __name__ == "__main__":
+    unittest2.main(defaultTest="test_suite")

--
Repository URL: http://hg.python.org/distutils2


More information about the Python-checkins mailing list