[Python-checkins] cpython: asyncio: Refactor SIGCHLD handling. By Anthony Baire.

guido.van.rossum python-checkins at python.org
Tue Nov 5 00:50:55 CET 2013


http://hg.python.org/cpython/rev/8d93ad260714
changeset:   86932:8d93ad260714
parent:      86929:268259370a01
user:        Guido van Rossum <guido at dropbox.com>
date:        Mon Nov 04 15:50:46 2013 -0800
summary:
  asyncio: Refactor SIGCHLD handling. By Anthony Baire.

files:
  Lib/asyncio/events.py                     |   70 +-
  Lib/asyncio/unix_events.py                |  396 +++-
  Lib/asyncio/windows_events.py             |   17 +-
  Lib/test/test_asyncio/test_events.py      |   44 +-
  Lib/test/test_asyncio/test_unix_events.py |  987 ++++++++-
  5 files changed, 1315 insertions(+), 199 deletions(-)


diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -1,10 +1,11 @@
 """Event loop and event loop policy."""
 
-__all__ = ['AbstractEventLoopPolicy', 'DefaultEventLoopPolicy',
+__all__ = ['AbstractEventLoopPolicy',
            'AbstractEventLoop', 'AbstractServer',
            'Handle', 'TimerHandle',
            'get_event_loop_policy', 'set_event_loop_policy',
            'get_event_loop', 'set_event_loop', 'new_event_loop',
+           'get_child_watcher', 'set_child_watcher',
            ]
 
 import subprocess
@@ -318,8 +319,18 @@
         """XXX"""
         raise NotImplementedError
 
+    # Child processes handling (Unix only).
 
-class DefaultEventLoopPolicy(threading.local, AbstractEventLoopPolicy):
+    def get_child_watcher(self):
+        """Get the watcher for child processes."""
+        raise NotImplementedError
+
+    def set_child_watcher(self, watcher):
+        """Set the watcher for child processes."""
+        raise NotImplementedError
+
+
+class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
     """Default policy implementation for accessing the event loop.
 
     In this policy, each thread has its own event loop.  However, we
@@ -332,28 +343,34 @@
     associated).
     """
 
-    _loop = None
-    _set_called = False
+    _loop_factory = None
+
+    class _Local(threading.local):
+        _loop = None
+        _set_called = False
+
+    def __init__(self):
+        self._local = self._Local()
 
     def get_event_loop(self):
         """Get the event loop.
 
         This may be None or an instance of EventLoop.
         """
-        if (self._loop is None and
-            not self._set_called and
+        if (self._local._loop is None and
+            not self._local._set_called and
             isinstance(threading.current_thread(), threading._MainThread)):
-            self._loop = self.new_event_loop()
-        assert self._loop is not None, \
+            self._local._loop = self.new_event_loop()
+        assert self._local._loop is not None, \
                ('There is no current event loop in thread %r.' %
                 threading.current_thread().name)
-        return self._loop
+        return self._local._loop
 
     def set_event_loop(self, loop):
         """Set the event loop."""
-        self._set_called = True
+        self._local._set_called = True
         assert loop is None or isinstance(loop, AbstractEventLoop)
-        self._loop = loop
+        self._local._loop = loop
 
     def new_event_loop(self):
         """Create a new event loop.
@@ -361,12 +378,7 @@
         You must call set_event_loop() to make this the current event
         loop.
         """
-        if sys.platform == 'win32':  # pragma: no cover
-            from . import windows_events
-            return windows_events.SelectorEventLoop()
-        else:  # pragma: no cover
-            from . import unix_events
-            return unix_events.SelectorEventLoop()
+        return self._loop_factory()
 
 
 # Event loop policy.  The policy itself is always global, even if the
@@ -375,12 +387,22 @@
 # call to get_event_loop_policy().
 _event_loop_policy = None
 
+# Lock for protecting the on-the-fly creation of the event loop policy.
+_lock = threading.Lock()
+
+
+def _init_event_loop_policy():
+    global _event_loop_policy
+    with _lock:
+        if _event_loop_policy is None:  # pragma: no branch
+            from . import DefaultEventLoopPolicy
+            _event_loop_policy = DefaultEventLoopPolicy()
+
 
 def get_event_loop_policy():
     """XXX"""
-    global _event_loop_policy
     if _event_loop_policy is None:
-        _event_loop_policy = DefaultEventLoopPolicy()
+        _init_event_loop_policy()
     return _event_loop_policy
 
 
@@ -404,3 +426,13 @@
 def new_event_loop():
     """XXX"""
     return get_event_loop_policy().new_event_loop()
+
+
+def get_child_watcher():
+    """Return the child watcher of the current event loop policy."""
+    return get_event_loop_policy().get_child_watcher()
+
+
+def set_child_watcher(watcher):
+    """Set the child watcher on the current event loop policy."""
+    return get_event_loop_policy().set_child_watcher(watcher)
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -8,6 +8,7 @@
 import stat
 import subprocess
 import sys
+import threading
 
 
 from . import base_subprocess
@@ -20,7 +21,10 @@
 from .log import logger
 
 
-__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR']
+__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
+           'AbstractChildWatcher', 'SafeChildWatcher',
+           'FastChildWatcher', 'DefaultEventLoopPolicy',
+           ]
 
 STDIN = 0
 STDOUT = 1
@@ -31,7 +35,7 @@
     raise ImportError('Signals are not really supported on Windows')
 
 
-class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
+class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
     """Unix event loop
 
     Adds signal handling to SelectorEventLoop
@@ -40,17 +44,10 @@
     def __init__(self, selector=None):
         super().__init__(selector)
         self._signal_handlers = {}
-        self._subprocesses = {}
 
     def _socketpair(self):
         return socket.socketpair()
 
-    def close(self):
-        handler = self._signal_handlers.get(signal.SIGCHLD)
-        if handler is not None:
-            self.remove_signal_handler(signal.SIGCHLD)
-        super().close()
-
     def add_signal_handler(self, sig, callback, *args):
         """Add a handler for a signal.  UNIX only.
 
@@ -152,49 +149,20 @@
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
-        self._reg_sigchld()
-        transp = _UnixSubprocessTransport(self, protocol, args, shell,
-                                          stdin, stdout, stderr, bufsize,
-                                          extra=None, **kwargs)
-        self._subprocesses[transp.get_pid()] = transp
+        with events.get_child_watcher() as watcher:
+            transp = _UnixSubprocessTransport(self, protocol, args, shell,
+                                              stdin, stdout, stderr, bufsize,
+                                              extra=None, **kwargs)
+            watcher.add_child_handler(transp.get_pid(),
+                                      self._child_watcher_callback, transp)
         yield from transp._post_init()
         return transp
 
-    def _reg_sigchld(self):
-        if signal.SIGCHLD not in self._signal_handlers:
-            self.add_signal_handler(signal.SIGCHLD, self._sig_chld)
+    def _child_watcher_callback(self, pid, returncode, transp):
+        self.call_soon_threadsafe(transp._process_exited, returncode)
 
-    def _sig_chld(self):
-        try:
-            # Because of signal coalescing, we must keep calling waitpid() as
-            # long as we're able to reap a child.
-            while True:
-                try:
-                    pid, status = os.waitpid(-1, os.WNOHANG)
-                except ChildProcessError:
-                    break  # No more child processes exist.
-                if pid == 0:
-                    break  # All remaining child processes are still alive.
-                elif os.WIFSIGNALED(status):
-                    # A child process died because of a signal.
-                    returncode = -os.WTERMSIG(status)
-                elif os.WIFEXITED(status):
-                    # A child process exited (e.g. sys.exit()).
-                    returncode = os.WEXITSTATUS(status)
-                else:
-                    # A child exited, but we don't understand its status.
-                    # This shouldn't happen, but if it does, let's just
-                    # return that status; perhaps that helps debug it.
-                    returncode = status
-                transp = self._subprocesses.get(pid)
-                if transp is not None:
-                    transp._process_exited(returncode)
-        except Exception:
-            logger.exception('Unknown exception in SIGCHLD handler')
-
-    def _subprocess_closed(self, transport):
-        pid = transport.get_pid()
-        self._subprocesses.pop(pid, None)
+    def _subprocess_closed(self, transp):
+        pass
 
 
 def _set_nonblocking(fd):
@@ -423,3 +391,335 @@
         if stdin_w is not None:
             stdin.close()
             self._proc.stdin = open(stdin_w.detach(), 'rb', buffering=bufsize)
+
+
+class AbstractChildWatcher:
+    """Abstract base class for monitoring child processes.
+
+    Objects derived from this class monitor a collection of subprocesses and
+    report their termination or interruption by a signal.
+
+    New callbacks are registered with .add_child_handler(). Starting a new
+    process must be done within a 'with' block to allow the watcher to suspend
+    its activity until the new process is fully registered (this is needed to
+    prevent a race condition in some implementations).
+
+    Example:
+        with watcher:
+            proc = subprocess.Popen("sleep 1")
+            watcher.add_child_handler(proc.pid, callback)
+
+    Notes:
+        Implementations of this class must be thread-safe.
+
+        Since child watcher objects may catch the SIGCHLD signal and call
+        waitpid(-1), there should be only one active object per process.
+    """
+
+    def add_child_handler(self, pid, callback, *args):
+        """Register a new child handler.
+
+        Arrange for callback(pid, returncode, *args) to be called when
+        process 'pid' terminates. Specifying another callback for the same
+        process replaces the previous handler.
+
+        Note: callback() must be thread-safe
+        """
+        raise NotImplementedError()
+
+    def remove_child_handler(self, pid):
+        """Removes the handler for process 'pid'.
+
+        The function returns True if the handler was successfully removed,
+        False if there was nothing to remove."""
+
+        raise NotImplementedError()
+
+    def set_loop(self, loop):
+        """Reattach the watcher to another event loop.
+
+        Note: loop may be None
+        """
+        raise NotImplementedError()
+
+    def close(self):
+        """Close the watcher.
+
+        This must be called to make sure that any underlying resource is freed.
+        """
+        raise NotImplementedError()
+
+    def __enter__(self):
+        """Enter the watcher's context and allow starting new processes
+
+        This function must return self"""
+        raise NotImplementedError()
+
+    def __exit__(self, a, b, c):
+        """Exit the watcher's context"""
+        raise NotImplementedError()
+
+
+class BaseChildWatcher(AbstractChildWatcher):
+
+    def __init__(self, loop):
+        self._loop = None
+        self._callbacks = {}
+
+        self.set_loop(loop)
+
+    def close(self):
+        self.set_loop(None)
+        self._callbacks.clear()
+
+    def _do_waitpid(self, expected_pid):
+        raise NotImplementedError()
+
+    def _do_waitpid_all(self):
+        raise NotImplementedError()
+
+    def set_loop(self, loop):
+        assert loop is None or isinstance(loop, events.AbstractEventLoop)
+
+        if self._loop is not None:
+            self._loop.remove_signal_handler(signal.SIGCHLD)
+
+        self._loop = loop
+        if loop is not None:
+            loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
+
+            # Prevent a race condition in case a child terminated
+            # during the switch.
+            self._do_waitpid_all()
+
+    def remove_child_handler(self, pid):
+        try:
+            del self._callbacks[pid]
+            return True
+        except KeyError:
+            return False
+
+    def _sig_chld(self):
+        try:
+            self._do_waitpid_all()
+        except Exception:
+            logger.exception('Unknown exception in SIGCHLD handler')
+
+    def _compute_returncode(self, status):
+        if os.WIFSIGNALED(status):
+            # The child process died because of a signal.
+            return -os.WTERMSIG(status)
+        elif os.WIFEXITED(status):
+            # The child process exited (e.g. sys.exit()).
+            return os.WEXITSTATUS(status)
+        else:
+            # The child exited, but we don't understand its status.
+            # This shouldn't happen, but if it does, let's just
+            # return that status; perhaps that helps debug it.
+            return status
+
+
+class SafeChildWatcher(BaseChildWatcher):
+    """'Safe' child watcher implementation.
+
+    This implementation avoids disrupting other code spawning processes by
+    polling explicitly each process in the SIGCHLD handler instead of calling
+    os.waitpid(-1).
+
+    This is a safe solution but it has a significant overhead when handling a
+    big number of children (O(n) each time SIGCHLD is raised)
+    """
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, a, b, c):
+        pass
+
+    def add_child_handler(self, pid, callback, *args):
+        self._callbacks[pid] = callback, args
+
+        # Prevent a race condition in case the child is already terminated.
+        self._do_waitpid(pid)
+
+    def _do_waitpid_all(self):
+
+        for pid in list(self._callbacks):
+            self._do_waitpid(pid)
+
+    def _do_waitpid(self, expected_pid):
+        assert expected_pid > 0
+
+        try:
+            pid, status = os.waitpid(expected_pid, os.WNOHANG)
+        except ChildProcessError:
+            # The child process is already reaped
+            # (may happen if waitpid() is called elsewhere).
+            pid = expected_pid
+            returncode = 255
+            logger.warning(
+                "Unknown child process pid %d, will report returncode 255",
+                pid)
+        else:
+            if pid == 0:
+                # The child process is still alive.
+                return
+
+            returncode = self._compute_returncode(status)
+
+        try:
+            callback, args = self._callbacks.pop(pid)
+        except KeyError:  # pragma: no cover
+            # May happen if .remove_child_handler() is called
+            # after os.waitpid() returns.
+            pass
+        else:
+            callback(pid, returncode, *args)
+
+
+class FastChildWatcher(BaseChildWatcher):
+    """'Fast' child watcher implementation.
+
+    This implementation reaps every terminated process by calling
+    os.waitpid(-1) directly, possibly breaking other code spawning processes
+    and waiting for their termination.
+
+    There is no noticeable overhead when handling a big number of children
+    (O(1) each time a child terminates).
+    """
+    def __init__(self, loop):
+        super().__init__(loop)
+
+        self._lock = threading.Lock()
+        self._zombies = {}
+        self._forks = 0
+
+    def close(self):
+        super().close()
+        self._zombies.clear()
+
+    def __enter__(self):
+        with self._lock:
+            self._forks += 1
+
+            return self
+
+    def __exit__(self, a, b, c):
+        with self._lock:
+            self._forks -= 1
+
+            if self._forks or not self._zombies:
+                return
+
+            collateral_victims = str(self._zombies)
+            self._zombies.clear()
+
+        logger.warning(
+            "Caught subprocesses termination from unknown pids: %s",
+            collateral_victims)
+
+    def add_child_handler(self, pid, callback, *args):
+        assert self._forks, "Must use the context manager"
+
+        self._callbacks[pid] = callback, args
+
+        try:
+            # Ensure that the child is not already terminated.
+            # (raise KeyError if still alive)
+            returncode = self._zombies.pop(pid)
+
+            # Child is dead, therefore we can fire the callback immediately.
+            # First we remove it from the dict.
+            # (raise KeyError if .remove_child_handler() was called in-between)
+            del self._callbacks[pid]
+        except KeyError:
+            pass
+        else:
+            callback(pid, returncode, *args)
+
+    def _do_waitpid_all(self):
+        # Because of signal coalescing, we must keep calling waitpid() as
+        # long as we're able to reap a child.
+        while True:
+            try:
+                pid, status = os.waitpid(-1, os.WNOHANG)
+            except ChildProcessError:
+                # No more child processes exist.
+                return
+            else:
+                if pid == 0:
+                    # A child process is still alive.
+                    return
+
+                returncode = self._compute_returncode(status)
+
+            try:
+                callback, args = self._callbacks.pop(pid)
+            except KeyError:
+                # unknown child
+                with self._lock:
+                    if self._forks:
+                        # It may not be registered yet.
+                        self._zombies[pid] = returncode
+                        continue
+
+                logger.warning(
+                    "Caught subprocess termination from unknown pid: "
+                    "%d -> %d", pid, returncode)
+            else:
+                callback(pid, returncode, *args)
+
+
+class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+    """Unix event loop policy with a watcher for child processes."""
+    _loop_factory = _UnixSelectorEventLoop
+
+    def __init__(self):
+        super().__init__()
+        self._watcher = None
+
+    def _init_watcher(self):
+        with events._lock:
+            if self._watcher is None:  # pragma: no branch
+                if isinstance(threading.current_thread(),
+                              threading._MainThread):
+                    self._watcher = SafeChildWatcher(self._local._loop)
+                else:
+                    self._watcher = SafeChildWatcher(None)
+
+    def set_event_loop(self, loop):
+        """Set the event loop.
+
+        As a side effect, if a child watcher was set before, then calling
+        .set_event_loop() from the main thread will call .set_loop(loop) on the
+        child watcher.
+        """
+
+        super().set_event_loop(loop)
+
+        if self._watcher is not None and \
+            isinstance(threading.current_thread(), threading._MainThread):
+            self._watcher.set_loop(loop)
+
+    def get_child_watcher(self):
+        """Get the child watcher
+
+        If not yet set, a SafeChildWatcher object is automatically created.
+        """
+        if self._watcher is None:
+            self._init_watcher()
+
+        return self._watcher
+
+    def set_child_watcher(self, watcher):
+        """Set the child watcher"""
+
+        assert watcher is None or isinstance(watcher, AbstractChildWatcher)
+
+        if self._watcher is not None:
+            self._watcher.close()
+
+        self._watcher = watcher
+
+SelectorEventLoop = _UnixSelectorEventLoop
+DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -7,6 +7,7 @@
 import struct
 import _winapi
 
