[pypy-commit] pypy py3tests: First step, collecting and running apptest_*.py works

Floris Bruynooghe pypy.commits at gmail.com
Mon Mar 19 13:24:28 EDT 2018


Author: Floris Bruynooghe <flub at google.com>
Branch: py3tests
Changeset: r94011:eea5b6bc977d
Date: 2018-03-19 11:28 +0100
http://bitbucket.org/pypy/pypy/changeset/eea5b6bc977d/

Log:	First step, collecting and running apptest_*.py works

diff --git a/pypy/conftest.py b/pypy/conftest.py
--- a/pypy/conftest.py
+++ b/pypy/conftest.py
@@ -43,11 +43,10 @@
     def py3k_skip(message):
         py.test.skip('[py3k] %s' % message)
     py.test.py3k_skip = py3k_skip
+    if config.getoption('runappdirect'):
+        config.addinivalue_line('python_files', 'apptest_*.py')
 
 def pytest_addoption(parser):
-    from rpython.conftest import pytest_addoption
-    pytest_addoption(parser)
-
     group = parser.getgroup("pypy options")
     group.addoption('-A', '--runappdirect', action="store_true",
            default=False, dest="runappdirect",
@@ -94,7 +93,8 @@
     ensure_pytest_builtin_helpers()
 
 def pytest_pycollect_makemodule(path, parent):
-    return PyPyModule(path, parent)
+    if not parent.config.getoption('runappdirect'):
+        return PyPyModule(path, parent)
 
 def is_applevel(item):
     from pypy.tool.pytest.apptest import AppTestFunction
@@ -193,5 +193,7 @@
             appclass.obj.runappdirect = option.runappdirect
 
 
-def pytest_ignore_collect(path):
+def pytest_ignore_collect(path, config):
+    if config.getoption('runappdirect') and not path.fnmatch('apptest_*.py'):
+        return True
     return path.check(link=1)
diff --git a/pypy/interpreter/test/apptest_coroutine.py b/pypy/interpreter/test/apptest_coroutine.py
new file mode 100644
--- /dev/null
+++ b/pypy/interpreter/test/apptest_coroutine.py
@@ -0,0 +1,204 @@
+import pytest
+
+
+def test_cannot_iterate():
+    async def f(x):
+        pass
+    pytest.raises(TypeError, "for i in f(5): pass")
+    pytest.raises(TypeError, iter, f(5))
+    pytest.raises(TypeError, next, f(5))
+
+
+def test_async_for():
+    class X:
+        def __aiter__(self):
+            return MyAIter()
+    class MyAIter:
+        async def __anext__(self):
+            return 42
+    async def f(x):
+        sum = 0
+        async for a in x:
+            sum += a
+            if sum > 100:
+                break
+        return sum
+    cr = f(X())
+    try:
+        cr.send(None)
+    except StopIteration as e:
+        assert e.value == 42 * 3
+    else:
+        assert False, "should have raised"
+
+
+def test_StopAsyncIteration():
+    class X:
+        def __aiter__(self):
+            return MyAIter()
+    class MyAIter:
+        count = 0
+        async def __anext__(self):
+            if self.count == 3:
+                raise StopAsyncIteration
+            self.count += 1
+            return 42
+    async def f(x):
+        sum = 0
+        async for a in x:
+            sum += a
+        return sum
+    cr = f(X())
+    try:
+        cr.send(None)
+    except StopIteration as e:
+        assert e.value == 42 * 3
+    else:
+        assert False, "should have raised"
+
+
+def test_async_for_old_style():
+    class X:
+        def __aiter__(self):
+            return MyAIter()
+    class MyAIter:
+        def __await__(self):
+            return iter([20, 30])
+    async def f(x):
+        sum = 0
+        async for a in x:
+            sum += a
+            if sum > 100:
+                break
+        return sum
+    cr = f(X())
+    assert next(cr.__await__()) == 20
+
+
+def test_set_coroutine_wrapper():
+    import sys
+    async def f():
+        pass
+    seen = []
+    def my_wrapper(cr):
+        seen.append(cr)
+        return 42
+    assert sys.get_coroutine_wrapper() is None
+    sys.set_coroutine_wrapper(my_wrapper)
+    assert sys.get_coroutine_wrapper() is my_wrapper
+    cr = f()
+    assert cr == 42
+    sys.set_coroutine_wrapper(None)
+    assert sys.get_coroutine_wrapper() is None
+
+
+def test_async_with():
+    seen = []
+    class X:
+        async def __aenter__(self):
+            seen.append('aenter')
+        async def __aexit__(self, *args):
+            seen.append('aexit')
+    async def f(x):
+        async with x:
+            return 42
+    c = f(X())
+    try:
+        c.send(None)
+    except StopIteration as e:
+        assert e.value == 42
+    else:
+        assert False, "should have raised"
+    assert seen == ['aenter', 'aexit']
+
+
+def test_await():
+    class X:
+        def __await__(self):
+            i1 = yield 40
+            assert i1 == 82
+            i2 = yield 41
+            assert i2 == 93
+    async def f():
+        await X()
+        await X()
+    c = f()
+    assert c.send(None) == 40
+    assert c.send(82) == 41
+    assert c.send(93) == 40
+    assert c.send(82) == 41
+    pytest.raises(StopIteration, c.send, 93)
+
+
+def test_await_error():
+    async def f():
+        await [42]
+    c = f()
+    try:
+        c.send(None)
+    except TypeError as e:
+        assert str(e) == "object list can't be used in 'await' expression"
+    else:
+        assert False, "should have raised"
+
+
+def test_async_with_exception_context():
+    class CM:
+        async def __aenter__(self):
+            pass
+        async def __aexit__(self, *e):
+            1/0
+    async def f():
+        async with CM():
+            raise ValueError
+    c = f()
+    try:
+        c.send(None)
+    except ZeroDivisionError as e:
+        assert e.__context__ is not None
+        assert isinstance(e.__context__, ValueError)
+    else:
+        assert False, "should have raised"
+
+
+def test_runtime_warning():
+    import gc, warnings
+    async def foobaz():
+        pass
+    with warnings.catch_warnings(record=True) as l:
+        foobaz()
+        gc.collect()
+        gc.collect()
+        gc.collect()
+
+    assert len(l) == 1, repr(l)
+    w = l[0].message
+    assert isinstance(w, RuntimeWarning)
+    assert str(w).startswith("coroutine ")
+    assert str(w).endswith("foobaz' was never awaited")
+
+
+def test_async_for_with_tuple_subclass():
+    class Done(Exception): pass
+
+    class AIter(tuple):
+        i = 0
+        def __aiter__(self):
+            return self
+        async def __anext__(self):
+            if self.i >= len(self):
+                raise StopAsyncIteration
+            self.i += 1
+            return self[self.i - 1]
+
+    result = []
+    async def foo():
+        async for i in AIter([42]):
+            result.append(i)
+        raise Done
+
+    try:
+        foo().send(None)
+    except Done:
+        pass
+    assert result == [42]


More information about the pypy-commit mailing list