[pypy-commit] pypy py3.5: Enough os.scandir() to pass our own tests on Windows

arigo pypy.commits at gmail.com
Wed Dec 20 06:15:26 EST 2017


Author: Armin Rigo <arigo at tunes.org>
Branch: py3.5
Changeset: r93510:6deed1172830
Date: 2017-12-20 10:11 +0100
http://bitbucket.org/pypy/pypy/changeset/6deed1172830/

Log:	Enough os.scandir() to pass our own tests on Windows

diff --git a/pypy/module/posix/interp_scandir.py b/pypy/module/posix/interp_scandir.py
--- a/pypy/module/posix/interp_scandir.py
+++ b/pypy/module/posix/interp_scandir.py
@@ -135,9 +135,6 @@
     assert 0 <= rposix_scandir.DT_LNK <= 255
     FLAG_STAT  = 256
     FLAG_LSTAT = 512
-else:
-    FLAG_STAT  = 256
-    # XXX lstat and symlinks are not implemented on Windows
 
 
 class W_DirEntry(W_Root):
@@ -149,9 +146,9 @@
         self.name = name     # always bytes on Posix; always unicode on Windows
         self.inode = inode
         self.flags = known_type
-        assert known_type == (known_type & 255)
         #
         if not _WIN32:
+            assert known_type == (known_type & 255)
             w_name = self.space.newbytes(name)
             if not scandir_iterator.result_is_bytes:
                 w_name = self.space.fsdecode(w_name)
@@ -178,93 +175,109 @@
     # the end of the class.  Every method only calls methods *before*
     # it in program order, so there is no cycle.
 
-    def get_lstat(self):
-        """Get the lstat() of the direntry."""
-        if (self.flags & FLAG_LSTAT) == 0:
-            # Unlike CPython, try to use fstatat() if possible
-            dirfd = self.scandir_iterator.dirfd
-            if dirfd != -1 and rposix.HAVE_FSTATAT:
-                st = rposix_stat.fstatat(self.name, dirfd,
-                                         follow_symlinks=False)
-            else:
-                path = self.space.fsencode_w(self.fget_path(self.space))
-                st = rposix_stat.lstat(path)
-            self.d_lstat = st
-            self.flags |= FLAG_LSTAT
-        return self.d_lstat
-
-    def get_stat(self):
-        """Get the stat() of the direntry.  This is implemented in
-        such a way that it won't do both a stat() and a lstat().
-        """
-        if (self.flags & FLAG_STAT) == 0:
-            # We don't have the 'd_stat'.  If the known_type says the
-            # direntry is not a DT_LNK, then try to get and cache the
-            # 'd_lstat' instead.  Then, or if we already have a
-            # 'd_lstat' from before, *and* if the 'd_lstat' is not a
-            # S_ISLNK, we can reuse it unchanged for 'd_stat'.
-            #
-            # Note how, in the common case where the known_type says
-            # it is a DT_REG or DT_DIR, then we call and cache lstat()
-            # and that's it.  Also note that in a d_type-less OS or on
-            # a filesystem that always answer DT_UNKNOWN, this method
-            # will instead only call at most stat(), but not cache it
-            # as 'd_lstat'.
-            known_type = self.flags & 255
-            if (known_type != rposix_scandir.DT_UNKNOWN and
-                known_type != rposix_scandir.DT_LNK):
-                self.get_lstat()    # fill the 'd_lstat' cache
-                have_lstat = True
-            else:
-                have_lstat = (self.flags & FLAG_LSTAT) != 0
-
-            if have_lstat:
-                # We have the lstat() but not the stat().  They are
-                # the same, unless the 'd_lstat' is a S_IFLNK.
-                must_call_stat = stat.S_ISLNK(self.d_lstat.st_mode)
-            else:
-                must_call_stat = True
-
-            if must_call_stat:
-                # Must call stat().  Try to use fstatat() if possible
+    if not _WIN32:
+        def get_lstat(self):
+            """Get the lstat() of the direntry."""
+            if (self.flags & FLAG_LSTAT) == 0:
+                # Unlike CPython, try to use fstatat() if possible
                 dirfd = self.scandir_iterator.dirfd
