[issue1599254] mailbox: other programs' messages can vanish without trace

David Watson report at bugs.python.org
Sun Mar 30 23:50:12 CEST 2014


David Watson added the comment:

On Sun 23 Mar 2014, David Watson wrote:
> There actually isn't a test for the original locking issue, as
> the tests all use the mailbox API, which doesn't provide a way to
> turn off dot-locking. ...

On second thought, the underlying error doesn't actually have
anything to do with locking - another process can open the
mailbox, and mailbox.py can replace the file before that process
even tries to lock it.  Andrew's test_concurrent_append()
originally *did* test for this, but commit c37cb11b546f changed
the single-file mailbox classes to just sync the file rather than
replacing it when messages have only been added, as in that test,
meaning it's no longer effective (for that issue at least).

I've now made a test for the original renaming-vs-copying issue
which just uses the simple _subprocess mechanism that the other
tests use, rather than trying to set up any special locking
conditions.  I've also split the tests into two patches -
mailbox-tests-2.7-part1-for-copy-back.diff has the test for the
original issue, and passes with the copy-back patch applied,
while mailbox-tests-2.7-part2.diff applies on top and includes
the rest (these involve the separate issues around rereading, and
mostly fail).

----------
Added file: http://bugs.python.org/file34672/mailbox-tests-2.7-part1-for-copy-back.diff
Added file: http://bugs.python.org/file34673/mailbox-tests-2.7-part2.diff

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue1599254>
_______________________________________
-------------- next part --------------
# HG changeset patch
# Parent a293c01337a62718938d9639653cf1b8dfffa054

diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -13,6 +13,7 @@ from test import test_support
 import unittest
 import mailbox
 import glob
+from contextlib import contextmanager
 try:
     import fcntl
 except ImportError:
@@ -21,6 +22,27 @@ except ImportError:
 # Silence Py3k warning
 rfc822 = test_support.import_module('rfc822', deprecated=True)
 
+try:
+    import multiprocessing
+except ImportError:
+    multiprocessing = None
+else:
+    @contextmanager
+    def child_process(func, *args, **kwargs):
+        """Context manager to run a function concurrently in a child process.
+
+        Starts func(*args, **kwargs) in a multiprocessing child process
+        and waits for it to terminate when the with-block exits.
+
+        """
+        process = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
+        process.start()
+        try:
+            yield
+        finally:
+            process.join()
+
+
 class TestBase:
 
     def _check_sample(self, msg):
@@ -45,6 +67,53 @@ class TestBase:
             test_support.unlink(target)
 
 
+def add_message(factory, path, msg):
+    # Open the mailbox at "path" via "factory", lock it and add "msg".
+    # No explicit flush()/unlock(): close() is relied on for both.
+    mbox = factory(path)
+    try:
+        mbox.lock()
+        mbox.add(msg)
+    finally:
+        mbox.close()
+
+def add_two_delete_one(factory, path, msg1, msg2):
+    # Open the mailbox at "path" via "factory" and, under its lock, add
+    # "msg1" and "msg2", flush, then delete "msg1" and flush again.
+    mbox = factory(path)
+    try:
+        mbox.lock()
+        key = mbox.add(msg1)
+        mbox.add(msg2)
+        # Flushing out two messages then deleting one ensures that for
+        # the single-file mailbox formats, the subsequent flush has to
+        # rewrite the mailbox.
+        mbox.flush()
+        del mbox[key]
+        mbox.flush()
+    finally:
+        mbox.close()
+
+def only_yield():
+    yield
+
+def child_func(to_child, from_parent, child, child_args):
+    # Child-side entry point for _subprocess.  Closes this process's copy
+    # of the write end "to_child", waits for EOF on "from_parent" (i.e. the
+    # parent closing its end), then calls child(*child_args).
+    try:
+        to_child.close()
+        try:
+            from_parent.recv()
+        except EOFError:
+            pass
+        else:
+            raise AssertionError("Unexpectedly received data from parent "
+                                 "process.")
+    finally:
+        from_parent.close()
+    child(*child_args)
+
 class TestMailbox(TestBase):
 
     _factory = None     # Overridden by subclasses to reuse tests
