[Python-checkins] cpython (3.4): Issue #22685: Fix test_pause_reading() of asyncio/test_subprocess

victor.stinner python-checkins at python.org
Thu Jan 15 23:01:28 CET 2015


https://hg.python.org/cpython/rev/992ce0dcfb29
changeset:   94186:992ce0dcfb29
branch:      3.4
parent:      94184:ecbde0b31f6f
user:        Victor Stinner <victor.stinner at gmail.com>
date:        Thu Jan 15 22:52:59 2015 +0100
summary:
  Issue #22685: Fix test_pause_reading() of asyncio/test_subprocess

Override the connect_read_pipe() method of the loop so that the
pause_reading() and resume_reading() methods of the transport are mocked
immediately, as soon as the transport is created.

The test failed randomly on the FreeBSD 9 buildbot and on Windows using trollius.
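
For readers unfamiliar with the pattern, here is a minimal standalone sketch
of the same idea: wrap the factory that creates the transport, so the mocks
are in place before any other code can call the real methods. FakeTransport,
FakeLoop and install_pause_resume_mocks are hypothetical names introduced
only for this illustration; they are not part of asyncio or of the test
suite. The sketch uses async/await rather than the @asyncio.coroutine /
yield from spelling of the 3.4 branch.

```python
import asyncio
from unittest import mock


class FakeTransport:
    """Hypothetical stand-in for a read pipe transport."""

    def pause_reading(self):
        raise AssertionError("real pause_reading() should have been mocked")

    def resume_reading(self):
        raise AssertionError("real resume_reading() should have been mocked")


class FakeLoop:
    """Hypothetical stand-in exposing only connect_read_pipe()."""

    async def connect_read_pipe(self, protocol_factory, pipe):
        return FakeTransport(), protocol_factory()


def install_pause_resume_mocks(loop):
    """Replace loop.connect_read_pipe with a wrapper that mocks
    pause_reading() and resume_reading() on every new transport."""
    original = loop.connect_read_pipe  # keep a reference to the real method

    async def connect_read_pipe_mock(*args, **kw):
        transport, protocol = await original(*args, **kw)
        # Mock before returning, so no caller ever sees the real methods.
        transport.pause_reading = mock.Mock()
        transport.resume_reading = mock.Mock()
        return transport, protocol

    loop.connect_read_pipe = connect_read_pipe_mock


async def demo():
    loop = FakeLoop()
    install_pause_resume_mocks(loop)
    transport, _protocol = await loop.connect_read_pipe(object, None)
    # These calls hit the mocks, not the raising implementations above.
    transport.pause_reading()
    transport.resume_reading()
    return transport


transport = asyncio.run(demo())
```

Installing the mocks inside the wrapper, rather than after the subprocess
has been created, leaves no window in which the real methods could be called.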

files:
  Lib/test/test_asyncio/test_subprocess.py |  14 ++++++++++-
  1 files changed, 12 insertions(+), 2 deletions(-)


diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py
--- a/Lib/test/test_asyncio/test_subprocess.py
+++ b/Lib/test/test_asyncio/test_subprocess.py
@@ -179,6 +179,18 @@
                 'sys.stdout.write("x" * %s)' % size,
                 'sys.stdout.flush()',
             ))
+
+            connect_read_pipe = self.loop.connect_read_pipe
+
+            @asyncio.coroutine
+            def connect_read_pipe_mock(*args, **kw):
+                transport, protocol = yield from connect_read_pipe(*args, **kw)
+                transport.pause_reading = mock.Mock()
+                transport.resume_reading = mock.Mock()
+                return (transport, protocol)
+
+            self.loop.connect_read_pipe = connect_read_pipe_mock
+
             proc = yield from asyncio.create_subprocess_exec(
                                          sys.executable, '-c', code,
                                          stdin=asyncio.subprocess.PIPE,
@@ -186,8 +198,6 @@
                                          limit=limit,
                                          loop=self.loop)
             stdout_transport = proc._transport.get_pipe_transport(1)
-            stdout_transport.pause_reading = mock.Mock()
-            stdout_transport.resume_reading = mock.Mock()
 
             stdout, stderr = yield from proc.communicate()
 

-- 
Repository URL: https://hg.python.org/cpython

