[Python-checkins] gh-90385: Add `pathlib.Path.walk()` method (GH-92517)

miss-islington webhook-mailer at python.org
Fri Jul 22 19:55:50 EDT 2022


https://github.com/python/cpython/commit/c1e929858ad96fc6e41bc637e5ec9343b4f7e3c7
commit: c1e929858ad96fc6e41bc637e5ec9343b4f7e3c7
branch: main
author: Stanislav Zmiev <zertarx at gmail.com>
committer: miss-islington <31488909+miss-islington at users.noreply.github.com>
date: 2022-07-22T16:55:46-07:00
summary:

gh-90385: Add `pathlib.Path.walk()` method (GH-92517)



Automerge-Triggered-By: GH:brettcannon

files:
A Misc/NEWS.d/next/Library/2022-05-10-16-30-40.gh-issue-90385.1_wBRQ.rst
M Doc/library/pathlib.rst
M Lib/pathlib.py
M Lib/test/support/os_helper.py
M Lib/test/test_pathlib.py

diff --git a/Doc/library/pathlib.rst b/Doc/library/pathlib.rst
index 04549184be2ae..19944bd7bd0a8 100644
--- a/Doc/library/pathlib.rst
+++ b/Doc/library/pathlib.rst
@@ -946,6 +946,101 @@ call fails (for example because the path doesn't exist).
    to the directory after creating the iterator, whether a path object for
    that file be included is unspecified.
 
+.. method:: Path.walk(top_down=True, on_error=None, follow_symlinks=False)
+
+   Generate the file names in a directory tree by walking the tree
+   either top-down or bottom-up.
+
+   For each directory in the directory tree rooted at *self* (including
+   *self* but excluding '.' and '..'), the method yields a 3-tuple of
+   ``(dirpath, dirnames, filenames)``.
+
+   *dirpath* is a :class:`Path` to the directory currently being walked,
+   *dirnames* is a list of strings for the names of subdirectories in *dirpath*
+   (excluding ``'.'`` and ``'..'``), and *filenames* is a list of strings for
+   the names of the non-directory files in *dirpath*. To get a full path
+   (which begins with *self*) to a file or directory in *dirpath*, do
+   ``dirpath / name``. Whether or not the lists are sorted is file
+   system-dependent.
+
+   If the optional argument *top_down* is true (which is the default), the triple for a
+   directory is generated before the triples for any of its subdirectories
+   (directories are walked top-down).  If *top_down* is false, the triple
+   for a directory is generated after the triples for all of its subdirectories
+   (directories are walked bottom-up). No matter the value of *top_down*, the
+   list of subdirectories is retrieved before the triples for the directory and
+   its subdirectories are walked.
+
+   When *top_down* is true, the caller can modify the *dirnames* list in-place
+   (for example, using :keyword:`del` or slice assignment), and :meth:`Path.walk`
+   will only recurse into the subdirectories whose names remain in *dirnames*.
+   This can be used to prune the search, or to impose a specific order of visiting,
+   or even to inform :meth:`Path.walk` about directories the caller creates or
+   renames before it resumes :meth:`Path.walk` again. Modifying *dirnames* when
+   *top_down* is false has no effect on the behavior of :meth:`Path.walk()` since the
+   directories in *dirnames* have already been generated by the time *dirnames*
+   is yielded to the caller.
+
+   By default, errors from :func:`os.scandir` are ignored.  If the optional
+   argument *on_error* is specified, it should be a callable; it will be
+   called with one argument, an :exc:`OSError` instance. The callable can handle the
+   error to continue the walk or re-raise it to stop the walk. Note that the
+   filename is available as the ``filename`` attribute of the exception object.
+
+   By default, :meth:`Path.walk` does not follow symbolic links, and instead adds them
+   to the *filenames* list. Set *follow_symlinks* to true to resolve symlinks
+   and place them in *dirnames* and *filenames* as appropriate for their targets, and
+   consequently visit directories pointed to by symlinks (where supported).
+
+   .. note::
+
+      Be aware that setting *follow_symlinks* to true can lead to infinite
+      recursion if a link points to a parent directory of itself. :meth:`Path.walk`
+      does not keep track of the directories it has already visited.
+
+   .. note::
+      :meth:`Path.walk` assumes the directories it walks are not modified during
+      execution. For example, if a directory from *dirnames* has been replaced
+      with a symlink and *follow_symlinks* is false, :meth:`Path.walk` will
+      still try to descend into it. To prevent such behavior, remove directories
+      from *dirnames* as appropriate.
+
+   .. note::
+
+      Unlike :func:`os.walk`, :meth:`Path.walk` lists symlinks to directories in
+      *filenames* if *follow_symlinks* is false.
+
+   This example displays the number of bytes used by all files in each directory,
+   while ignoring ``__pycache__`` directories::
+
+      from pathlib import Path
+      for root, dirs, files in Path("cpython/Lib/concurrent").walk(on_error=print):
+        print(
+            root,
+            "consumes",
+            sum((root / file).stat().st_size for file in files),
+            "bytes in",
+            len(files),
+            "non-directory files"
+        )
+        if '__pycache__' in dirs:
+              dirs.remove('__pycache__')
+
+   This next example is a simple implementation of :func:`shutil.rmtree`.
+   Walking the tree bottom-up is essential as :func:`rmdir` doesn't allow
+   deleting a directory before it is empty::
+
+      # Delete everything reachable from the directory "top".
+      # CAUTION:  This is dangerous! For example, if top == Path('/'),
+      # it could delete all of your files.
+      for root, dirs, files in top.walk(topdown=False):
+          for name in files:
+              (root / name).unlink()
+          for name in dirs:
+              (root / name).rmdir()
+
+   .. versionadded:: 3.12
+
 .. method:: Path.lchmod(mode)
 
    Like :meth:`Path.chmod` but, if the path points to a symbolic link, the
@@ -1285,6 +1380,7 @@ Below is a table mapping various :mod:`os` functions to their corresponding
 :func:`os.path.expanduser`             :meth:`Path.expanduser` and
                                        :meth:`Path.home`
 :func:`os.listdir`                     :meth:`Path.iterdir`
+:func:`os.walk`                        :meth:`Path.walk`
 :func:`os.path.isdir`                  :meth:`Path.is_dir`
 :func:`os.path.isfile`                 :meth:`Path.is_file`
 :func:`os.path.islink`                 :meth:`Path.is_symlink`
diff --git a/Lib/pathlib.py b/Lib/pathlib.py
index 62dd0fa1b1237..2aee71742b23e 100644
--- a/Lib/pathlib.py
+++ b/Lib/pathlib.py
@@ -1321,6 +1321,49 @@ def expanduser(self):
 
         return self
 
+    def walk(self, top_down=True, on_error=None, follow_symlinks=False):
+        """Walk the directory tree from this directory, similar to os.walk()."""
+        sys.audit("pathlib.Path.walk", self, on_error, follow_symlinks)
+        return self._walk(top_down, on_error, follow_symlinks)
+
+    def _walk(self, top_down, on_error, follow_symlinks):
+        # We may not have read permission for self, in which case we can't
+        # get a list of the files the directory contains. os.walk
+        # always suppressed the exception then, rather than blow up for a
+        # minor reason when (say) a thousand readable directories are still
+        # left to visit. That logic is copied here.
+        try:
+            scandir_it = self._scandir()
+        except OSError as error:
+            if on_error is not None:
+                on_error(error)
+            return
+
+        with scandir_it:
+            dirnames = []
+            filenames = []
+            for entry in scandir_it:
+                try:
+                    is_dir = entry.is_dir(follow_symlinks=follow_symlinks)
+                except OSError:
+                    # Carried over from os.path.isdir().
+                    is_dir = False
+
+                if is_dir:
+                    dirnames.append(entry.name)
+                else:
+                    filenames.append(entry.name)
+
+        if top_down:
+            yield self, dirnames, filenames
+
+        for dirname in dirnames:
+            dirpath = self._make_child_relpath(dirname)
+            yield from dirpath._walk(top_down, on_error, follow_symlinks)
+
+        if not top_down:
+            yield self, dirnames, filenames
+
 
 class PosixPath(Path, PurePosixPath):
     """Path subclass for non-Windows systems.
diff --git a/Lib/test/support/os_helper.py b/Lib/test/support/os_helper.py
index 4edb1abec202a..61d12f59e4a60 100644
--- a/Lib/test/support/os_helper.py
+++ b/Lib/test/support/os_helper.py
@@ -572,7 +572,7 @@ def fs_is_case_insensitive(directory):
 
 
 class FakePath:
-    """Simple implementing of the path protocol.
+    """Simple implementation of the path protocol.
     """
     def __init__(self, path):
         self.path = path
diff --git a/Lib/test/test_pathlib.py b/Lib/test/test_pathlib.py
index a42619853399f..6f3b2a4df890b 100644
--- a/Lib/test/test_pathlib.py
+++ b/Lib/test/test_pathlib.py
@@ -2478,6 +2478,203 @@ def test_complex_symlinks_relative(self):
     def test_complex_symlinks_relative_dot_dot(self):
         self._check_complex_symlinks(os.path.join('dirA', '..'))
 
+class WalkTests(unittest.TestCase):
+
+    def setUp(self):
+        self.addCleanup(os_helper.rmtree, os_helper.TESTFN)
+
+        # Build:
+        #     TESTFN/
+        #       TEST1/              a file kid and two directory kids
+        #         tmp1
+        #         SUB1/             a file kid and a directory kid
+        #           tmp2
+        #           SUB11/          no kids
+        #         SUB2/             a file kid and a dirsymlink kid
+        #           tmp3
+        #           SUB21/          not readable
+        #             tmp5
+        #           link/           a symlink to TEST2
+        #           broken_link
+        #           broken_link2
+        #           broken_link3
+        #       TEST2/
+        #         tmp4              a lone file
+        self.walk_path = pathlib.Path(os_helper.TESTFN, "TEST1")
+        self.sub1_path = self.walk_path / "SUB1"
+        self.sub11_path = self.sub1_path / "SUB11"
+        self.sub2_path = self.walk_path / "SUB2"
+        sub21_path= self.sub2_path / "SUB21"
+        tmp1_path = self.walk_path / "tmp1"
+        tmp2_path = self.sub1_path / "tmp2"
+        tmp3_path = self.sub2_path / "tmp3"
+        tmp5_path = sub21_path / "tmp3"
+        self.link_path = self.sub2_path / "link"
+        t2_path = pathlib.Path(os_helper.TESTFN, "TEST2")
+        tmp4_path = pathlib.Path(os_helper.TESTFN, "TEST2", "tmp4")
+        broken_link_path = self.sub2_path / "broken_link"
+        broken_link2_path = self.sub2_path / "broken_link2"
+        broken_link3_path = self.sub2_path / "broken_link3"
+
+        os.makedirs(self.sub11_path)
+        os.makedirs(self.sub2_path)
+        os.makedirs(sub21_path)
+        os.makedirs(t2_path)
+
+        for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path, tmp5_path:
+            with open(path, "x", encoding='utf-8') as f:
+                f.write(f"I'm {path} and proud of it.  Blame test_pathlib.\n")
+
+        if os_helper.can_symlink():
+            os.symlink(os.path.abspath(t2_path), self.link_path)
+            os.symlink('broken', broken_link_path, True)
+            os.symlink(pathlib.Path('tmp3', 'broken'), broken_link2_path, True)
+            os.symlink(pathlib.Path('SUB21', 'tmp5'), broken_link3_path, True)
+            self.sub2_tree = (self.sub2_path, ["SUB21"],
+                              ["broken_link", "broken_link2", "broken_link3",
+                               "link", "tmp3"])
+        else:
+            self.sub2_tree = (self.sub2_path, ["SUB21"], ["tmp3"])
+
+        if not is_emscripten:
+            # Emscripten fails with inaccessible directories.
+            os.chmod(sub21_path, 0)
+        try:
+            os.listdir(sub21_path)
+        except PermissionError:
+            self.addCleanup(os.chmod, sub21_path, stat.S_IRWXU)
+        else:
+            os.chmod(sub21_path, stat.S_IRWXU)
+            os.unlink(tmp5_path)
+            os.rmdir(sub21_path)
+            del self.sub2_tree[1][:1]
+
+    def test_walk_topdown(self):
+        all = list(self.walk_path.walk())
+
+        self.assertEqual(len(all), 4)
+        # We can't know which order SUB1 and SUB2 will appear in.
+        # Not flipped:  TESTFN, SUB1, SUB11, SUB2
+        #     flipped:  TESTFN, SUB2, SUB1, SUB11
+        flipped = all[0][1][0] != "SUB1"
+        all[0][1].sort()
+        all[3 - 2 * flipped][-1].sort()
+        all[3 - 2 * flipped][1].sort()
+        self.assertEqual(all[0], (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
+        self.assertEqual(all[1 + flipped], (self.sub1_path, ["SUB11"], ["tmp2"]))
+        self.assertEqual(all[2 + flipped], (self.sub11_path, [], []))
+        self.assertEqual(all[3 - 2 * flipped], self.sub2_tree)
+
+    def test_walk_prune(self, walk_path=None):
+        if walk_path is None:
+            walk_path = self.walk_path
+        # Prune the search.
+        all = []
+        for root, dirs, files in walk_path.walk():
+            all.append((root, dirs, files))
+            if 'SUB1' in dirs:
+                # Note that this also mutates the dirs we appended to all!
+                dirs.remove('SUB1')
+
+        self.assertEqual(len(all), 2)
+        self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"]))
+
+        all[1][-1].sort()
+        all[1][1].sort()
+        self.assertEqual(all[1], self.sub2_tree)
+
+    def test_file_like_path(self):
+        self.test_walk_prune(FakePath(self.walk_path).__fspath__())
+
+    def test_walk_bottom_up(self):
+        all = list(self.walk_path.walk( top_down=False))
+
+        self.assertEqual(len(all), 4, all)
+        # We can't know which order SUB1 and SUB2 will appear in.
+        # Not flipped:  SUB11, SUB1, SUB2, TESTFN
+        #     flipped:  SUB2, SUB11, SUB1, TESTFN
+        flipped = all[3][1][0] != "SUB1"
+        all[3][1].sort()
+        all[2 - 2 * flipped][-1].sort()
+        all[2 - 2 * flipped][1].sort()
+        self.assertEqual(all[3],
+                         (self.walk_path, ["SUB1", "SUB2"], ["tmp1"]))
+        self.assertEqual(all[flipped],
+                         (self.sub11_path, [], []))
+        self.assertEqual(all[flipped + 1],
+                         (self.sub1_path, ["SUB11"], ["tmp2"]))
+        self.assertEqual(all[2 - 2 * flipped],
+                         self.sub2_tree)
+
+    @os_helper.skip_unless_symlink
+    def test_walk_follow_symlinks(self):
+        walk_it = self.walk_path.walk(follow_symlinks=True)
+        for root, dirs, files in walk_it:
+            if root == self.link_path:
+                self.assertEqual(dirs, [])
+                self.assertEqual(files, ["tmp4"])
+                break
+        else:
+            self.fail("Didn't follow symlink with follow_symlinks=True")
+
+    def test_walk_symlink_location(self):
+        # Tests whether symlinks end up in filenames or dirnames depending
+        # on the `follow_symlinks` argument.
+        walk_it = self.walk_path.walk(follow_symlinks=False)
+        for root, dirs, files in walk_it:
+            if root == self.sub2_path:
+                self.assertIn("link", files)
+                break
+        else:
+            self.fail("symlink not found")
+
+        walk_it = self.walk_path.walk(follow_symlinks=True)
+        for root, dirs, files in walk_it:
+            if root == self.sub2_path:
+                self.assertIn("link", dirs)
+                break
+
+    def test_walk_bad_dir(self):
+        errors = []
+        walk_it = self.walk_path.walk(on_error=errors.append)
+        root, dirs, files = next(walk_it)
+        self.assertEqual(errors, [])
+        dir1 = 'SUB1'
+        path1 = root / dir1
+        path1new = (root / dir1).with_suffix(".new")
+        path1.rename(path1new)
+        try:
+            roots = [r for r, _, _ in walk_it]
+            self.assertTrue(errors)
+            self.assertNotIn(path1, roots)
+            self.assertNotIn(path1new, roots)
+            for dir2 in dirs:
+                if dir2 != dir1:
+                    self.assertIn(root / dir2, roots)
+        finally:
+            path1new.rename(path1)
+
+    def test_walk_many_open_files(self):
+        depth = 30
+        base = pathlib.Path(os_helper.TESTFN, 'deep')
+        path = pathlib.Path(base, *(['d']*depth))
+        path.mkdir(parents=True)
+
+        iters = [base.walk(top_down=False) for _ in range(100)]
+        for i in range(depth + 1):
+            expected = (path, ['d'] if i else [], [])
+            for it in iters:
+                self.assertEqual(next(it), expected)
+            path = path.parent
+
+        iters = [base.walk(top_down=True) for _ in range(100)]
+        path = base
+        for i in range(depth + 1):
+            expected = (path, ['d'] if i < depth else [], [])
+            for it in iters:
+                self.assertEqual(next(it), expected)
+            path = path / 'd'
+
 
 class PathTest(_BasePathTest, unittest.TestCase):
     cls = pathlib.Path
diff --git a/Misc/NEWS.d/next/Library/2022-05-10-16-30-40.gh-issue-90385.1_wBRQ.rst b/Misc/NEWS.d/next/Library/2022-05-10-16-30-40.gh-issue-90385.1_wBRQ.rst
new file mode 100644
index 0000000000000..24aa4403f8a68
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2022-05-10-16-30-40.gh-issue-90385.1_wBRQ.rst
@@ -0,0 +1 @@
+Add :meth:`pathlib.Path.walk` as an alternative to :func:`os.walk`.



More information about the Python-checkins mailing list