@@ -59,6 +128,166 @@ class TestMailbox(TestBase):
         self._box.close()
         self._delete_recursively(self._path)
 
+    def _acquire_lock(self, mbox=None):
+        # Lock self._box (or mbox if given), retrying every 0.01s for as
+        # long as lock() raises mailbox.ExternalClashError.
+        if mbox is None:
+            mbox = self._box
+        while True:
+            try:
+                mbox.lock()
+                break
+            except mailbox.ExternalClashError:
+                time.sleep(0.01)
+
+    @contextmanager
+    def _locked(self, mbox=None):
+        # Lock self._box (or mbox) for the with-block; unlock only if acquired.
+        if mbox is None:
+            mbox = self._box
+        self._acquire_lock(mbox)
+        try:
+            yield
+        finally:
+            mbox.unlock()
+
+    def _compare_mailbox(self, mapping, other=(), mbox=None):
+        # Check that .as_string() values of mbox (self._box by default)
+        # match strings in "mapping" and "other".  Messages in "mapping"
+        # must be present under their respective keys, while messages
+        # in "other" (any iterable) may have any key.  No other messages
+        # may be present in mbox.
+        if mbox is None:
+            mbox = self._box
+        other = list(other)  # copy first: len() below needs a sequence
+        self.assertEqual(len(mbox), len(mapping) + len(other))
+        for key in mbox.iterkeys():
+            msgstr = mbox[key].as_string()
+            if key in mapping:
+                self.assertEqual(mapping[key], msgstr)
+            else:
+                self.assertIn(msgstr, other)
+                other.remove(msgstr)
+
+    def _subprocess(self, parent, child, child_args, inspect=None, path=None,
+                    lock1=False, lock2=False, flush=False):
+        # Method to run code in parent and child processes under
+        # various conditions.  The function "child" is run in the
+        # child process with arguments "child_args", while "parent"
+        # should be a generator function which yields when it wants to
+        # allow the child to run; once the child has returned, the
+        # generator will be resumed.  Finally, the function "inspect"
+        # will be run.  Both "parent" and "inspect" take no arguments;
+        # each sees its own freshly opened mailbox instance in self._box.
+        #
+        # If "lock1" is true, self._box will be locked when the first
+        # step of the parent generator is run, and unlocked when it
+        # yields.  If "flush" is true, self._box.flush() will be
+        # called when the generator first yields, before releasing the
+        # lock (if set) and allowing the child to run.  If "lock2" is
+        # true, self._box will be locked during the second step.
+        if multiprocessing is None:
+            self.skipTest("requires multiprocessing")
+        if path is None:
+            path = self._path
+        @contextmanager
+        def nullcm(*args, **kwargs):
+            yield
+        lock1cm = self._locked if lock1 else nullcm
+        lock2cm = self._locked if lock2 else nullcm
+        self._box.close()
+        self._delete_recursively(self._path)
+        from_parent, to_child = multiprocessing.Pipe(duplex=False)
+        with child_process(child_func, to_child, from_parent,
+                           child, child_args):
+            try:
+                from_parent.close()  # Parent only needs the write end
+                self._box = self._factory(path)
+                parent_iter = parent()
+                with lock1cm():
+                    parent_iter.next()
+                    if flush:
+                        self._box.flush()
+            finally:
+                to_child.close()  # Allow child to continue
+        with lock2cm():
+            try:
+                parent_iter.next()
+            except StopIteration:
+                pass
+        self._box.close()
+        if inspect is not None:
+            self._box = self._factory(path)
+            inspect()
+
+    def _subprocess_correct(self, parent, child, child_args,
+                            inspect=None, path=None):
+        # Run with the parent locked in both steps, flushing before yielding.
+        self._subprocess(parent, child, child_args, inspect, path,
+                         lock1=True, lock2=True, flush=True)
+
+    def _subprocess_modify_unlocked_flush(self, parent, child, child_args,
+                                          inspect=None, path=None):
+        # Run first step unlocked, but flush before yielding to child.
+        self._subprocess(parent, child, child_args, inspect, path,
+                         lock1=False, lock2=True, flush=True)
+
+    def _subprocess_modify_unlocked(self, parent, child, child_args,
+                                    inspect=None, path=None):
+        # Run first step without locks, and yield to child without flushing.
+        self._subprocess(parent, child, child_args, inspect, path,
+                         lock1=False, lock2=True, flush=False)
+
+    def _subprocess_tests(self, parent, child, child_args,
+                          inspect=None, path=None):
+        # Run under each of the three parent lock/flush variants above.
+        # (Note that since the actions of parent and child are
+        # explicitly serialized, the mailbox should not be corrupted
+        # whether they use locking or not.)
+        self._subprocess_correct(parent, child, child_args, inspect, path)
+        self._subprocess_modify_unlocked_flush(parent, child, child_args,
+                                               inspect, path)
+        self._subprocess_modify_unlocked(parent, child, child_args,
+                                         inspect, path)
+
+    def test_subprocess(self):
+        # Run self._subprocess with all 8 lock1/lock2/flush combinations.
+        for n in range(8):
+            self._subprocess(only_yield, only_yield, (), lambda: None,
+                             lock1=(n & 4), lock2=(n & 2), flush=(n & 1))
+
+    def test_add_by_other(self):
+        # Check that the other process can add a message and we can read it.
+        msg = self._template % 0
+        def parent():
+            yield
+            self._compare_mailbox({}, [msg])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, msg))
+
+    def test_child_add_and_delete(self):
+        # (Issue #1599254) Check that when one process (in this case
+        # the child) makes changes which require rewriting a
+        # single-file mailbox, it does not interfere with another
+        # process which already has the mailbox open and is about to
+        # modify it, but hasn't (yet) acquired a lock on it, or
+        # written anything.  Previously the rewritten mailbox was
+        # renamed over the original, causing the other process to
+        # write its own changes (e.g. new mail) into a deleted file.
+        msgp = self._template % "p"
+        msgc1 = self._template % "c1"
+        msgc2 = self._template % "c2"
+        def parent():
+            # self._box has already been created at this point, and
+            # the .add() below will use the same instance.
+            yield
+            self._box.add(msgp)
+        def inspect():
+            self._compare_mailbox({}, [msgc2, msgp])
+        self._subprocess_tests(parent, add_two_delete_one,
+                               (self._factory, self._path, msgc1, msgc2),
+                               inspect)
+
     def test_add(self):
         # Add copies of a sample message
         keys = []
