[Mailman-Developers] Patches to mailman-2.1.18 to use FreeBSD "kqueue"

Phil Budne phil at ultimate.com
Fri Oct 31 19:53:25 CET 2014


This may be of greatest interest to FreeBSD & MacOS "port"
maintainers.  I've been running these patches to mailman-2.1.18.1,
which use "kqueue" to watch the queue directories, since August.

As you can see, the idle components don't accumulate any CPU time
(they don't have to poll their queue directory):

mail% ps axwww | grep mailman
 1067  -  IWs      0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/mailmanctl -s -q start
 1068  -  IW       0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=ArchRunner:0:1 -s
 1069  -  I        0:02.76 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=BounceRunner:0:1 -s
 1070  -  IW       0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=CommandRunner:0:1 -s
 1071  -  I        0:02.85 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=IncomingRunner:0:1 -s
 1072  -  IW       0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=NewsRunner:0:1 -s
 1073  -  I        0:04.67 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=OutgoingRunner:0:1 -s
 1074  -  IW       0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=VirginRunner:0:1 -s
 1075  -  IW       0:00.00 /usr/local/bin/python2.7 /usr/local/mailman/bin/qrunner --runner=RetryRunner:0:1 -s

[this particular set of processes has been running since Oct 5th]

It looks like the "inotify" facility on Linux might do the job, but
it's not among Python's "included batteries" (yet).

I did a little bit of jiggering with the periodic wakeups; I can't
remember at this point whether all of it was "strictly necessary".  I'd be
happy to have the work picked up (but would appreciate credit for the
initial work).



--- Runner.py.orig	2014-07-11 15:01:26.000000000 -0400
+++ Runner.py	2014-08-15 11:24:02.000000000 -0400
@@ -15,9 +15,14 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
 # USA.
 
-"""Generic queue runner class.
+"""Generic queue runner class. (hacked for FreeBSD by phil at ultimate.com)
 """
 
+USE_KQUEUE = True
+
+import os                       # for kqueue
+import select                   # for kqueue
+import errno                    # for kqueue
 import time
 import traceback
 from cStringIO import StringIO
@@ -44,6 +49,7 @@
 class Runner:
     QDIR = None
     SLEEPTIME = mm_cfg.QRUNNER_SLEEP_TIME
+    PERIODIC = 0
 
     def __init__(self, slice=None, numslices=1):
         self._kids = {}
@@ -53,6 +59,21 @@
         # Create the shunt switchboard
         self._shunt = Switchboard(mm_cfg.SHUNTQUEUE_DIR)
         self._stop = False
+        # BEGIN phil at ultimate.com
+        self.kq = None
+        # See if BSD/OSX 'kqueue' can be used for dir watching:
+        if USE_KQUEUE and getattr(select, 'kqueue', None):
+            self._kqueue_init()
+        # Linux has inotify, but not among included batteries
+        # if other mechanisms implemented, should be hidden in a
+        # WatchDir() class, maybe should be in any case?!
+
+        # pulled up from BounceMixin
+        if self.PERIODIC > 0:
+            self._nextaction = time.time() + self.PERIODIC
+        else:
+            self._nextaction = -1
+        # END phil at ultimate.com
 
     def __repr__(self):
         return '<%s at %s>' % (self.__class__.__name__, id(self))
@@ -70,8 +91,8 @@
                     filecnt = self._oneloop()
                     # Do the periodic work for the subclass.  BAW: this
                     # shouldn't be called here.  There should be one more
-                    # _doperiodic() call at the end of the _oneloop() loop.
-                    self._doperiodic()
+                    # _checkperiodic() call at the end of the _oneloop() loop.
+                    self._checkperiodic()
                     # If the stop flag is set, we're done.
                     if self._stop:
                         break
@@ -146,7 +167,7 @@
                     self._switchboard.finish(filebase, preserve=True)
             # Other work we want to do each time through the loop
             Utils.reap(self._kids, once=True)
-            self._doperiodic()
+            self._checkperiodic()
             if self._shortcircuit():
                 break
         return len(files)
@@ -243,15 +264,31 @@
         """
         raise NotImplementedError
 
-    def _doperiodic(self):
-        """Do some processing `every once in a while'.
+    # BEGIN phil at ultimate.com
+    def _checkperiodic(self):
+        """Called every once in a while both from the Runner's main loop, and
+        from the Runner's hash slice processing loop. To check if time
+        to run periodic processing.
 
-        Called every once in a while both from the Runner's main loop, and
-        from the Runner's hash slice processing loop.  You can do whatever
+        If self.PERIODIC is positive, calls subclass _myperiodic() method
+        every self.PERIODIC seconds.
+        """
+        if self.PERIODIC <= 0:
+            return
+        now = time.time()
+        if self._nextaction > now:
+            return
+        self._nextaction = now + self.PERIODIC
+        # Run the subclass _myperiodic method:
+        self._myperiodic()
+
+    def _myperiodic(self):
+        """Do some processing `every once in a while'
+        (every self.PERIODIC seconds) You can do whatever
         special periodic processing you want here, and the return value is
         irrelevant.
         """
-        pass
+        syslog('error', 'empty _myperiodic called')
 
     def _snooze(self, filecnt):
         """Sleep for a little while.
@@ -263,8 +300,68 @@
         """
         if filecnt or self.SLEEPTIME <= 0:
             return
+        if USE_KQUEUE and self.kq:
+            self._kqueue_snooze()
+            return
         time.sleep(self.SLEEPTIME)
 
+    # BEGIN phil at ultimate.com
+    def _kqueue_init(self):
+        """Setup a kqueue fd listening for VNODE WRITE events in self.QDIR"""
+        self.kq = None
+        self.kq_dirfd = -1
+        try:
+            self.kq = select.kqueue()
+            self.kq_dirfd = os.open(self.QDIR, os.O_RDONLY)
+            ev = select.kevent(self.kq_dirfd,
+                               filter=select.KQ_FILTER_VNODE,
+                               flags =select.KQ_EV_ADD|select.KQ_EV_CLEAR,
+                               fflags=select.KQ_NOTE_WRITE)
+            # install ev, return no events, don't sleep
+            self.kq.control([ev], 0, 0)
+            return True
+        except Exception, e:
+            if self.kq:
+                self.kq.close()
+                self.kq = None
+            if self.kq_dirfd != -1:
+                os.close(self.kq_dirfd)
+                self.kq_dirfd  = -1
+        return False
+
+    def _kqueue_snooze(self):
+        """Sleep until activity in self.QDIR
+           or time to call _myperiodic()
+        """
+        wait = None             # default to indefinite
+        if self.PERIODIC > 0 and self._nextaction > 0:
+            wait = self._nextaction - time.time()
+            # periodic processing overdue, just return
+            if wait < 0:
+                return
+        if USE_KQUEUE and self.kq:
+            try:
+                syslog('phil', 'calling kq.control wait=%s', wait)
+                # no changes, return at most 10 events (should only return 1)
+                events = self.kq.control([], 10, wait)
+                syslog('phil', 'kq.control returned %d', len(events))
+                # ignore events!!
+                time.sleep(5.0) # TEMP!!!!
+                return
+            except KeyboardInterrupt:
+                raise
+            except OSError, exc:
+                # suppress EINTR, like time.sleep
+                if exc.errno == errno.EINTR:
+                    return
+                syslog('phil', 'kq.control exception: %s', exc)
+                #raise
+                # fall thru.....
+        if wait is None or wait > self.SLEEPTIME:
+            wait = self.SLEEPTIME
+        time.sleep(wait)
+    # END phil at ultimate.com
+
     def _shortcircuit(self):
         """Return a true value if the individual file processing loop should
         exit before it's finished processing each message in the current slice
--- BounceRunner.py.orig	2014-07-11 15:01:26.000000000 -0400
+++ BounceRunner.py	2014-08-14 09:55:00.000000000 -0400
@@ -46,8 +46,18 @@
 
 
 
-class BounceMixin:
-    def __init__(self):
+# phil at ultimate.com: the only users of BounceMixin
+# used "Runner" as a base, so refactoring as BounceRunnerBase
+# to avoid MRO end-runs (and added pain for PERIODIC),
+# seems to require fewer explicit parent class calls
+# (__init__, _cleanup) too!
+
+class BounceRunnerBase(Runner):
+    PERIODIC = mm_cfg.REGISTER_BOUNCES_EVERY
+
+    def __init__(self, slice=None, numslices=1):
+        Runner.__init__(self, slice, numslices)
+
         # Registering a bounce means acquiring the list lock, and it would be
         # too expensive to do this for each message.  Instead, each bounce
         # runner maintains an event log which is essentially a file with
@@ -84,7 +94,6 @@
             mm_cfg.DATA_DIR, 'bounce-events-%05d.pck' % os.getpid())
         self._bounce_events_fp = None
         self._bouncecnt = 0
-        self._nextaction = time.time() + mm_cfg.REGISTER_BOUNCES_EVERY
 
     def _queue_bounces(self, listname, addrs, msg):
         today = time.localtime()[:3]
@@ -135,14 +144,12 @@
     def _cleanup(self):
         if self._bouncecnt > 0:
             self._register_bounces()
+        Runner._cleanup(self)
 
-    def _doperiodic(self):
-        now = time.time()
-        if self._nextaction > now or self._bouncecnt == 0:
-            return
-        # Let's go ahead and register the bounces we've got stored up
-        self._nextaction = now + mm_cfg.REGISTER_BOUNCES_EVERY
-        self._register_bounces()
+    def _myperiodic(self):
+        """called every self.PERIODIC seconds"""
+        if self._bouncecnt > 0:
+            self._register_bounces()
 
     def _probe_bounce(self, mlist, token):
         locked = mlist.Locked()
@@ -161,13 +168,9 @@
 
 
 
-class BounceRunner(Runner, BounceMixin):
+class BounceRunner(BounceRunnerBase):
     QDIR = mm_cfg.BOUNCEQUEUE_DIR
 
-    def __init__(self, slice=None, numslices=1):
-        Runner.__init__(self, slice, numslices)
-        BounceMixin.__init__(self)
-
     def _dispose(self, mlist, msg, msgdata):
         # Make sure we have the most up-to-date state
         mlist.Load()
@@ -258,14 +261,6 @@
         # addrs = filter(None, addrs)
         # MAS above filter moved up so we don't try to queue an empty list.
         self._queue_bounces(mlist.internal_name(), addrs, msg)
-
-    _doperiodic = BounceMixin._doperiodic
-
-    def _cleanup(self):
-        BounceMixin._cleanup(self)
-        Runner._cleanup(self)
-
-
 
 def verp_bounce(mlist, msg):
     bmailbox, bdomain = Utils.ParseEmail(mlist.GetBouncesEmail())
--- OutgoingRunner.py.orig	2014-07-11 15:01:26.000000000 -0400
+++ OutgoingRunner.py	2014-07-26 11:42:18.000000000 -0400
@@ -31,12 +31,12 @@
 from Mailman import LockFile
 from Mailman.Queue.Runner import Runner
 from Mailman.Queue.Switchboard import Switchboard
-from Mailman.Queue.BounceRunner import BounceMixin
+from Mailman.Queue.BounceRunner import BounceRunnerBase
 from Mailman.Logging.Syslog import syslog
 
 # This controls how often _doperiodic() will try to deal with deferred
 # permanent failures.  It is a count of calls to _doperiodic()
-DEAL_WITH_PERMFAILURES_EVERY = 10
+DEAL_WITH_PERMFAILURES_EVERY = 10 # PLB not used?
 
 try:
     True, False
@@ -46,12 +46,11 @@
 
 
 
-class OutgoingRunner(Runner, BounceMixin):
+class OutgoingRunner(BounceRunnerBase):
     QDIR = mm_cfg.OUTQUEUE_DIR
 
     def __init__(self, slice=None, numslices=1):
-        Runner.__init__(self, slice, numslices)
-        BounceMixin.__init__(self)
+        BounceRunnerBase.__init__(self, slice, numslices)
         # We look this function up only at startup time
         modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
         mod = __import__(modname)
@@ -131,9 +130,3 @@
                     self.__retryq.enqueue(msg, msgdata)
         # We've successfully completed handling of this message
         return False
-
-    _doperiodic = BounceMixin._doperiodic
-
-    def _cleanup(self):
-        BounceMixin._cleanup(self)
-        Runner._cleanup(self)


More information about the Mailman-Developers mailing list