[pypy-commit] pypy py3tests: First step, collecting and running apptest_*.py works
Floris Bruynooghe
pypy.commits at gmail.com
Mon Mar 19 13:24:28 EDT 2018
Author: Floris Bruynooghe <flub at google.com>
Branch: py3tests
Changeset: r94011:eea5b6bc977d
Date: 2018-03-19 11:28 +0100
http://bitbucket.org/pypy/pypy/changeset/eea5b6bc977d/
Log: First step, collecting and running apptest_*.py works
diff --git a/pypy/conftest.py b/pypy/conftest.py
--- a/pypy/conftest.py
+++ b/pypy/conftest.py
@@ -43,11 +43,10 @@
def py3k_skip(message):
py.test.skip('[py3k] %s' % message)
py.test.py3k_skip = py3k_skip
+ if config.getoption('runappdirect'):
+ config.addinivalue_line('python_files', 'apptest_*.py')
def pytest_addoption(parser):
- from rpython.conftest import pytest_addoption
- pytest_addoption(parser)
-
group = parser.getgroup("pypy options")
group.addoption('-A', '--runappdirect', action="store_true",
default=False, dest="runappdirect",
@@ -94,7 +93,8 @@
ensure_pytest_builtin_helpers()
def pytest_pycollect_makemodule(path, parent):
- return PyPyModule(path, parent)
+ if not parent.config.getoption('runappdirect'):
+ return PyPyModule(path, parent)
def is_applevel(item):
from pypy.tool.pytest.apptest import AppTestFunction
@@ -193,5 +193,7 @@
appclass.obj.runappdirect = option.runappdirect
-def pytest_ignore_collect(path):
+def pytest_ignore_collect(path, config):
+ if config.getoption('runappdirect') and not path.fnmatch('apptest_*.py'):
+ return True
return path.check(link=1)
diff --git a/pypy/interpreter/test/apptest_coroutine.py b/pypy/interpreter/test/apptest_coroutine.py
new file mode 100644
--- /dev/null
+++ b/pypy/interpreter/test/apptest_coroutine.py
@@ -0,0 +1,204 @@
+import pytest
+
+
+def test_cannot_iterate():
+ async def f(x):
+ pass
+ pytest.raises(TypeError, "for i in f(5): pass")
+ pytest.raises(TypeError, iter, f(5))
+ pytest.raises(TypeError, next, f(5))
+
+
+def test_async_for():
+ class X:
+ def __aiter__(self):
+ return MyAIter()
+ class MyAIter:
+ async def __anext__(self):
+ return 42
+ async def f(x):
+ sum = 0
+ async for a in x:
+ sum += a
+ if sum > 100:
+ break
+ return sum
+ cr = f(X())
+ try:
+ cr.send(None)
+ except StopIteration as e:
+ assert e.value == 42 * 3
+ else:
+ assert False, "should have raised"
+
+
+def test_StopAsyncIteration():
+ class X:
+ def __aiter__(self):
+ return MyAIter()
+ class MyAIter:
+ count = 0
+ async def __anext__(self):
+ if self.count == 3:
+ raise StopAsyncIteration
+ self.count += 1
+ return 42
+ async def f(x):
+ sum = 0
+ async for a in x:
+ sum += a
+ return sum
+ cr = f(X())
+ try:
+ cr.send(None)
+ except StopIteration as e:
+ assert e.value == 42 * 3
+ else:
+ assert False, "should have raised"
+
+
+def test_async_for_old_style():
+ class X:
+ def __aiter__(self):
+ return MyAIter()
+ class MyAIter:
+ def __await__(self):
+ return iter([20, 30])
+ async def f(x):
+ sum = 0
+ async for a in x:
+ sum += a
+ if sum > 100:
+ break
+ return sum
+ cr = f(X())
+ assert next(cr.__await__()) == 20
+
+
+def test_set_coroutine_wrapper():
+ import sys
+ async def f():
+ pass
+ seen = []
+ def my_wrapper(cr):
+ seen.append(cr)
+ return 42
+ assert sys.get_coroutine_wrapper() is None
+ sys.set_coroutine_wrapper(my_wrapper)
+ assert sys.get_coroutine_wrapper() is my_wrapper
+ cr = f()
+ assert cr == 42
+ sys.set_coroutine_wrapper(None)
+ assert sys.get_coroutine_wrapper() is None
+
+
+def test_async_with():
+ seen = []
+ class X:
+ async def __aenter__(self):
+ seen.append('aenter')
+ async def __aexit__(self, *args):
+ seen.append('aexit')
+ async def f(x):
+ async with x:
+ return 42
+ c = f(X())
+ try:
+ c.send(None)
+ except StopIteration as e:
+ assert e.value == 42
+ else:
+ assert False, "should have raised"
+ assert seen == ['aenter', 'aexit']
+
+
+def test_await():
+ class X:
+ def __await__(self):
+ i1 = yield 40
+ assert i1 == 82
+ i2 = yield 41
+ assert i2 == 93
+ async def f():
+ await X()
+ await X()
+ c = f()
+ assert c.send(None) == 40
+ assert c.send(82) == 41
+ assert c.send(93) == 40
+ assert c.send(82) == 41
+ pytest.raises(StopIteration, c.send, 93)
+
+
+def test_await_error():
+ async def f():
+ await [42]
+ c = f()
+ try:
+ c.send(None)
+ except TypeError as e:
+ assert str(e) == "object list can't be used in 'await' expression"
+ else:
+ assert False, "should have raised"
+
+
+def test_async_with_exception_context():
+ class CM:
+ async def __aenter__(self):
+ pass
+ async def __aexit__(self, *e):
+ 1/0
+ async def f():
+ async with CM():
+ raise ValueError
+ c = f()
+ try:
+ c.send(None)
+ except ZeroDivisionError as e:
+ assert e.__context__ is not None
+ assert isinstance(e.__context__, ValueError)
+ else:
+ assert False, "should have raised"
+
+
+def test_runtime_warning():
+ import gc, warnings
+ async def foobaz():
+ pass
+ with warnings.catch_warnings(record=True) as l:
+ foobaz()
+ gc.collect()
+ gc.collect()
+ gc.collect()
+
+ assert len(l) == 1, repr(l)
+ w = l[0].message
+ assert isinstance(w, RuntimeWarning)
+ assert str(w).startswith("coroutine ")
+ assert str(w).endswith("foobaz' was never awaited")
+
+
+def test_async_for_with_tuple_subclass():
+ class Done(Exception): pass
+
+ class AIter(tuple):
+ i = 0
+ def __aiter__(self):
+ return self
+ async def __anext__(self):
+ if self.i >= len(self):
+ raise StopAsyncIteration
+ self.i += 1
+ return self[self.i - 1]
+
+ result = []
+ async def foo():
+ async for i in AIter([42]):
+ result.append(i)
+ raise Done
+
+ try:
+ foo().send(None)
+ except Done:
+ pass
+ assert result == [42]
More information about the pypy-commit
mailing list