@@ -494,11 +723,13 @@ class TestMailboxSuperclass(TestBase, un
         self.assertRaises(NotImplementedError, lambda: box.close())
 
 
+def factory_Maildir(path, factory=None):
+    return mailbox.Maildir(path, factory)
+
 class TestMaildir(TestMailbox, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.Maildir(path, factory)
-
     def setUp(self):
+        self._factory = factory_Maildir
         TestMailbox.setUp(self)
         if os.name in ('nt', 'os2') or sys.platform == 'cygwin':
             self._box.colon = '!'
@@ -985,9 +1216,14 @@ class _TestMboxMMDF(_TestSingleFile):
         self._box.close()
 
 
+def factory_mbox(path, factory=None):
+    return mailbox.mbox(path, factory)
+
 class TestMbox(_TestMboxMMDF, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.mbox(path, factory)
+    def setUp(self):
+        self._factory = factory_mbox
+        _TestMboxMMDF.setUp(self)
 
     @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
     @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
@@ -1032,14 +1268,24 @@ class TestMbox(_TestMboxMMDF, unittest.T
             self.assertEqual(data[-3:], '0\n\n')
 
 
+def factory_MMDF(path, factory=None):
+    return mailbox.MMDF(path, factory)
+
 class TestMMDF(_TestMboxMMDF, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.MMDF(path, factory)
+    def setUp(self):
+        self._factory = factory_MMDF
+        _TestMboxMMDF.setUp(self)
 
 
+def factory_MH(path, factory=None):
+    return mailbox.MH(path, factory)
+
 class TestMH(TestMailbox, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.MH(path, factory)
+    def setUp(self):
+        self._factory = factory_MH
+        TestMailbox.setUp(self)
 
     def test_list_folders(self):
         # List folders
@@ -1169,9 +1415,14 @@ class TestMH(TestMailbox, unittest.TestC
         return os.path.join(self._path, '.mh_sequences.lock')
 
 
+def factory_Babyl(path, factory=None):
+    return mailbox.Babyl(path, factory)
+
 class TestBabyl(_TestSingleFile, unittest.TestCase):
 
-    _factory = lambda self, path, factory=None: mailbox.Babyl(path, factory)
+    def setUp(self):
+        self._factory = factory_Babyl
+        _TestSingleFile.setUp(self)
 
     def tearDown(self):
         self._box.close()
-------------- next part --------------
# HG changeset patch
# Parent 94e6a8993abb1e2db3ecea065879f6304250c0c0

diff --git a/Lib/test/test_mailbox.py b/Lib/test/test_mailbox.py
--- a/Lib/test/test_mailbox.py
+++ b/Lib/test/test_mailbox.py
@@ -67,6 +67,40 @@ class TestBase:
             test_support.unlink(target)
 
 
+def random_message():
+    # Return a random body of 1-10 lines, each 0-75 "a"s plus "\n".
+    import random
+    body = ""
+    for i in range(random.randint(1, 10)):
+        line = "a" * random.randint(0, 75) + '\n'
+        body += line
+
+    return body
+
+def add_25_messages(factory, path):
+    "Add 25 numbered messages to the mailbox at path, locking for each."
+    mbox = factory(path)
+    try:
+        for i in range(25):
+            msg = """Subject: %i, pid %i
+From: sender at example.com
+
+Content goes here.
+%s""" % (i, os.getpid(), random_message())
+            while True:
+                try:
+                    mbox.lock()
+                except mailbox.ExternalClashError:
+                    # In case of conflict, wait a bit and try again.
+                    time.sleep(0.01)
+                else:
+                    break
+            mbox.add(msg)
+            mbox.flush()
+            mbox.unlock()
+    finally:
+        mbox.close()
+
 def add_message(factory, path, msg):
     # Add "msg" to mailbox at "path", using mailbox instance returned
     # by "factory".
@@ -288,6 +322,174 @@ class TestMailbox(TestBase):
                                (self._factory, self._path, msgc1, msgc2),
                                inspect)
 
+    def test_child_add_and_delete_interleave(self):
+        # Like test_child_add_and_delete, but parent process writes
+        # messages before and after the child rewrites the mailbox.
+        msgp1 = self._template % "p1"
+        msgp2 = self._template % "p2"
+        msgc1 = self._template % "c1"
+        msgc2 = self._template % "c2"
+        def parent():
+            self.assertEqual(len(self._box), 0)
+            self._box.add(msgp1)
+            yield
+            self._box.add(msgp2)
+        def inspect():
+            self._compare_mailbox({}, [msgp1, msgc2, msgp2])
+        self._subprocess_tests(parent, add_two_delete_one,
+                               (self._factory, self._path, msgc1, msgc2),
+                               inspect)
+
+    def test_add_by_other_reread(self):
+        # Check we can read the other process's message after adding ours.
+        msgp = self._template % 0
+        msgc = self._template % 1
+        def parent():
+            key = self._box.add(msgp)
+            yield
+            self._compare_mailbox({key: msgp}, [msgc])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, msgc))
+
+    def test_interleave(self):
+        # Check that the other process can add a message between ours.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            yield
+            k2 = self._box.add(p2)
+            self._compare_mailbox({k1: p1, k2: p2}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p1, c1, p2])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_delete_reread(self):
+        # Have the other process add a message after we've deleted one.
+        p1 = self._template % "p1"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            del self._box[k1]
+            yield
+            self._compare_mailbox({}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [c1])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_delete_reread2(self):
+        # Like test_delete_reread, but parent adds more before and after.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        p3 = self._template % "p3"
+        p4 = self._template % "p4"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            k2 = self._box.add(p2)
+            del self._box[k2]
+            k3 = self._box.add(p3)
+            yield
+            k4 = self._box.add(p4)
+            self._compare_mailbox({k1: p1, k3: p3, k4: p4}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p1, p3, c1, p4])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_replace_reread(self):
+        # Have the other process add a message after we've replaced one.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            self._box[k1] = p2
+            yield
+            self._compare_mailbox({k1: p2}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p2, c1])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    def test_replace_reread2(self):
+        # Like test_replace_reread, but parent adds more before and after.
+        p1 = self._template % "p1"
+        p2 = self._template % "p2"
+        p3 = self._template % "p3"
+        p4 = self._template % "p4"
+        p5 = self._template % "p5"
+        c1 = self._template % "c1"
+        def parent():
+            k1 = self._box.add(p1)
+            k2 = self._box.add(p2)
+            self._box[k2] = p3
+            k4 = self._box.add(p4)
+            yield
+            k5 = self._box.add(p5)
+            self._compare_mailbox({k1: p1, k2: p3, k4: p4, k5: p5}, [c1])
+        def inspect():
+            self._compare_mailbox({}, [p1, p3, p4, c1, p5])
+        self._subprocess_tests(parent, add_message,
+                               (self._factory, self._path, c1), inspect)
+
+    @unittest.skipIf(multiprocessing is None, "requires multiprocessing")
+    def test_concurrent_add(self):
+        # Simple test of concurrent addition to a mailbox.
+        # This exercises the add() and flush() methods, based on bug #1599254.
+        # This bug affected only the classes based on _singlefileMailbox
+        # (mbox, MMDF, Babyl), but this test can apply to any mailbox type.
+
+        self._box.close()
+
+        # Fire off a subprocess that will add 25 messages to a mailbox
+        # file, locking and unlocking it each time.  The parent process
+        # will do the same.  The resulting mailbox should contain 50 messages.
+        with child_process(add_25_messages, self._factory, self._path):
+            add_25_messages(self._factory, self._path)
+
+        # Reopen after both writers finish; expect all 50 messages.
+        self._box = self._factory(self._path)
+        self._box.lock()
+        self.assertEqual(len(self._box), 50)
+        self._box.unlock()
+
+    def test_double_shorten(self):
+        # Check that flush() can shorten the mailbox twice.
+        self._test_remove_two_of_three(broken_locking=False)
+
+    def test_remove_with_broken_locking(self):
+        # Check that a (broken) application releasing the lock and
+        # then removing messages using the existing keys does not
+        # delete the wrong messages.
+        self._test_remove_two_of_three(broken_locking=True)
+
+    def _test_remove_two_of_three(self, broken_locking=False):
+        self._box.lock()
+        key0 = self._box.add(self._template % 0)
+        key1 = self._box.add(self._template % 1)
+        key2 = self._box.add(self._template % 2)
+        self._box.flush()
+        self._box.remove(key0)
+        self._box.flush()
+        if broken_locking:
+            # As the name suggests, code that does this is likely to
+            # be broken (releasing the lock invalidates the keys, in
+            # general), but ideally it should not malfunction if no
+            # other process modifies the mailbox.
+            self._box.unlock()
+            self._box.lock()
+        self._box.remove(key1)
+        self._box.flush()
+        self._box.unlock()
+        self._box.close()
+        self._box = self._factory(self._path)
+        self.assertEqual(len(self._box), 1)  # only message 2 should remain
+        self.assertEqual(self._box.itervalues().next().get_payload(), '2\n')
+
     def test_add(self):
         # Add copies of a sample message
         keys = []


More information about the Python-bugs-list mailing list