[pypy-commit] pypy stmgc-c8: task_done(), join() on stm.queue

arigo noreply at buildbot.pypy.org
Thu Jun 18 14:56:28 CEST 2015


Author: Armin Rigo <arigo at tunes.org>
Branch: stmgc-c8
Changeset: r78178:5a114ddbc08d
Date: 2015-06-18 14:56 +0200
http://bitbucket.org/pypy/pypy/changeset/5a114ddbc08d/

Log:	task_done(), join() on stm.queue

diff --git a/pypy/module/pypystm/queue.py b/pypy/module/pypystm/queue.py
--- a/pypy/module/pypystm/queue.py
+++ b/pypy/module/pypystm/queue.py
@@ -32,14 +32,8 @@
     @unwrap_spec(block=int)
     def get_w(self, space, block=1, w_timeout=None):
         """Remove and return an item from the queue.
-
-        If optional args 'block' is true and 'timeout' is None (the default),
-        block if necessary until an item is available. If 'timeout' is
-        a non-negative number, it blocks at most 'timeout' seconds and raises
-        the Empty exception if no item was available within that time.
-        Otherwise ('block' is false), return an item if one is immediately
-        available, else raise the Empty exception ('timeout' is ignored
-        in that case).
+        The 'block' and 'timeout' arguments are like Queue.Queue.get().
+        Note that using them is inefficient so far.
         """
         if block == 0:
             timeout = 0.0     # 'w_timeout' ignored in this case
@@ -62,6 +56,28 @@
                                  space.w_None)
         return cast_gcref_to_instance(W_Root, gcref)
 
+    def task_done_w(self, space):
+        """Indicate that a formerly enqueued task is complete.
+        See Queue.Queue.task_done().
+
+        Note that we cannot easily detect if task_done() is called more
+        times than there were items placed in the queue.  This situation
+        is detected by join() instead.
+        """
+        self.q.task_done()
+
+    def join_w(self, space):
+        """Blocks until all items in the Queue have been gotten and processed.
+        See Queue.Queue.join().  Raises ValueError if we detect that
+        task_done() has been called more times than there were items
+        placed in the queue.
+        """
+        res = self.q.join()
+        if res != 0:
+            raise oefmt(space.w_ValueError,
+                        'task_done() called too many times (%d more than '
+                        'there were items placed in the queue)', -res)
+
 
 def W_Queue___new__(space, w_subtype):
     r = space.allocate_instance(W_Queue, w_subtype)
@@ -73,4 +89,6 @@
     __new__ = interp2app(W_Queue___new__),
     get = interp2app(W_Queue.get_w),
     put = interp2app(W_Queue.put_w),
+    task_done = interp2app(W_Queue.task_done_w),
+    join = interp2app(W_Queue.join_w),
 )
diff --git a/pypy/module/pypystm/test/test_queue.py b/pypy/module/pypystm/test/test_queue.py
--- a/pypy/module/pypystm/test/test_queue.py
+++ b/pypy/module/pypystm/test/test_queue.py
@@ -18,3 +18,12 @@
         q.put(obj)
         obj1 = q.get(timeout=0.01)
         assert obj1 is obj
+
+    def test_task_done(self):
+        import pypystm
+        q = pypystm.queue()
+        q.put([])
+        q.get()
+        # q.join() before task_done() would deadlock, but that is hard to test
+        q.task_done()
+        q.join()


More information about the pypy-commit mailing list