-                if dirfd != -1 and rposix.HAVE_FSTATAT:
+                if rposix.HAVE_FSTATAT and dirfd != -1:
                     st = rposix_stat.fstatat(self.name, dirfd,
-                                             follow_symlinks=True)
+                                             follow_symlinks=False)
                 else:
                     path = self.space.fsencode_w(self.fget_path(self.space))
-                    st = rposix_stat.stat(path)
+                    st = rposix_stat.lstat(path)
+                self.d_lstat = st
+                self.flags |= FLAG_LSTAT
+            return self.d_lstat
+
+        def get_stat(self):
+            """Get the stat() of the direntry.  This is implemented in
+            such a way that it won't do both a stat() and a lstat().
+            """
+            if (self.flags & FLAG_STAT) == 0:
+                # We don't have the 'd_stat'.  If the known_type says the
+                # direntry is not a DT_LNK, then try to get and cache the
+                # 'd_lstat' instead.  Then, or if we already have a
+                # 'd_lstat' from before, *and* if the 'd_lstat' is not a
+                # S_ISLNK, we can reuse it unchanged for 'd_stat'.
+                #
+                # Note how, in the common case where the known_type says
+                # it is a DT_REG or DT_DIR, then we call and cache lstat()
+                # and that's it.  Also note that in a d_type-less OS or on
+                # a filesystem that always answers DT_UNKNOWN, this method
+                # will instead only call at most stat(), but not cache it
+                # as 'd_lstat'.
+                known_type = self.flags & 255
+                if (known_type != rposix_scandir.DT_UNKNOWN and
+                    known_type != rposix_scandir.DT_LNK):
+                    self.get_lstat()    # fill the 'd_lstat' cache
+                    have_lstat = True
+                else:
+                    have_lstat = (self.flags & FLAG_LSTAT) != 0
+
+                if have_lstat:
+                    # We have the lstat() but not the stat().  They are
+                    # the same, unless the 'd_lstat' is a S_IFLNK.
+                    must_call_stat = stat.S_ISLNK(self.d_lstat.st_mode)
+                else:
+                    must_call_stat = True
+
+                if must_call_stat:
+                    # Must call stat().  Try to use fstatat() if possible
+                    dirfd = self.scandir_iterator.dirfd
+                    if dirfd != -1 and rposix.HAVE_FSTATAT:
+                        st = rposix_stat.fstatat(self.name, dirfd,
+                                                 follow_symlinks=True)
+                    else:
+                        path = self.space.fsencode_w(self.fget_path(self.space))
+                        st = rposix_stat.stat(path)
+                else:
+                    st = self.d_lstat
+
+                self.d_stat = st
+                self.flags |= FLAG_STAT
+            return self.d_stat
+
+        def get_stat_or_lstat(self, follow_symlinks):
+            if follow_symlinks:
+                return self.get_stat()
             else:
-                st = self.d_lstat
+                return self.get_lstat()
 
-            self.d_stat = st
-            self.flags |= FLAG_STAT
-        return self.d_stat
+        def check_mode(self, follow_symlinks):
+            """Get the stat() or lstat() of the direntry, and return the
+            S_IFMT.  If calling stat()/lstat() gives us ENOENT, return -1
+            instead; it is better to give up and answer "no, not this type"
+            to requests, rather than propagate the error.
+            """
+            try:
+                st = self.get_stat_or_lstat(follow_symlinks)
+            except OSError as e:
+                if e.errno == ENOENT:    # not found
+                    return -1
+                raise wrap_oserror2(self.space, e, self.fget_path(self.space),
+                                    eintr_retry=False)
+            return stat.S_IFMT(st.st_mode)
 
-    def get_stat_or_lstat(self, follow_symlinks):
-        if follow_symlinks:
-            return self.get_stat()
-        else:
-            return self.get_lstat()
+    else:
+        # Win32
+        stat_cached = False
 