+from . import events
 from . import base_subprocess
 from . import futures
 from . import proactor_events
@@ -17,7 +18,9 @@
 from . import _overlapped
 
 
-__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor']
+__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
+           'DefaultEventLoopPolicy',
+           ]
 
 
 NULL = 0
@@ -108,7 +111,7 @@
     __del__ = close
 
 
-class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
+class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
     """Windows version of selector event loop."""
 
     def _socketpair(self):
@@ -453,3 +456,13 @@
 
         f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
         f.add_done_callback(callback)
+
+
+SelectorEventLoop = _WindowsSelectorEventLoop
+
+
+class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
+    _loop_factory = SelectorEventLoop
+
+
+DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -1308,8 +1308,17 @@
     from asyncio import selectors
     from asyncio import unix_events
 
+    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
+        def setUp(self):
+            super().setUp()
+            events.set_child_watcher(unix_events.SafeChildWatcher(self.loop))
+
+        def tearDown(self):
+            events.set_child_watcher(None)
+            super().tearDown()
+
     if hasattr(selectors, 'KqueueSelector'):
-        class KqueueEventLoopTests(EventLoopTestsMixin,
+        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
                                    SubprocessTestsMixin,
                                    unittest.TestCase):
 
@@ -1318,7 +1327,7 @@
                     selectors.KqueueSelector())
 
     if hasattr(selectors, 'EpollSelector'):
