[Python-checkins] cpython (merge 3.4 -> default): Merge 3.4 (asyncio)

victor.stinner python-checkins at python.org
Tue Feb 17 23:14:58 CET 2015


https://hg.python.org/cpython/rev/6f75c7c6e260
changeset:   94666:6f75c7c6e260
parent:      94662:c0b2dacef35c
parent:      94665:0f6ddf944521
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Tue Feb 17 22:55:36 2015 +0100
summary:
  Merge 3.4 (asyncio)

files:
  Lib/asyncio/base_subprocess.py           |    2 +
  Lib/asyncio/queues.py                    |  104 ++++------
  Lib/test/test_asyncio/test_queues.py     |   10 +-
  Lib/test/test_asyncio/test_subprocess.py |   12 +-
  4 files changed, 61 insertions(+), 67 deletions(-)


diff --git a/Lib/asyncio/base_subprocess.py b/Lib/asyncio/base_subprocess.py
--- a/Lib/asyncio/base_subprocess.py
+++ b/Lib/asyncio/base_subprocess.py
@@ -57,6 +57,8 @@
         info.append('pid=%s' % self._pid)
         if self._returncode is not None:
             info.append('returncode=%s' % self._returncode)
+        else:
+            info.append('running')
 
         stdin = self._pipes.get(0)
         if stdin is not None:
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -1,7 +1,7 @@
 """Queues"""
 
-__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue',
-           'QueueFull', 'QueueEmpty']
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty',
+           'JoinableQueue']
 
 import collections
 import heapq
@@ -49,6 +49,9 @@
         self._getters = collections.deque()
         # Pairs of (item, Future).
         self._putters = collections.deque()
+        self._unfinished_tasks = 0
+        self._finished = locks.Event(loop=self._loop)
+        self._finished.set()
         self._init(maxsize)
 
     def _init(self, maxsize):
@@ -59,6 +62,8 @@
 
     def _put(self, item):
         self._queue.append(item)
+        self._unfinished_tasks += 1
+        self._finished.clear()
 
     def __repr__(self):
         return '<{} at {:#x} {}>'.format(
@@ -75,6 +80,8 @@
             result += ' _getters[{}]'.format(len(self._getters))
         if self._putters:
             result += ' _putters[{}]'.format(len(self._putters))
+        if self._unfinished_tasks:
+            result += ' tasks={}'.format(self._unfinished_tasks)
         return result
 
     def _consume_done_getters(self):
@@ -126,9 +133,6 @@
                 'queue non-empty, why are getters waiting?')
 
             getter = self._getters.popleft()
-
-            # Use _put and _get instead of passing item straight to getter, in
-            # case a subclass has logic that must run (e.g. JoinableQueue).
             self._put(item)
 
             # getter cannot be cancelled, we just removed done getters
@@ -154,9 +158,6 @@
                 'queue non-empty, why are getters waiting?')
 
             getter = self._getters.popleft()
-
-            # Use _put and _get instead of passing item straight to getter, in
-            # case a subclass has logic that must run (e.g. JoinableQueue).
             self._put(item)
 
             # getter cannot be cancelled, we just removed done getters
@@ -219,56 +220,6 @@
         else:
             raise QueueEmpty
 
-
-class PriorityQueue(Queue):
-    """A subclass of Queue; retrieves entries in priority order (lowest first).
-
-    Entries are typically tuples of the form: (priority number, data).
-    """
-
-    def _init(self, maxsize):
-        self._queue = []
-
-    def _put(self, item, heappush=heapq.heappush):
-        heappush(self._queue, item)
-
-    def _get(self, heappop=heapq.heappop):
-        return heappop(self._queue)
-
-
-class LifoQueue(Queue):
-    """A subclass of Queue that retrieves most recently added entries first."""
-
-    def _init(self, maxsize):
-        self._queue = []
-
-    def _put(self, item):
-        self._queue.append(item)
-
-    def _get(self):
-        return self._queue.pop()
-
-
-class JoinableQueue(Queue):
-    """A subclass of Queue with task_done() and join() methods."""
-
-    def __init__(self, maxsize=0, *, loop=None):
-        super().__init__(maxsize=maxsize, loop=loop)
-        self._unfinished_tasks = 0
-        self._finished = locks.Event(loop=self._loop)
-        self._finished.set()
-
-    def _format(self):
-        result = Queue._format(self)
-        if self._unfinished_tasks:
-            result += ' tasks={}'.format(self._unfinished_tasks)
-        return result
-
-    def _put(self, item):
-        super()._put(item)
-        self._unfinished_tasks += 1
-        self._finished.clear()
-
     def task_done(self):
         """Indicate that a formerly enqueued task is complete.
 
@@ -294,9 +245,42 @@
         """Block until all items in the queue have been gotten and processed.
 
         The count of unfinished tasks goes up whenever an item is added to the