-    def check_mode(self, follow_symlinks):
-        """Get the stat() or lstat() of the direntry, and return the
-        S_IFMT.  If calling stat()/lstat() gives us ENOENT, return -1
-        instead; it is better to give up and answer "no, not this type"
-        to requests, rather than propagate the error.
-        """
-        try:
-            st = self.get_stat_or_lstat(follow_symlinks)
-        except OSError as e:
-            if e.errno == ENOENT:    # not found
-                return -1
-            raise wrap_oserror2(self.space, e, self.fget_path(self.space),
-                                eintr_retry=False)
-        return stat.S_IFMT(st.st_mode)
+        def check_mode(self, follow_symlinks):
+            return self.flags
+
+        def get_stat_or_lstat(self, follow_symlinks):     # 'follow_symlinks' ignored
+            if not self.stat_cached:
+                path = self.space.unicode_w(self.fget_path(self.space))
+                self.d_stat = rposix_stat.stat(path)
+                self.stat_cached = True
+            return self.d_stat
+
 
     def is_dir(self, follow_symlinks):
         known_type = self.flags & 255
-        if known_type != rposix_scandir.DT_UNKNOWN:
+        if not _WIN32 and known_type != rposix_scandir.DT_UNKNOWN:
             if known_type == rposix_scandir.DT_DIR:
                 return True
             elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
@@ -275,7 +288,7 @@
 
     def is_file(self, follow_symlinks):
         known_type = self.flags & 255
-        if known_type != rposix_scandir.DT_UNKNOWN:
+        if not _WIN32 and known_type != rposix_scandir.DT_UNKNOWN:
             if known_type == rposix_scandir.DT_REG:
                 return True
             elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
@@ -287,7 +300,7 @@
     def is_symlink(self):
         """Check if the direntry is a symlink.  May get the lstat()."""
         known_type = self.flags & 255
-        if known_type != rposix_scandir.DT_UNKNOWN:
+        if not _WIN32 and known_type != rposix_scandir.DT_UNKNOWN:
             return known_type == rposix_scandir.DT_LNK
         return self.check_mode(follow_symlinks=False) == stat.S_IFLNK
 
@@ -316,7 +329,15 @@
         return build_stat_result(space, st)
 
     def descr_inode(self, space):
