[issue16853] add a Selector to the select module

Charles-François Natali report at bugs.python.org
Sat Jan 5 21:23:27 CET 2013


Charles-François Natali added the comment:

> I think that this needs extensive tests that verify the behavior of many end cases, including under duress (e.g. when there are too many connections for the kernel to handle).  That would seem the only way to make sure that the code is reliable across platforms.  It is likely that you could borrow some ideas for test scenarios from Twisted.

Will do.

I'm adding a new version taking into account some of Giampaolo's remarks.

Also, the API now also allows passing a file descriptor or any object
with a `fileno()` method, since it will likely be useful.

To sum up, the API is:

    def register(self, fileobj, events, data=None):
        """Register a file object.

        Parameters:
        fileobj -- file object
        events  -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT)
        data    -- attached data
        """

    def unregister(self, fileobj):
        """Unregister a file object.

        Parameters:
        fileobj -- file object
        """

    def modify(self, fileobj, events, data=None):
        """Change a registered file object monitored events or attached data.

        Parameters:
        fileobj -- file object
        events  -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT)
        data    -- attached data
        """

    def select(self, timeout=None):
        """Perform the actual selection, until some monitored file objects are
        ready or a timeout expires.

        Parameters:
        timeout -- if timeout > 0, this specifies the maximum wait time, in
                   seconds
                   if timeout == 0, the select() call won't block, and will
                   report the currently ready file objects
                   if timeout is None, select() will block until a monitored
                   file object becomes ready

        Returns:
        list of (fileobj, events, attached data) for ready file objects
        `events` is a bitwise mask of SELECT_IN|SELECT_OUT

Selector.select() output looks a lot like poll()/epoll() except for
two details: the output is the file object, and not the file
descriptor (poll()/epoll() are unfortunately inconsistent in this
regard), and there's a third field, the attached data (will be None if
not provided in register()/modify()). I think that this optional field
is really useful to pass e.g. a callback or some context information.

----------
Added file: http://bugs.python.org/file28584/selector-5.diff

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue16853>
_______________________________________
-------------- next part --------------
diff --git a/Lib/select.py b/Lib/select.py
new file mode 100644
--- /dev/null
+++ b/Lib/select.py
@@ -0,0 +1,286 @@
+"""Select module.
+
+This module supports asynchronous I/O on multiple file descriptors.
+"""
+
+
+from _select import *
+
+
+# generic events, that must be mapped to implementation-specific ones
+# read event
+SELECT_IN  = (1 << 0)
+# write event
+SELECT_OUT = (1 << 1)
+
+
+def _fileobj_to_fd(fileobj):
+    """Return a file descriptor from a file object.
+
+    Parameters:
+    fileobj -- file descriptor, or any object with a `fileno()` method
+
+    Returns:
+    corresponding file descriptor
+    """
+    if isinstance(fileobj, int):
+        fd = fileobj
+    else:
+        try:
+            fd = int(fileobj.fileno())
+        except (ValueError, TypeError):
+            raise ValueError("Invalid file object: {!r}".format(fileobj))
+    return fd
+
+
+class _Key:
+    """Object used internally to associate a file object to its backing file
+    descriptor and attached data."""
+
+    def __init__(self, fileobj, data=None):
+        self.fileobj = fileobj
+        self.data = data
+        self.fd = _fileobj_to_fd(fileobj)
+
+
+class _BaseSelector:
+    """Base selector class.
+
+    A selector supports registering file objects to be monitored for specific
+    I/O events.
+
+    A file object is a file descriptor or any object with a `fileno()` method.
+    An arbitrary object can be attached to the file object, which can be used
+    for example to store context information, a callback, etc.
+
+    A selector can use various implementations (select(), poll(), epoll()...)
+    depending on the platform. The default `Selector` class uses the most
+    performant implementation on the current platform.
+    """
+
+    def __init__(self):
+        # this maps file descriptors to keys
+        self._fd_to_key = {}
+        # this maps file objects to keys - for fast (un)registering
+        self._fileobj_to_key = {}
+
+    def register(self, fileobj, events, data=None):
+        """Register a file object.
+
+        Parameters:
+        fileobj -- file object
+        events  -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT)
+        data    -- attached data
+        """
+        if (not events) or (events & ~(SELECT_IN|SELECT_OUT)):
+            raise ValueError("Invalid events: {}".format(events))
+
+        if fileobj in self._fileobj_to_key:
+            raise ValueError("{!r} is already registered".format(fileobj))
+
+        key = _Key(fileobj, data)
+        self._fd_to_key[key.fd] = key
+        self._fileobj_to_key[fileobj] = key
+        return key
+
+    def unregister(self, fileobj):
+        """Unregister a file object.
+
+        Parameters:
+        fileobj -- file object
+        """
+        try:
+            key = self._fileobj_to_key[fileobj]
+            del self._fd_to_key[key.fd]
+            del self._fileobj_to_key[fileobj]
+        except KeyError:
+            raise ValueError("{!r} is not registered".format(fileobj))
+        return key
+
+    def modify(self, fileobj, events, data=None):
+        """Change a registered file object monitored events or attached data.
+
+        Parameters:
+        fileobj -- file object
+        events  -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT)
+        data    -- attached data
+        """
+        self.unregister(fileobj)
+        self.register(fileobj, events, data)
+
+    def select(self, timeout=None):
+        """Perform the actual selection, until some monitored file objects are
+        ready or a timeout expires.
+        
+        Parameters:
+        timeout -- if timeout > 0, this specifies the maximum wait time, in
+                   seconds
+                   if timeout == 0, the select() call won't block, and will
+                   report the currently ready file objects
+                   if timeout is None, select() will block until a monitored
+                   file object becomes ready
+
+        Returns:
+        list of (fileobj, events, attached data) for ready file objects
+        `events` is a bitwise mask of SELECT_IN|SELECT_OUT
+        """
+        raise NotImplementedError()
+
+    def close(self):
+        """Close the selector.
+
+        This must be called to make sure that any underlying resource is freed.
+        """
+        self._fd_to_key.clear()
+        self._fileobj_to_key.clear()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+    def _key_from_fd(self, fd):
+        """Return the key associated to a given file descriptor.
+
+        Parameters:
+        fd -- file descriptor
+
+        Returns:
+        corresponding key
+        """
+        try:
+            return self._fd_to_key[fd]
+        except KeyError:
+            raise RuntimeError("No key found for fd {}".format(fd))
+
+
+class SelectSelector(_BaseSelector):
+    """Select-based selector."""
+
+    def __init__(self):
+        super().__init__()
+        self._readers = set()
+        self._writers = set()
+
+    def register(self, fileobj, events, data=None):
+        key = super().register(fileobj, events, data)
+        if events & SELECT_IN:
+            self._readers.add(key.fd)
+        if events & SELECT_OUT:
+            self._writers.add(key.fd)
+
+    def unregister(self, fileobj):
+        key = super().unregister(fileobj)
+        self._readers.discard(key.fd)
+        self._writers.discard(key.fd)
+
+    def select(self, timeout=None):
+        r, w, _ = select(self._readers, self._writers, [], timeout)
+        r = set(r)
+        w = set(w)
+        ready = []
+        for fd in r | w:
+            events = 0
+            if fd in r:
+                events |= SELECT_IN
+            if fd in w:
+                events |= SELECT_OUT
+
+            key = self._key_from_fd(fd)
+            ready.append((key.fileobj, events, key.data))
+        return ready
+
+
+class PollSelector(_BaseSelector):
+    """Poll-based selector."""
+
+    def __init__(self):
+        super().__init__()
+        self._poll = poll()
+
+    def register(self, fileobj, events, data=None):
+        key = super().register(fileobj, events, data)
+        poll_events = 0
+        if events & SELECT_IN:
+            poll_events |= POLLIN
+        if events & SELECT_OUT:
+            poll_events |= POLLOUT
+        self._poll.register(key.fd, poll_events)
+
+    def unregister(self, fileobj):
+        key = super().unregister(fileobj)
+        self._poll.unregister(key.fd)
+
+    def select(self, timeout=None):
+        timeout = None if timeout is None else int(1000 * timeout)
+        ready = []
+        for fd, event in self._poll.poll(timeout):
+            events = 0
+            if event & (POLLERR|POLLNVAL):
+                # in case of error, signal read and write ready
+                events |= SELECT_IN|SELECT_OUT
+            else:
+                if event & (POLLIN|POLLHUP):
+                    # in case of hangup, signal read ready
+                    events |= SELECT_IN
+                if event & POLLOUT:
+                    events |= SELECT_OUT
+
+            key = self._key_from_fd(fd)
+            ready.append((key.fileobj, events, key.data))
+        return ready
+
+
+class EpollSelector(_BaseSelector):
+    """Epoll-based selector."""
+
+    def __init__(self):
+        super().__init__()
+        self._epoll = epoll()
+
+    def register(self, fileobj, events, data=None):
+        key = super().register(fileobj, events, data)
+        epoll_events = 0
+        if events & SELECT_IN:
+            epoll_events |= EPOLLIN
+        if events & SELECT_OUT:
+            epoll_events |= EPOLLOUT
+        self._epoll.register(key.fd, epoll_events)
+
+    def unregister(self, fileobj):
+        key = super().unregister(fileobj)
+        self._epoll.unregister(key.fd)
+
+    def select(self, timeout=None):
+        timeout = -1 if timeout is None else timeout
+        ready = []
+        for fd, event in self._epoll.poll(timeout):
+            events = 0
+            if event & EPOLLERR:
+                # in case of error, signal read and write ready
+                events |= SELECT_IN|SELECT_OUT
+            else:
+                if event & (EPOLLIN|EPOLLHUP):
+                    # in case of hangup, signal read ready
+                    events |= SELECT_IN
+                if event & EPOLLOUT:
+                    events |= SELECT_OUT
+
+            key = self._key_from_fd(fd)
+            ready.append((key.fileobj, events, key.data))
+        return ready
+
+    def close(self):
+        super().close()
+        self._epoll.close()
+
+
+# Choose the best implementation: roughly, epoll|kqueue > poll > select.
+# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
+if 'epoll' in globals():
+    Selector = EpollSelector
+elif 'poll' in globals():
+    Selector = PollSelector
+else:
+    Selector = SelectSelector
diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py
--- a/Lib/test/test_select.py
+++ b/Lib/test/test_select.py
@@ -1,10 +1,20 @@
 import errno
 import os
+import random
 import select
 import sys
 import unittest
 from test import support
 
+
+def find_ready_matching(ready, flag):
+    match = []
+    for fd, mode, data in ready:
+        if mode & flag:
+            match.append(fd)
+    return match
+
+
 @unittest.skipIf((sys.platform[:3]=='win'),
                  "can't easily test on this system")
 class SelectTestCase(unittest.TestCase):
@@ -75,9 +85,172 @@
         a[:] = [F()] * 10
         self.assertEqual(select.select([], a, []), ([], a[:5], []))
 
+
+class BasicSelectorTestCase(unittest.TestCase):
+
+    def test_constants(self):
+        select.SELECT_IN
+        select.SELECT_OUT
+
+
+class BaseSelectorTestCase(unittest.TestCase):
+
+    def test_error_conditions(self):
+        s = self.SELECTOR()
+        self.assertRaises(TypeError, s.register)
+        self.assertRaises(TypeError, s.register, 0)
+        self.assertRaises(ValueError, s.register, 0, 18)
+        self.assertRaises(TypeError, s.unregister, 0, 1)
+        self.assertRaises(TypeError, s.modify, 0)
+        self.assertRaises(TypeError, s.select, 0, 1)
+
+    def test_basic(self):
+        with self.SELECTOR() as s:
+            rd, wr = os.pipe()
+            wro = os.fdopen(os.dup(wr), "wb")
+            self.addCleanup(os.close, rd)
+            self.addCleanup(os.close, wr)
+            self.addCleanup(wro.close)
+
+            # test without attached data
+            s.register(wr, select.SELECT_OUT)
+            self.assertEqual(set(((wr, select.SELECT_OUT, None),)), set(s.select()))
+
+            # test with attached data
+            s.unregister(wr)
+            s.register(wr, select.SELECT_OUT, sys.stdin)
+            self.assertEqual(set(((wr, select.SELECT_OUT, sys.stdin),)), set(s.select()))
+
+            # test with file object
+            s.register(wro, select.SELECT_OUT)
+            self.assertEqual(set(((wro, select.SELECT_OUT, None),
+                                 (wr, select.SELECT_OUT, sys.stdin))), set(s.select()))
+            s.unregister(wro)
+
+            # modify
+            s.modify(wr, select.SELECT_OUT, sys.stdout)
+            self.assertEqual(set(((wr, select.SELECT_OUT, sys.stdout),)), set(s.select()))
+
+            # test timeout
+            s.unregister(wr)
+            s.register(rd, select.SELECT_IN)
+            self.assertFalse(s.select(0.1))
+            s.register(wr, select.SELECT_OUT)
+            self.assertEqual(set(((wr, select.SELECT_OUT, None),)),
+                             set(s.select(0.1)))
+
+            s.unregister(rd)
+            s.unregister(wr)
+            # unregistering twice should raise an error
+            self.assertRaises(ValueError, s.unregister, wr)
+
+    def test_selector(self):
+        s = self.SELECTOR()
+        self.addCleanup(s.close)
+
+        NUM_PIPES = 12
+        MSG = b" This is a test."
+        MSG_LEN = len(MSG)
+        readers = []
+        writers = []
+        r2w = {}
+        w2r = {}
+
+        for i in range(NUM_PIPES):
+            rd, wr = os.pipe()
+            s.register(rd, select.SELECT_IN)
+            s.register(wr, select.SELECT_OUT)
+            readers.append(rd)
+            writers.append(wr)
+            r2w[rd] = wr
+            w2r[wr] = rd
+
+        bufs = []
+
+        while writers:
+            ready = s.select()
+            ready_writers = find_ready_matching(ready, select.SELECT_OUT)
+            if not ready_writers:
+                self.fail("no pipes ready for writing")
+            wr = random.choice(ready_writers)
+            os.write(wr, MSG)
+
+            ready = s.select()
+            ready_readers = find_ready_matching(ready, select.SELECT_IN)
+            if not ready_readers:
+                self.fail("no pipes ready for reading")
+            self.assertEqual([w2r[wr]], ready_readers)
+            rd = ready_readers[0]
+            buf = os.read(rd, MSG_LEN)
+            self.assertEqual(len(buf), MSG_LEN)
+            bufs.append(buf)
+            os.close(r2w[rd]) ; os.close(rd)
+            s.unregister(r2w[rd])
+            s.unregister(rd)
+            writers.remove(r2w[rd])
+
+        self.assertEqual(bufs, [MSG] * NUM_PIPES)
+
+    def test_timeout(self):
+        s = self.SELECTOR()
+        self.addCleanup(s.close)
+
+        cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
+        p = os.popen(cmd, 'r')
+        s.register(p.fileno(), select.SELECT_IN, p)
+
+        for tout in (0, 1, 2, 4, 8, 16) + (None,)*10:
+            if support.verbose:
+                print('timeout =', tout)
+
+            ready = s.select(tout)
+            if not ready:
+                continue
+            if set(ready) == set(((p.fileno(), select.SELECT_IN, p),)):
+                line = p.readline()
+                if support.verbose:
+                    print(repr(line))
+                if not line:
+                    if support.verbose:
+                        print('EOF')
+                    break
+                continue
+            self.fail('Unexpected return values from select(): %r' % ready)
+        p.close()
+
+
+ at unittest.skipIf((sys.platform[:3]=='win'),
+                 "can't easily test on this system")
+class SelectorTestCase(BaseSelectorTestCase):
+
+    SELECTOR = select.Selector
+
+
+class SelectSelectorTestCase(BaseSelectorTestCase):
+
+    SELECTOR = select.SelectSelector
+
+
+ at unittest.skipUnless(hasattr(select, 'poll'), "Test needs select.poll()")
+class PollSelectorTestCase(BaseSelectorTestCase):
+
+    SELECTOR = select.PollSelector
+
+
+ at unittest.skipUnless(hasattr(select, 'epoll'), "Test needs select.epoll()")
+class EpollSelectorTestCase(BaseSelectorTestCase):
+
+    SELECTOR = select.EpollSelector
+
+
 def test_main():
-    support.run_unittest(SelectTestCase)
+    tests = [SelectTestCase]
+    tests.extend([BasicSelectorTestCase, SelectorTestCase,
+                  SelectSelectorTestCase, PollSelectorTestCase,
+                  EpollSelectorTestCase])
+    support.run_unittest(*tests)
     support.reap_children()
 
+
 if __name__ == "__main__":
     test_main()
diff --git a/Modules/selectmodule.c b/Modules/selectmodule.c
--- a/Modules/selectmodule.c
+++ b/Modules/selectmodule.c
@@ -2129,7 +2129,7 @@
 
 static struct PyModuleDef selectmodule = {
     PyModuleDef_HEAD_INIT,
-    "select",
+    "_select",
     module_doc,
     -1,
     select_methods,
@@ -2143,7 +2143,7 @@
 
 
 PyMODINIT_FUNC
-PyInit_select(void)
+PyInit__select(void)
 {
     PyObject *m;
     m = PyModule_Create(&selectmodule);
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -623,7 +623,7 @@
             missing.append('spwd')
 
         # select(2); not on ancient System V
-        exts.append( Extension('select', ['selectmodule.c']) )
+        exts.append( Extension('_select', ['selectmodule.c']) )
 
         # Fred Drake's interface to the Python parser
         exts.append( Extension('parser', ['parsermodule.c']) )


More information about the Python-bugs-list mailing list