-        queue. The count goes down whenever a consumer thread calls task_done()
-        to indicate that the item was retrieved and all work on it is complete.
+        queue. The count goes down whenever a consumer calls task_done() to
+        indicate that the item was retrieved and all work on it is complete.
         When the count of unfinished tasks drops to zero, join() unblocks.
         """
         if self._unfinished_tasks > 0:
             yield from self._finished.wait()
+
+
+class PriorityQueue(Queue):
+    """A subclass of Queue; retrieves entries in priority order (lowest first).
+
+    Entries are typically tuples of the form: (priority number, data).
+    """
+
+    def _init(self, maxsize):
+        self._queue = []
+
+    def _put(self, item, heappush=heapq.heappush):
+        heappush(self._queue, item)
+
+    def _get(self, heappop=heapq.heappop):
+        return heappop(self._queue)
+
+
+class LifoQueue(Queue):
+    """A subclass of Queue that retrieves most recently added entries first."""
+
+    def _init(self, maxsize):
+        self._queue = []
+
+    def _put(self, item):
+        self._queue.append(item)
+
+    def _get(self):
+        return self._queue.pop()
+
+
+JoinableQueue = Queue
+"""Deprecated alias for Queue."""
diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py
--- a/Lib/test/test_asyncio/test_queues.py
+++ b/Lib/test/test_asyncio/test_queues.py
@@ -408,14 +408,14 @@
         self.assertEqual([1, 2, 3], items)
 
 
-class JoinableQueueTests(_QueueTestBase):
+class QueueJoinTests(_QueueTestBase):
 
     def test_task_done_underflow(self):
-        q = asyncio.JoinableQueue(loop=self.loop)
+        q = asyncio.Queue(loop=self.loop)
         self.assertRaises(ValueError, q.task_done)
 
     def test_task_done(self):
-        q = asyncio.JoinableQueue(loop=self.loop)
+        q = asyncio.Queue(loop=self.loop)
         for i in range(100):
             q.put_nowait(i)
 
@@ -452,7 +452,7 @@
         self.loop.run_until_complete(asyncio.wait(tasks, loop=self.loop))
 
     def test_join_empty_queue(self):
-        q = asyncio.JoinableQueue(loop=self.loop)
+        q = asyncio.Queue(loop=self.loop)
 
         # Test that a queue join()s successfully, and before anything else
         # (done twice for insurance).
@@ -465,7 +465,7 @@
         self.loop.run_until_complete(join())
 
     def test_format(self):
-        q = asyncio.JoinableQueue(loop=self.loop)
+        q = asyncio.Queue(loop=self.loop)
         self.assertEqual(q._format(), 'maxsize=0')
 
         q._unfinished_tasks = 2
diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -355,11 +355,19 @@
             create = self.loop.subprocess_exec(asyncio.SubprocessProtocol,
                                                *PROGRAM_BLOCKED)
             transport, protocol = yield from create
+
+            kill_called = False
+            def kill():
+                nonlocal kill_called
+                kill_called = True
+                orig_kill()
+
             proc = transport.get_extra_info('subprocess')
-            proc.kill = mock.Mock()
+            orig_kill = proc.kill
+            proc.kill = kill
             returncode = transport.get_returncode()
             transport.close()
-            return (returncode, proc.kill.called)
+            return (returncode, kill_called)
 
         # Ignore "Close running child process: kill ..." log
         with test_utils.disable_logger():

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list