-        return space.newint(self.inode)
+        inode = self.inode
+        if inode is None:    # _WIN32
+            try:
+                st = self.get_stat_or_lstat(follow_symlinks=False)
+            except OSError as e:
+                raise wrap_oserror2(space, e, self.fget_path(space),
+                                    eintr_retry=False)
+            inode = st.st_ino
+        return space.newint(inode)
 
 
 W_DirEntry.typedef = TypeDef(
diff --git a/pypy/module/posix/test/test_scandir.py b/pypy/module/posix/test/test_scandir.py
--- a/pypy/module/posix/test/test_scandir.py
+++ b/pypy/module/posix/test/test_scandir.py
@@ -1,4 +1,5 @@
 import sys, os
+import py
 from rpython.tool.udir import udir
 from pypy.module.posix.test import test_posix2
 
@@ -97,6 +98,8 @@
         assert d.stat().st_mode & 0o170000 == 0o100000    # S_IFREG
         assert d.stat().st_size == 0
 
+    @py.test.mark.skipif(sys.platform == "win32",
+                         reason="no symlink support so far")
     def test_stat4(self):
         posix = self.posix
         d = next(posix.scandir(self.dir4))
@@ -126,6 +129,8 @@
         assert not d.is_file(follow_symlinks=False)
         assert     d.is_dir(follow_symlinks=False)
 
+    @py.test.mark.skipif(sys.platform == "win32",
+                         reason="no symlink support so far")
     def test_dir3(self):
         posix = self.posix
         d = next(posix.scandir(self.dir3))
@@ -136,6 +141,8 @@
         assert     d.is_file(follow_symlinks=True)
         assert not d.is_file(follow_symlinks=False)
 
+    @py.test.mark.skipif(sys.platform == "win32",
+                         reason="no symlink support so far")
     def test_dir4(self):
         posix = self.posix
         d = next(posix.scandir(self.dir4))
@@ -146,6 +153,8 @@
         assert     d.is_dir(follow_symlinks=True)
         assert not d.is_dir(follow_symlinks=False)
 
+    @py.test.mark.skipif(sys.platform == "win32",
+                         reason="no symlink support so far")
     def test_dir5(self):
         posix = self.posix
         d = next(posix.scandir(self.dir5))
@@ -155,6 +164,8 @@
         assert     d.is_symlink()
         raises(OSError, d.stat)
 
+    @py.test.mark.skipif(sys.platform == "win32",
+                         reason="no symlink support so far")
     def test_dir6(self):
         posix = self.posix
         d = next(posix.scandir(self.dir6))
diff --git a/rpython/rlib/rposix_scandir.py b/rpython/rlib/rposix_scandir.py
--- a/rpython/rlib/rposix_scandir.py
+++ b/rpython/rlib/rposix_scandir.py
@@ -1,6 +1,7 @@
 from rpython.rlib import rposix, rwin32
 from rpython.rlib.objectmodel import specialize
 from rpython.rtyper.lltypesystem import lltype, rffi
+from rpython.rlib.rarithmetic import intmask
 
 
 if not rwin32.WIN32:
@@ -54,8 +55,10 @@
 
 else:
     # ----- Win32 version -----
+    import stat
     from rpython.rlib._os_support import unicode_traits
     from rpython.rlib.rwin32file import make_win32_traits
+    from rpython.rlib import rposix_stat
 
     win32traits = make_win32_traits(unicode_traits)
 
@@ -70,21 +73,6 @@
             if self.hFindFile != rwin32.INVALID_HANDLE_VALUE:
                 win32traits.FindClose(self.hFindFile)
 
-    class DirEntP:
-        def __init__(self, filedata):
-            self.filedata = filedata
-            # ^^^ note that this structure is overwritten by the next() call, so
-            # we must copy a few pieces of information out of it now:
-            self.dwFileAttributes = filedata.c_dwFileAttributes
-            self.CreationTimeLow = filedata.c_ftCreationTime.c_dwLowDateTime
-            self.CreationTimeHigh = filedata.c_ftCreationTime.c_dwHighDateTime
-            self.LastAccessTimeLow = filedata.c_ftLastAccessTime.c_dwLowDateTime
-            self.LastAccessTimeHigh = filedata.c_ftLastAccessTime.c_dwHighDateTime
-            self.LastWriteTimeLow = filedata.c_ftLastWriteTime.c_dwLowDateTime
-            self.LastWriteTimeHigh = filedata.c_ftLastWriteTime.c_dwHighDateTime
-            self.nFileSizeHigh = filedata.c_nFileSizeHigh
-            self.nFileSizeLow = filedata.c_nFileSizeLow
-
 
     # must only be called with unicode!
     def opendir(path):
@@ -113,7 +101,7 @@
         Use the methods has_xxx() and get_xxx() to read from that
         opaque object.  The opaque object is valid until the next
         time nextentry() or closedir() is called.  This may raise
-        WindowsError, or return None when exhausted.  Note
+        WindowsError, or return NULL when exhausted.  Note
         that this doesn't filter out the "." and ".." entries.
         """
         if dirp.first_time:
@@ -123,16 +111,18 @@
                 # error or no more files
                 error = rwin32.GetLastError_saved()
                 if error == win32traits.ERROR_NO_MORE_FILES:
-                    return None
+                    return lltype.nullptr(win32traits.WIN32_FIND_DATA)
                 raise WindowsError(error,  "FindNextFileW failed")
-        return DirEntP(dirp.filedata)
+        return dirp.filedata
 
-    def get_name_unicode(direntp):
+    def get_name_unicode(filedata):
         return unicode_traits.charp2str(rffi.cast(unicode_traits.CCHARP,
-                                                  direntp.filedata.c_cFileName))
+                                                  filedata.c_cFileName))
 
     def get_known_type(filedata):
-        return 0
+        attr = filedata.c_dwFileAttributes
+        st_mode = rposix_stat.win32_attributes_to_mode(win32traits, attr)
+        return stat.S_IFMT(st_mode)
 
     def get_inode(filedata):
         return None


More information about the pypy-commit mailing list