-        class EPollEventLoopTests(EventLoopTestsMixin,
+        class EPollEventLoopTests(UnixEventLoopTestsMixin,
                                   SubprocessTestsMixin,
                                   unittest.TestCase):
 
@@ -1326,7 +1335,7 @@
                 return unix_events.SelectorEventLoop(selectors.EpollSelector())
 
     if hasattr(selectors, 'PollSelector'):
-        class PollEventLoopTests(EventLoopTestsMixin,
+        class PollEventLoopTests(UnixEventLoopTestsMixin,
                                  SubprocessTestsMixin,
                                  unittest.TestCase):
 
@@ -1334,7 +1343,7 @@
                 return unix_events.SelectorEventLoop(selectors.PollSelector())
 
     # Should always exist.
-    class SelectEventLoopTests(EventLoopTestsMixin,
+    class SelectEventLoopTests(UnixEventLoopTestsMixin,
                                SubprocessTestsMixin,
                                unittest.TestCase):
 
@@ -1557,25 +1566,36 @@
 
 class PolicyTests(unittest.TestCase):
 
+    def create_policy(self):
+        if sys.platform == "win32":
+            from asyncio import windows_events
+            return windows_events.DefaultEventLoopPolicy()
+        else:
+            from asyncio import unix_events
+            return unix_events.DefaultEventLoopPolicy()
+
     def test_event_loop_policy(self):
         policy = events.AbstractEventLoopPolicy()
         self.assertRaises(NotImplementedError, policy.get_event_loop)
         self.assertRaises(NotImplementedError, policy.set_event_loop, object())
         self.assertRaises(NotImplementedError, policy.new_event_loop)
+        self.assertRaises(NotImplementedError, policy.get_child_watcher)
+        self.assertRaises(NotImplementedError, policy.set_child_watcher,
+                          object())
 
     def test_get_event_loop(self):
-        policy = events.DefaultEventLoopPolicy()
-        self.assertIsNone(policy._loop)
+        policy = self.create_policy()
+        self.assertIsNone(policy._local._loop)
 
         loop = policy.get_event_loop()
         self.assertIsInstance(loop, events.AbstractEventLoop)
 
-        self.assertIs(policy._loop, loop)
+        self.assertIs(policy._local._loop, loop)
         self.assertIs(loop, policy.get_event_loop())
         loop.close()
 
     def test_get_event_loop_after_set_none(self):
-        policy = events.DefaultEventLoopPolicy()
+        policy = self.create_policy()
         policy.set_event_loop(None)
         self.assertRaises(AssertionError, policy.get_event_loop)
 
@@ -1583,7 +1603,7 @@
     def test_get_event_loop_thread(self, m_current_thread):
 
         def f():
-            policy = events.DefaultEventLoopPolicy()
+            policy = self.create_policy()
             self.assertRaises(AssertionError, policy.get_event_loop)
 
         th = threading.Thread(target=f)
@@ -1591,14 +1611,14 @@
         th.join()
 
     def test_new_event_loop(self):
-        policy = events.DefaultEventLoopPolicy()
+        policy = self.create_policy()
 
         loop = policy.new_event_loop()
         self.assertIsInstance(loop, events.AbstractEventLoop)
         loop.close()
 
     def test_set_event_loop(self):
-        policy = events.DefaultEventLoopPolicy()
+        policy = self.create_policy()
         old_loop = policy.get_event_loop()
 
         self.assertRaises(AssertionError, policy.set_event_loop, object())
@@ -1621,7 +1641,7 @@
 
         old_policy = events.get_event_loop_policy()
 
-        policy = events.DefaultEventLoopPolicy()
+        policy = self.create_policy()
         events.set_event_loop_policy(policy)
         self.assertIs(policy, events.get_event_loop_policy())
         self.assertIsNot(policy, old_policy)
diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py
--- a/Lib/test/test_asyncio/test_unix_events.py
+++ b/Lib/test/test_asyncio/test_unix_events.py
@@ -3,10 +3,12 @@
 import gc
 import errno
 import io
+import os
 import pprint
 import signal
 import stat
 import sys
+import threading
 import unittest
 import unittest.mock
 
@@ -181,124 +183,6 @@
         self.assertRaises(
             RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
 
-    @unittest.mock.patch('os.WTERMSIG')
-    @unittest.mock.patch('os.WEXITSTATUS')
-    @unittest.mock.patch('os.WIFSIGNALED')
-    @unittest.mock.patch('os.WIFEXITED')
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
-                       m_WEXITSTATUS, m_WTERMSIG):
-        m_waitpid.side_effect = [(7, object()), ChildProcessError]
-        m_WIFEXITED.return_value = True
-        m_WIFSIGNALED.return_value = False
-        m_WEXITSTATUS.return_value = 3
-        transp = unittest.mock.Mock()
-        self.loop._subprocesses[7] = transp
-
-        self.loop._sig_chld()
-        transp._process_exited.assert_called_with(3)
-        self.assertFalse(m_WTERMSIG.called)
-
-    @unittest.mock.patch('os.WTERMSIG')
-    @unittest.mock.patch('os.WEXITSTATUS')
-    @unittest.mock.patch('os.WIFSIGNALED')
-    @unittest.mock.patch('os.WIFEXITED')
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld_signal(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
-                              m_WEXITSTATUS, m_WTERMSIG):
-        m_waitpid.side_effect = [(7, object()), ChildProcessError]
-        m_WIFEXITED.return_value = False
-        m_WIFSIGNALED.return_value = True
-        m_WTERMSIG.return_value = 1
-        transp = unittest.mock.Mock()
-        self.loop._subprocesses[7] = transp
-
-        self.loop._sig_chld()
-        transp._process_exited.assert_called_with(-1)
-        self.assertFalse(m_WEXITSTATUS.called)
-
-    @unittest.mock.patch('os.WTERMSIG')
-    @unittest.mock.patch('os.WEXITSTATUS')
-    @unittest.mock.patch('os.WIFSIGNALED')
-    @unittest.mock.patch('os.WIFEXITED')
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld_zero_pid(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
-                                m_WEXITSTATUS, m_WTERMSIG):
-        m_waitpid.side_effect = [(0, object()), ChildProcessError]
-        transp = unittest.mock.Mock()
-        self.loop._subprocesses[7] = transp
-
-        self.loop._sig_chld()
-        self.assertFalse(transp._process_exited.called)
-        self.assertFalse(m_WIFSIGNALED.called)
-        self.assertFalse(m_WIFEXITED.called)
-        self.assertFalse(m_WTERMSIG.called)
-        self.assertFalse(m_WEXITSTATUS.called)
-
-    @unittest.mock.patch('os.WTERMSIG')
-    @unittest.mock.patch('os.WEXITSTATUS')
-    @unittest.mock.patch('os.WIFSIGNALED')
-    @unittest.mock.patch('os.WIFEXITED')
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld_not_registered_subprocess(self, m_waitpid,
-                                                 m_WIFEXITED, m_WIFSIGNALED,
-                                                 m_WEXITSTATUS, m_WTERMSIG):
-        m_waitpid.side_effect = [(7, object()), ChildProcessError]
-        m_WIFEXITED.return_value = True
-        m_WIFSIGNALED.return_value = False
-        m_WEXITSTATUS.return_value = 3
-
-        self.loop._sig_chld()
-        self.assertFalse(m_WTERMSIG.called)
-
-    @unittest.mock.patch('os.WTERMSIG')
-    @unittest.mock.patch('os.WEXITSTATUS')
-    @unittest.mock.patch('os.WIFSIGNALED')
-    @unittest.mock.patch('os.WIFEXITED')
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld_unknown_status(self, m_waitpid,
-                                      m_WIFEXITED, m_WIFSIGNALED,
-                                      m_WEXITSTATUS, m_WTERMSIG):
-        m_waitpid.side_effect = [(7, object()), ChildProcessError]
-        m_WIFEXITED.return_value = False
-        m_WIFSIGNALED.return_value = False
-        transp = unittest.mock.Mock()
-        self.loop._subprocesses[7] = transp
-
-        self.loop._sig_chld()
-        self.assertTrue(transp._process_exited.called)
-        self.assertFalse(m_WEXITSTATUS.called)
-        self.assertFalse(m_WTERMSIG.called)
-
-    @unittest.mock.patch('asyncio.unix_events.logger')
-    @unittest.mock.patch('os.WTERMSIG')
-    @unittest.mock.patch('os.WEXITSTATUS')
-    @unittest.mock.patch('os.WIFSIGNALED')
-    @unittest.mock.patch('os.WIFEXITED')
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld_unknown_status_in_handler(self, m_waitpid,
-                                                 m_WIFEXITED, m_WIFSIGNALED,
-                                                 m_WEXITSTATUS, m_WTERMSIG,
-                                                 m_log):
-        m_waitpid.side_effect = Exception
-        transp = unittest.mock.Mock()
-        self.loop._subprocesses[7] = transp
-
-        self.loop._sig_chld()
-        self.assertFalse(transp._process_exited.called)
-        self.assertFalse(m_WIFSIGNALED.called)
-        self.assertFalse(m_WIFEXITED.called)
-        self.assertFalse(m_WTERMSIG.called)
-        self.assertFalse(m_WEXITSTATUS.called)
-        m_log.exception.assert_called_with(
-            'Unknown exception in SIGCHLD handler')
-
-    @unittest.mock.patch('os.waitpid')
-    def test__sig_chld_process_error(self, m_waitpid):
-        m_waitpid.side_effect = ChildProcessError
-        self.loop._sig_chld()
-        self.assertTrue(m_waitpid.called)
-
 
 class UnixReadPipeTransportTests(unittest.TestCase):
 
@@ -777,5 +661,872 @@
         self.assertFalse(self.protocol.connection_lost.called)
 
 
+class AbstractChildWatcherTests(unittest.TestCase):
+
+    def test_not_implemented(self):
+        f = unittest.mock.Mock()
+        watcher = unix_events.AbstractChildWatcher()
+        self.assertRaises(
+            NotImplementedError, watcher.add_child_handler, f, f)
+        self.assertRaises(
+            NotImplementedError, watcher.remove_child_handler, f)
+        self.assertRaises(
+            NotImplementedError, watcher.set_loop, f)
+        self.assertRaises(
+            NotImplementedError, watcher.close)
+        self.assertRaises(
+            NotImplementedError, watcher.__enter__)
+        self.assertRaises(
+            NotImplementedError, watcher.__exit__, f, f, f)
+
+
+class BaseChildWatcherTests(unittest.TestCase):
+
+    def test_not_implemented(self):
+        f = unittest.mock.Mock()
+        watcher = unix_events.BaseChildWatcher(None)
+        self.assertRaises(
+            NotImplementedError, watcher._do_waitpid, f)
+
+
+class ChildWatcherTestsMixin:
+    instance = None
+
+    ignore_warnings = unittest.mock.patch.object(unix_events.logger, "warning")
+
+    def setUp(self):
+        self.loop = test_utils.TestLoop()
+        self.running = False
+        self.zombies = {}
+
+        assert ChildWatcherTestsMixin.instance is None
+        ChildWatcherTestsMixin.instance = self
+
+        with unittest.mock.patch.object(
+                self.loop, "add_signal_handler") as self.m_add_signal_handler:
+            self.watcher = self.create_watcher(self.loop)
+
+    def tearDown(self):
+        ChildWatcherTestsMixin.instance = None
+
+    def waitpid(pid, flags):
+        self = ChildWatcherTestsMixin.instance
+        if isinstance(self.watcher, unix_events.SafeChildWatcher) or pid != -1:
+            self.assertGreater(pid, 0)
+        try:
+            if pid < 0:
+                return self.zombies.popitem()
+            else:
+                return pid, self.zombies.pop(pid)
+        except KeyError:
+            pass
+        if self.running:
+            return 0, 0
+        else:
+            raise ChildProcessError()
+
+    def add_zombie(self, pid, returncode):
+        self.zombies[pid] = returncode + 32768
+
+    def WIFEXITED(status):
+        return status >= 32768
+
+    def WIFSIGNALED(status):
+        return 32700 < status < 32768
+
+    def WEXITSTATUS(status):
+        self = ChildWatcherTestsMixin.instance
+        self.assertTrue(type(self).WIFEXITED(status))
+        return status - 32768
+
+    def WTERMSIG(status):
+        self = ChildWatcherTestsMixin.instance
+        self.assertTrue(type(self).WIFSIGNALED(status))
+        return 32768 - status
+
+    def test_create_watcher(self):
+        self.m_add_signal_handler.assert_called_once_with(
+            signal.SIGCHLD, self.watcher._sig_chld)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
+                           m_WEXITSTATUS, m_WTERMSIG):
+        # register a child
+        callback = unittest.mock.Mock()
+
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(42, callback, 9, 10, 14)
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child is running
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child terminates (returncode 12)
+        self.running = False
+        self.add_zombie(42, 12)
+        self.watcher._sig_chld()
+
+        self.assertTrue(m_WIFEXITED.called)
+        self.assertTrue(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+        callback.assert_called_once_with(42, 12, 9, 10, 14)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WEXITSTATUS.reset_mock()
+        callback.reset_mock()
+
+        # ensure that the child is effectively reaped
+        self.add_zombie(42, 13)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WEXITSTATUS.reset_mock()
+
+        # sigchld called again
+        self.zombies.clear()
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_two_children(self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED,
+                                        m_WEXITSTATUS, m_WTERMSIG):
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+
+        # register child 1
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(43, callback1, 7, 8)
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # register child 2
+        with self.watcher:
+            self.watcher.add_child_handler(44, callback2, 147, 18)
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # children are running
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child 1 terminates (signal 3)
+        self.add_zombie(43, -3)
+        self.watcher._sig_chld()
+
+        callback1.assert_called_once_with(43, -3, 7, 8)
+        self.assertFalse(callback2.called)
+        self.assertTrue(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertTrue(m_WTERMSIG.called)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WTERMSIG.reset_mock()
+        callback1.reset_mock()
+
+        # child 2 still running
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child 2 terminates (code 108)
+        self.add_zombie(44, 108)
+        self.running = False
+        self.watcher._sig_chld()
+
+        callback2.assert_called_once_with(44, 108, 147, 18)
+        self.assertFalse(callback1.called)
+        self.assertTrue(m_WIFEXITED.called)
+        self.assertTrue(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WEXITSTATUS.reset_mock()
+        callback2.reset_mock()
+
+        # ensure that the children are effectively reaped
+        self.add_zombie(43, 14)
+        self.add_zombie(44, 15)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WEXITSTATUS.reset_mock()
+
+        # sigchld called again
+        self.zombies.clear()
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_two_children_terminating_together(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+
+        # register child 1
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(45, callback1, 17, 8)
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # register child 2
+        with self.watcher:
+            self.watcher.add_child_handler(46, callback2, 1147, 18)
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # children are running
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child 1 terminates (code 78)
+        # child 2 terminates (signal 5)
+        self.add_zombie(45, 78)
+        self.add_zombie(46, -5)
+        self.running = False
+        self.watcher._sig_chld()
+
+        callback1.assert_called_once_with(45, 78, 17, 8)
+        callback2.assert_called_once_with(46, -5, 1147, 18)
+        self.assertTrue(m_WIFSIGNALED.called)
+        self.assertTrue(m_WIFEXITED.called)
+        self.assertTrue(m_WEXITSTATUS.called)
+        self.assertTrue(m_WTERMSIG.called)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WTERMSIG.reset_mock()
+        m_WEXITSTATUS.reset_mock()
+        callback1.reset_mock()
+        callback2.reset_mock()
+
+        # ensure that the children are effectively reaped
+        self.add_zombie(45, 14)
+        self.add_zombie(46, 15)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_race_condition(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+        # register a child
+        callback = unittest.mock.Mock()
+
+        with self.watcher:
+            # child terminates before being registered
+            self.add_zombie(50, 4)
+            self.watcher._sig_chld()
+
+            self.watcher.add_child_handler(50, callback, 1, 12)
+
+        callback.assert_called_once_with(50, 4, 1, 12)
+        callback.reset_mock()
+
+        # ensure that the child is effectively reaped
+        self.add_zombie(50, -1)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_replace_handler(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+
+        # register a child
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(51, callback1, 19)
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # register the same child again
+        with self.watcher:
+            self.watcher.add_child_handler(51, callback2, 21)
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child terminates (signal 8)
+        self.running = False
+        self.add_zombie(51, -8)
+        self.watcher._sig_chld()
+
+        callback2.assert_called_once_with(51, -8, 21)
+        self.assertFalse(callback1.called)
+        self.assertTrue(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertTrue(m_WTERMSIG.called)
+
+        m_WIFSIGNALED.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WTERMSIG.reset_mock()
+        callback2.reset_mock()
+
+        # ensure that the child is effectively reaped
+        self.add_zombie(51, 13)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_remove_handler(self, m_waitpid, m_WIFEXITED,
+                                    m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG):
+        callback = unittest.mock.Mock()
+
+        # register a child
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(52, callback, 1984)
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # unregister the child
+        self.watcher.remove_child_handler(52)
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child terminates (code 99)
+        self.running = False
+        self.add_zombie(52, 99)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_unknown_status(self, m_waitpid, m_WIFEXITED,
+                                    m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG):
+        callback = unittest.mock.Mock()
+
+        # register a child
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(53, callback, -19)
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # terminate with unknown status
+        self.zombies[53] = 1178
+        self.running = False
+        self.watcher._sig_chld()
+
+        callback.assert_called_once_with(53, 1178, -19)
+        self.assertTrue(m_WIFEXITED.called)
+        self.assertTrue(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        callback.reset_mock()
+        m_WIFEXITED.reset_mock()
+        m_WIFSIGNALED.reset_mock()
+
+        # ensure that the child is effectively reaped
+        self.add_zombie(53, 101)
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_remove_child_handler(self, m_waitpid, m_WIFEXITED,
+                                  m_WIFSIGNALED, m_WEXITSTATUS, m_WTERMSIG):
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+        callback3 = unittest.mock.Mock()
+
+        # register children
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(54, callback1, 1)
+            self.watcher.add_child_handler(55, callback2, 2)
+            self.watcher.add_child_handler(56, callback3, 3)
+
+        # remove child handler 1
+        self.assertTrue(self.watcher.remove_child_handler(54))
+
+        # remove child handler 2 multiple times
+        self.assertTrue(self.watcher.remove_child_handler(55))
+        self.assertFalse(self.watcher.remove_child_handler(55))
+        self.assertFalse(self.watcher.remove_child_handler(55))
+
+        # all children terminate
+        self.add_zombie(54, 0)
+        self.add_zombie(55, 1)
+        self.add_zombie(56, 2)
+        self.running = False
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        callback3.assert_called_once_with(56, 2, 3)
+
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_unhandled_exception(self, m_waitpid):
+        callback = unittest.mock.Mock()
+
+        # register a child
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(57, callback)
+
+        # raise an exception
+        m_waitpid.side_effect = ValueError
+
+        with unittest.mock.patch.object(unix_events.logger,
+                                        "exception") as m_exception:
+
+            self.assertEqual(self.watcher._sig_chld(), None)
+            self.assertTrue(m_exception.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_child_reaped_elsewhere(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+
+        # register a child
+        callback = unittest.mock.Mock()
+
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(58, callback)
+
+        self.assertFalse(callback.called)
+        self.assertFalse(m_WIFEXITED.called)
+        self.assertFalse(m_WIFSIGNALED.called)
+        self.assertFalse(m_WEXITSTATUS.called)
+        self.assertFalse(m_WTERMSIG.called)
+
+        # child terminates
+        self.running = False
+        self.add_zombie(58, 4)
+
+        # waitpid is called elsewhere, reaping the zombie before the watcher
+        os.waitpid(58, os.WNOHANG)
+
+        m_waitpid.reset_mock()
+
+        # sigchld: the watcher must still poll waitpid, without crashing
+        with self.ignore_warnings:
+            self.watcher._sig_chld()
+
+        self.assertTrue(m_waitpid.called)
+        if isinstance(self.watcher, unix_events.FastChildWatcher):
+            # here the FastChildWatcher enters a deadlock
+            # (there is no way to prevent it)
+            self.assertFalse(callback.called)
+        else:
+            callback.assert_called_once_with(58, 255)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_sigchld_unknown_pid_during_registration(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+
+        # register two children
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+
+        with self.ignore_warnings, self.watcher:
+            self.running = True
+            # child 1 terminates
+            self.add_zombie(591, 7)
+            # an unknown child terminates
+            self.add_zombie(593, 17)
+
+            self.watcher._sig_chld()
+
+            self.watcher.add_child_handler(591, callback1)
+            self.watcher.add_child_handler(592, callback2)
+
+        callback1.assert_called_once_with(591, 7)
+        self.assertFalse(callback2.called)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_set_loop(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+
+        # register a child
+        callback = unittest.mock.Mock()
+
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(60, callback)
+
+        # attach a new loop
+        old_loop = self.loop
+        self.loop = test_utils.TestLoop()
+
+        with unittest.mock.patch.object(
+                old_loop,
+                "remove_signal_handler") as m_old_remove_signal_handler, \
+             unittest.mock.patch.object(
+                self.loop,
+                "add_signal_handler") as m_new_add_signal_handler:
+
+            self.watcher.set_loop(self.loop)
+
+            m_old_remove_signal_handler.assert_called_once_with(
+                signal.SIGCHLD)
+            m_new_add_signal_handler.assert_called_once_with(
+                signal.SIGCHLD, self.watcher._sig_chld)
+
+        # child terminates
+        self.running = False
+        self.add_zombie(60, 9)
+        self.watcher._sig_chld()
+
+        callback.assert_called_once_with(60, 9)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_set_loop_race_condition(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+
+        # register 3 children
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+        callback3 = unittest.mock.Mock()
+
+        with self.watcher:
+            self.running = True
+            self.watcher.add_child_handler(61, callback1)
+            self.watcher.add_child_handler(62, callback2)
+            self.watcher.add_child_handler(622, callback3)
+
+        # detach the loop
+        old_loop = self.loop
+        self.loop = None
+
+        with unittest.mock.patch.object(
+                old_loop, "remove_signal_handler") as m_remove_signal_handler:
+
+            self.watcher.set_loop(None)
+
+            m_remove_signal_handler.assert_called_once_with(
+                signal.SIGCHLD)
+
+        # child 1 & 2 terminate
+        self.add_zombie(61, 11)
+        self.add_zombie(62, -5)
+
+        # SIGCHLD was not caught
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        self.assertFalse(callback3.called)
+
+        # attach a new loop
+        self.loop = test_utils.TestLoop()
+
+        with unittest.mock.patch.object(
+                self.loop, "add_signal_handler") as m_add_signal_handler:
+
+            self.watcher.set_loop(self.loop)
+
+            m_add_signal_handler.assert_called_once_with(
+                signal.SIGCHLD, self.watcher._sig_chld)
+            callback1.assert_called_once_with(61, 11)  # race condition!
+            callback2.assert_called_once_with(62, -5)  # race condition!
+            self.assertFalse(callback3.called)
+
+        callback1.reset_mock()
+        callback2.reset_mock()
+
+        # child 3 terminates
+        self.running = False
+        self.add_zombie(622, 19)
+        self.watcher._sig_chld()
+
+        self.assertFalse(callback1.called)
+        self.assertFalse(callback2.called)
+        callback3.assert_called_once_with(622, 19)
+
+    @unittest.mock.patch('os.WTERMSIG', wraps=WTERMSIG)
+    @unittest.mock.patch('os.WEXITSTATUS', wraps=WEXITSTATUS)
+    @unittest.mock.patch('os.WIFSIGNALED', wraps=WIFSIGNALED)
+    @unittest.mock.patch('os.WIFEXITED', wraps=WIFEXITED)
+    @unittest.mock.patch('os.waitpid', wraps=waitpid)
+    def test_close(
+            self, m_waitpid, m_WIFEXITED, m_WIFSIGNALED, m_WEXITSTATUS,
+            m_WTERMSIG):
+
+        # register two children
+        callback1 = unittest.mock.Mock()
+        callback2 = unittest.mock.Mock()
+
+        with self.watcher:
+            self.running = True
+            # child 1 terminates
+            self.add_zombie(63, 9)
+            # other child terminates
+            self.add_zombie(65, 18)
+            self.watcher._sig_chld()
+
+            self.watcher.add_child_handler(63, callback1)
+            self.watcher.add_child_handler(64, callback1)
+
+            self.assertEqual(len(self.watcher._callbacks), 1)
+            if isinstance(self.watcher, unix_events.FastChildWatcher):
+                self.assertEqual(len(self.watcher._zombies), 1)
+
+            with unittest.mock.patch.object(
+                    self.loop,
+                    "remove_signal_handler") as m_remove_signal_handler:
+
+                self.watcher.close()
+
+                m_remove_signal_handler.assert_called_once_with(
+                    signal.SIGCHLD)
+                self.assertFalse(self.watcher._callbacks)
+                if isinstance(self.watcher, unix_events.FastChildWatcher):
+                    self.assertFalse(self.watcher._zombies)
+
+
+class SafeChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
+    def create_watcher(self, loop):
+        return unix_events.SafeChildWatcher(loop)
+
+
+class FastChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
+    def create_watcher(self, loop):
+        return unix_events.FastChildWatcher(loop)
+
+
+class PolicyTests(unittest.TestCase):
+
+    def create_policy(self):
+        return unix_events.DefaultEventLoopPolicy()
+
+    def test_get_child_watcher(self):
+        policy = self.create_policy()
+        self.assertIsNone(policy._watcher)
+
+        watcher = policy.get_child_watcher()
+        self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
+
+        self.assertIs(policy._watcher, watcher)
+
+        self.assertIs(watcher, policy.get_child_watcher())
+        self.assertIsNone(watcher._loop)
+
+    def test_get_child_watcher_after_set(self):
+        policy = self.create_policy()
+        watcher = unix_events.FastChildWatcher(None)
+
+        policy.set_child_watcher(watcher)
+        self.assertIs(policy._watcher, watcher)
+        self.assertIs(watcher, policy.get_child_watcher())
+
+    def test_get_child_watcher_with_mainloop_existing(self):
+        policy = self.create_policy()
+        loop = policy.get_event_loop()
+
+        self.assertIsNone(policy._watcher)
+        watcher = policy.get_child_watcher()
+
+        self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
+        self.assertIs(watcher._loop, loop)
+
+        loop.close()
+
+    def test_get_child_watcher_thread(self):
+
+        def f():
+            policy.set_event_loop(policy.new_event_loop())
+
+            self.assertIsInstance(policy.get_event_loop(),
+                                  events.AbstractEventLoop)
+            watcher = policy.get_child_watcher()
+
+            self.assertIsInstance(watcher, unix_events.SafeChildWatcher)
+            self.assertIsNone(watcher._loop)
+
+            policy.get_event_loop().close()
+
+        policy = self.create_policy()
+
+        th = threading.Thread(target=f)
+        th.start()
+        th.join()
+
+    def test_child_watcher_replace_mainloop_existing(self):
+        policy = self.create_policy()
+        loop = policy.get_event_loop()
+
+        watcher = policy.get_child_watcher()
+
+        self.assertIs(watcher._loop, loop)
+
+        new_loop = policy.new_event_loop()
+        policy.set_event_loop(new_loop)
+
+        self.assertIs(watcher._loop, new_loop)
+
+        policy.set_event_loop(None)
+
+        self.assertIs(watcher._loop, None)
+
+        loop.close()
+        new_loop.close()
+
+
 if __name__ == '__main__':
     unittest.main()

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list