Queue.Queue-like class without the busy-wait

David Bolen db3l at fitlinxx.com
Fri Apr 1 18:05:42 EST 2005


"Paul L. Du Bois" <polytope at gmail.com> writes:

> Has anyone written a Queue.Queue replacement that avoids busy-waiting?
> It doesn't matter if it uses os-specific APIs (eg
> WaitForMultipleObjects).  I did some googling around and haven't found
> anything so far.

This isn't a Queue.Queue replacement, but it implements a buffer
intended for inter-thread transmission, so it could be adjusted to
mimic Queue semantics fairly easily.  In fact, internally it actually
keeps write chunks in a list until read for better performance, so
just removing the coalesce process would be the first step.
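
As a rough illustration of that adjustment (not part of the original module), here is a minimal sketch of an item-oriented variant with no coalescing step. To keep it portable it swaps the Win32 event for threading.Condition, on the assumption of Python 3.2 or later, where a timed wait blocks on the underlying lock instead of polling. The class name ItemBuffer and the choice to raise queue.Empty on a timeout are hypothetical, picked only to resemble Queue.Queue.get.

```python
import collections
import queue
import threading
import time


class ItemBuffer(object):
    """Hypothetical item-oriented variant of Buffer (no coalescing)."""

    def __init__(self, notify=None):
        # Assumption: a Condition stands in for the lock + Win32 event pair.
        self.cond = threading.Condition()
        self.items = collections.deque()
        self.notify = notify

    def __len__(self):
        with self.cond:
            return len(self.items)

    def put(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify()          # wake one waiting reader
        if self.notify:
            self.notify()               # callback runs outside the lock

    def get(self, timeout=None):
        """Return the oldest item, waiting up to 'timeout' seconds
        (forever if None, no blocking if 0).  Raises queue.Empty if
        nothing arrives in time."""
        if timeout is not None:
            deadline = time.monotonic() + timeout
        with self.cond:
            # Loop to cope with wakeups where another reader got there first.
            while not self.items:
                if timeout is None:
                    self.cond.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        raise queue.Empty
                    self.cond.wait(remaining)
            return self.items.popleft()
```

Items come back one at a time in FIFO order, so a reader would call get() in a loop rather than asking for a byte count.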

It was written specifically to minimize latency (which is a
significant issue with the polling loop in the normal Python Queue
implementation) and CPU usage in support of a higher level
Win32-specific serial I/O class, so it uses Win32 events to handle the
signaling for the key events when waiting.

The fundamental issue with the native Python lock is that, to stay
minimal in what it requires from each OS, it only offers blocking and
non-blocking acquires; there is no way to wait on a signal with a
timeout.  A timed blocking wait on some signalable construct is the
key thing you need to handle these operations efficiently, and that's
what I use the Win32 Event for.
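
To make the cost of the missing timed wait concrete, here is a minimal sketch of how a timed acquire can be emulated when only non-blocking attempts are available. As far as I know this is roughly the shape of the loop behind threading's timed waits in current Python 2 releases, but treat it as an illustration rather than the library's actual code. The function name polling_acquire, the poll counter and the delay constants are my own assumptions.

```python
import threading
import time


def polling_acquire(lock, timeout):
    """Emulate a timed acquire using only non-blocking attempts.

    Returns (acquired, polls), where polls counts the attempts made.
    """
    endtime = time.time() + timeout
    delay = 0.0005                      # start with a very short sleep
    polls = 0
    while True:
        polls += 1
        if lock.acquire(False):         # non-blocking attempt
            return True, polls
        remaining = endtime - time.time()
        if remaining <= 0:
            return False, polls
        # Back off exponentially, but never sleep past the deadline
        # or for more than 50 ms at a time.
        delay = min(delay * 2, remaining, 0.05)
        time.sleep(delay)
```

Every wakeup in that loop costs some CPU, and a release that happens just after a sleep begins goes unnoticed until the sleep ends, which is where the added latency comes from.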

-- David

          - - - - - - - - - - - - - - - - - - - - - - - - -

import thread
import win32event as we

class Buffer:
    """A thread-safe unidirectional data buffer used to represent data
    traveling to or from the application and serial port handling threads.

    This class is used as an underlying implementation mechanism by SerialIO.
    Application code should not typically need to access this directly, but
    can handle I/O through SerialIO.

    Note that we use Windows event objects rather than Python's because
    Python's OS-independent versions are not very efficient with timed waits,
    imposing internal latencies and CPU usage due to looping around a basic
    non-blocking construct.  We also use the lower layer thread lock rather
    than threading's to minimize overhead.
    """

    def __init__(self, notify=None):
        self.lock = thread.allocate_lock()
        # Manual-reset event (second argument), initially unsignaled (third)
        self.has_data = we.CreateEvent(None,1,0,None)
        self.clear()
        self.notify = notify

    def _coalesce(self):
        if self.buflist:
            self.buffer += ''.join(self.buflist)
            self.buflist = []

    def __len__(self):
        self.lock.acquire()
        self._coalesce()
        result = len(self.buffer)
        self.lock.release()
        return result

    def clear(self):
        self.lock.acquire()
        self.buffer = ''
        self.buflist = []
        self.lock.release()

    def get(self, size=0, timeout=None):
        """Retrieve data from the buffer, up to 'size' bytes (unlimited if
        0), but potentially less based on what is available.  If no
        data is currently available, it will wait up to 'timeout' seconds
        (forever if None, no blocking if 0) for some data to arrive, and
        return an empty string if none does."""

        self.lock.acquire()
        self._coalesce()

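        if not self.buffer:
            # Nothing buffered, wait until something shows up (timeout
            # rules match that of threading.Event).  Drop the lock while
            # waiting so that writers can get in.
            self.lock.release()
            if timeout is None:
                win_timeout = we.INFINITE
            else:
                win_timeout = int(timeout * 1000)
            # The return code is not needed: after a timeout the buffer is
            # simply still empty and an empty string is returned below.
            we.WaitForSingleObject(self.has_data, win_timeout)
            self.lock.acquire()
            self._coalesce()

        if not size:
            size = len(self.buffer)

        result_len = min(size,len(self.buffer))
        result = self.buffer[:result_len]
        self.buffer = self.buffer[result_len:]
        if not self.buffer:
            # Only clear the signal once everything has been consumed, so
            # the event keeps reflecting whether data is still pending
            we.ResetEvent(self.has_data)
        self.lock.release()
        return result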

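    def put_back(self,data):
        """Return unread data to the front of the buffer"""
        self.lock.acquire()
        self.buffer = data + self.buffer
        # Signal while still holding the lock so a concurrent get() cannot
        # reset the event between the update and the signal
        we.SetEvent(self.has_data)
        self.lock.release()
        if self.notify:
            self.notify()

    def put(self, data):
        """Append data to the buffer and wake up any waiting reader"""
        self.lock.acquire()
        self.buflist.append(data)
        # Signal under the lock for the same reason as in put_back()
        we.SetEvent(self.has_data)
        self.lock.release()
        if self.notify:
            self.notify()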


