[py-svn] r57169 - in py/branch/event/py/test2: . dist dist/testing testing
hpk at codespeak.net
hpk at codespeak.net
Sun Aug 10 17:40:02 CEST 2008
Author: hpk
Date: Sun Aug 10 17:40:02 2008
New Revision: 57169
Added:
py/branch/event/py/test2/dist/dsession.py
- copied, changed from r57168, py/branch/event/py/test2/dist/async.py
py/branch/event/py/test2/dist/testing/test_dsession.py
- copied, changed from r57168, py/branch/event/py/test2/dist/testing/test_async.py
py/branch/event/py/test2/dist/testing/test_functional_dsession.py
- copied, changed from r57165, py/branch/event/py/test2/dist/testing/test_async_functional.py
Removed:
py/branch/event/py/test2/dist/async.py
py/branch/event/py/test2/dist/testing/test_async.py
py/branch/event/py/test2/dist/testing/test_async_functional.py
Modified:
py/branch/event/py/test2/async.py
py/branch/event/py/test2/config.py
py/branch/event/py/test2/repevent.py
py/branch/event/py/test2/testing/test_config.py
Log:
rename the somewhat odd "async" to DSession
Modified: py/branch/event/py/test2/async.py
==============================================================================
--- py/branch/event/py/test2/async.py (original)
+++ py/branch/event/py/test2/async.py Sun Aug 10 17:40:02 2008
@@ -1,6 +1,6 @@
"""
- EXPERIMENTAL async session (for dist/non-dist unification)
+ EXPERIMENTAL dsession session (for dist/non-dist unification)
"""
@@ -16,7 +16,7 @@
from runner import basic_run_report, basic_collect_report, skip_report
import Queue
-class AsyncSession(object):
+class DSession(object):
"""
Session drives the collection and running of tests
and generates test events for reporters.
Modified: py/branch/event/py/test2/config.py
==============================================================================
--- py/branch/event/py/test2/config.py (original)
+++ py/branch/event/py/test2/config.py Sun Aug 10 17:40:02 2008
@@ -178,7 +178,7 @@
""" return default session name as determined from options. """
name = 'Session'
if self.option.dist:
- name = 'AsyncSession'
+ name = 'DSession'
else:
if self.option.looponfailing or self.option.executable:
name = 'RemoteTerminalSession'
@@ -217,7 +217,7 @@
Session = 'py.__.test2.session'
RemoteTerminalSession = 'py.__.test2.terminal.remote'
-AsyncSession = 'py.__.test2.dist.async'
+DSession = 'py.__.test2.dist.dsession'
#
# helpers
Deleted: /py/branch/event/py/test2/dist/async.py
==============================================================================
--- /py/branch/event/py/test2/dist/async.py Sun Aug 10 17:40:02 2008
+++ (empty file)
@@ -1,237 +0,0 @@
-"""
-
- EXPERIMENTAL async session (for dist/non-dist unification)
-
-"""
-
-import py
-from py.__.test2 import repevent
-from py.__.test2.eventbus import EventBus
-import py.__.test2.custompdb
-from py.__.test2.dist.hostmanage import HostManager
-
-# used for genitems()
-from py.__.test2.outcome import Exit
-Item = (py.test.collect.Item, py.test2.collect.Item)
-Collector = (py.test.collect.Collector, py.test2.collect.Collector)
-from py.__.test2.runner import basic_run_report, basic_collect_report, skip_report
-from py.__.test2.session import Session
-
-import Queue
-
-class LoopState(object):
- def __init__(self, colitems):
- self.colitems = colitems
- self.exitstatus = None
- # loopstate.dowork is False after reschedule events
- # because otherwise we might very busily loop
- # waiting for a host to become ready.
- self.dowork = True
- self.shuttingdown = False
- self.testsfailed = False
-
-class AsyncSession(Session):
- """
- Session drives the collection and running of tests
- and generates test events for reporters.
- """
- MAXITEMSPERHOST = 10
- EXIT_OK = 0
- EXIT_TESTSFAILED = 1
- EXIT_INTERRUPTED = 2
- EXIT_INTERNALERROR = 3
- EXIT_NOHOSTS = 4
-
- def __init__(self, config):
- super(AsyncSession, self).__init__(config=config)
-
- self.queue = py.std.Queue.Queue()
- self.host2pending = {}
- self.item2host = {}
- self.hosts = []
- self._testsfailed = False
-
- def fixoptions(self):
- """ check, fix and determine conflicting options. """
- option = self.config.option
- if option.runbrowser and not option.startserver:
- #print "--runbrowser implies --startserver"
- option.startserver = True
- if self.config.getvalue("dist_boxed") and option.dist:
- option.boxed = True
- # conflicting options
- if option.looponfailing and option.usepdb:
- raise ValueError, "--looponfailing together with --pdb not supported."
- if option.looponfailing and option.dist:
- raise ValueError, "--looponfailing together with --dist not supported."
- if option.executable and option.usepdb:
- raise ValueError, "--exec together with --pdb not supported."
- if option.keyword_oneshot and not option.keyword:
- raise ValueError, "--keyword-oneshot makes sense only when --keyword is supplied"
-
- config = self.config
- try:
- config.getvalue('dist_hosts')
- except KeyError:
- print "For running ad-hoc distributed tests you need to specify"
- print "dist_hosts in a local conftest.py file, for example:"
- print "for example:"
- print
- print " dist_hosts = ['localhost'] * 4 # for 3 processors"
- print " dist_hosts = ['you at remote.com', '...'] # for testing on ssh accounts"
- print " # with your remote ssh accounts"
- print
- print "see also: http://codespeak.net/py/current/doc/test.html#automated-distributed-testing"
- raise SystemExit
-
- def main(self):
- colitems = [self.config.getfsnode(arg) for arg in self.config.args]
- self.sessionstarts()
- exitstatus = self.loop(colitems)
- self.sessionfinishes(exitstatus=exitstatus)
-
- def loop_once(self, loopstate):
- if loopstate.shuttingdown:
- return self.loop_once_shutdown(loopstate)
- colitems = loopstate.colitems
- if loopstate.dowork and loopstate.colitems:
- self.triggertesting(colitems)
- colitems[:] = []
- ev = self.queue.get()
- loopstate.dowork = True
- self.bus.notify(ev)
- if isinstance(ev, repevent.HostDown):
- pending = self.removehost(ev.host)
- colitems.extend(pending)
- elif isinstance(ev, repevent.RescheduleItems):
- colitems.extend(ev.items)
- loopstate.dowork = False # avoid busywait
- elif isinstance(ev, repevent.ItemTestReport):
- self.removeitem(ev.colitem)
- if ev.failed:
- loopstate.testsfailed = True
- elif isinstance(ev, repevent.CollectionReport):
- if ev.passed:
- colitems.extend(ev.result)
-
- # termination conditions
- if ((loopstate.testsfailed and self.config.option.exitfirst) or
- (not self.item2host and not colitems and not self.queue.qsize())):
- self.triggershutdown()
- loopstate.shuttingdown = True
- elif not self.host2pending:
- loopstate.exitstatus = self.EXIT_NOHOSTS
-
- def loop_once_shutdown(self, loopstate):
- # once we are in shutdown mode we dont send
- # events other than HostDown upstream
- ev = self.queue.get()
- if isinstance(ev, repevent.HostDown):
- self.bus.notify(ev)
- self.removehost(ev.host)
- if not self.host2pending:
- # finished
- if loopstate.testsfailed:
- loopstate.exitstatus = self.EXIT_TESTSFAILED
- else:
- loopstate.exitstatus = self.EXIT_OK
-
- def loop(self, colitems):
- try:
- loopstate = LoopState(colitems)
- while 1:
- self.loop_once(loopstate)
- if loopstate.exitstatus is not None:
- exitstatus = loopstate.exitstatus
- break
- except KeyboardInterrupt:
- exitstatus = self.EXIT_INTERRUPTED
- except:
- self.bus.notify(repevent.InternalException())
- exitstatus = self.EXIT_INTERNALERROR
- if exitstatus == 0 and self._testsfailed:
- exitstatus = self.EXIT_TESTSFAILED
- return exitstatus
-
- def triggershutdown(self):
- for host in self.hosts:
- host.node.shutdown()
-
- def addhost(self, host):
- assert host not in self.host2pending
- assert host not in self.hosts
- self.host2pending[host] = []
- self.hosts.append(host)
-
- def removehost(self, host):
- self.hosts.remove(host)
- return self.host2pending.pop(host)
-
- def triggertesting(self, colitems):
- colitems = self.filteritems(colitems)
- senditems = []
- for next in colitems:
- if isinstance(next, Item):
- senditems.append(next)
- else:
- ev = basic_collect_report(next)
- self.queue.put(ev)
- self.senditems(senditems)
-
- def senditems(self, tosend):
- if not tosend:
- return
- for host, pending in self.host2pending.items():
- room = self.MAXITEMSPERHOST - len(pending)
- if room > 0:
- sending = tosend[:room]
- host.node.sendlist(sending)
- for item in sending:
- self.item2host[item] = host
- pending.extend(sending)
- tosend[:] = tosend[room:] # update inplace
- if not tosend:
- break
- if tosend:
- # we have some left, give it to the main loop
- self.queue.put(repevent.RescheduleItems(tosend))
-
- def removeitem(self, item):
- host = self.item2host.pop(item)
- self.host2pending[host].remove(item)
-
- def filteritems(self, colitems):
- """ return items to process (some may be deselected)"""
- keywordexpr = self.config.option.keyword
- if not keywordexpr:
- return colitems
- remaining = []
- deselected = []
- for colitem in colitems:
- if isinstance(colitem, Item):
- if colitem._skipbykeyword(keywordexpr):
- deselected.append(colitem)
- continue
- remaining.append(colitem)
- if deselected:
- self.queue.put(repevent.Deselected(deselected))
- return remaining
-
-
- def sessionstarts(self):
- """ setup any neccessary resources ahead of the test run. """
- self.reporter = self.config.initreporter(self.bus)
- self.bus.notify(repevent.SessionStart(self))
- self.hm = HostManager(self)
- self.hm.setup_hosts(notify=self.queue.put)
- for host in self.hm.hosts:
- self.addhost(host)
-
- def sessionfinishes(self, exitstatus):
- """ teardown any resources after a test run. """
- self.bus.notify(repevent.SessionFinish(self, exitstatus=exitstatus))
- self.reporter.deactivate()
- #for host in self.hosts:
- # host.shutdown()
- for host in self.hosts:
- host.gw.exit()
Copied: py/branch/event/py/test2/dist/dsession.py (from r57168, py/branch/event/py/test2/dist/async.py)
==============================================================================
--- py/branch/event/py/test2/dist/async.py (original)
+++ py/branch/event/py/test2/dist/dsession.py Sun Aug 10 17:40:02 2008
@@ -1,6 +1,6 @@
"""
- EXPERIMENTAL async session (for dist/non-dist unification)
+ EXPERIMENTAL dsession session (for dist/non-dist unification)
"""
@@ -30,7 +30,7 @@
self.shuttingdown = False
self.testsfailed = False
-class AsyncSession(Session):
+class DSession(Session):
"""
Session drives the collection and running of tests
and generates test events for reporters.
@@ -43,7 +43,7 @@
EXIT_NOHOSTS = 4
def __init__(self, config):
- super(AsyncSession, self).__init__(config=config)
+ super(DSession, self).__init__(config=config)
self.queue = py.std.Queue.Queue()
self.host2pending = {}
Deleted: /py/branch/event/py/test2/dist/testing/test_async.py
==============================================================================
--- /py/branch/event/py/test2/dist/testing/test_async.py Sun Aug 10 17:40:02 2008
+++ (empty file)
@@ -1,321 +0,0 @@
-from py.__.test2.testing.suptest import InlineCollection
-from py.__.test2.dist.async import AsyncSession, LoopState
-from py.__.test2.dist.hostmanage import Host
-from py.__.test2.runner import basic_collect_report
-from py.__.test2 import repevent
-import py
-
-def run(item):
- runner = item._getrunner()
- return runner(item)
-
-class MockNode:
- def __init__(self):
- self.sent = []
-
- def sendlist(self, items):
- self.sent.append(items)
-
- def shutdown(self):
- self._shutdown=True
-
-def dumpqueue(queue):
- while queue.qsize():
- print queue.get()
-
-class TestAsyncSession(InlineCollection):
- def test_add_remove_host(self):
- item = self.getitem("def test_func(): pass")
- rep = run(item)
- session = AsyncSession(item._config)
- host = Host("localhost")
- assert not session.hosts
- session.addhost(host)
- assert len(session.hosts) == 1
- session.host2pending[host].append(item)
- pending = session.removehost(host)
- assert pending == [item]
- py.test2.raises(Exception, "session.removehost(host)")
-
- def test_senditems_removeitems(self):
- item = self.getitem("def test_func(): pass")
- rep = run(item)
- session = AsyncSession(item._config)
- host = Host("localhost")
- host.node = MockNode()
- session.addhost(host)
- session.senditems([item])
- assert session.host2pending[host] == [item]
- assert session.item2host[item] == host
- session.removeitem(item)
- assert not session.host2pending[host]
- assert not session.item2host
-
- def test_triggertesting_collect(self):
- modcol = self.getmodulecol("""
- def test_func():
- pass
- """)
- session = AsyncSession(modcol._config)
- session.triggertesting([modcol])
- rep = session.queue.get(block=False)
- assert isinstance(rep, repevent.CollectionReport)
- assert len(rep.result) == 1
-
- def test_triggertesting_item(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
- host1 = Host("localhost")
- host1.node = MockNode()
- host2 = Host("localhost")
- host2.node = MockNode()
- session.addhost(host1)
- session.addhost(host2)
- session.triggertesting([item] * (session.MAXITEMSPERHOST*2 + 1))
- host1_sent = host1.node.sent[0]
- host2_sent = host2.node.sent[0]
- assert host1_sent == [item] * session.MAXITEMSPERHOST
- assert host2_sent == [item] * session.MAXITEMSPERHOST
- assert session.host2pending[host1] == host1_sent
- assert session.host2pending[host2] == host2_sent
- ev = session.queue.get(block=False)
- assert isinstance(ev, repevent.RescheduleItems)
- assert ev.items == [item]
-
- def test_keyboardinterrupt(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
- def raise_(): raise KeyboardInterrupt()
- session.queue.get = raise_
- exitstatus = session.loop([])
- assert exitstatus == session.EXIT_INTERRUPTED
-
- def test_internalerror(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
- def raise_(): raise ValueError()
- session.queue.get = raise_
- exitstatus = session.loop([])
- assert exitstatus == session.EXIT_INTERNALERROR
-
- def test_rescheduleevent(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
- host1 = Host("localhost")
- host1.node = MockNode()
- session.addhost(host1)
- ev = repevent.RescheduleItems([item])
- loopstate = LoopState([])
- session.queue.put(ev)
- session.loop_once(loopstate)
- # check that RescheduleEvents are not immediately
- # rescheduled if there are no hosts
- assert loopstate.dowork == False
- session.queue.put(repevent.NOP())
- session.loop_once(loopstate)
- session.queue.put(repevent.NOP())
- session.loop_once(loopstate)
- assert host1.node.sent == [[item]]
- session.queue.put(run(item))
- session.loop_once(loopstate)
- assert loopstate.shuttingdown
- assert not loopstate.testsfailed
-
- def test_no_hosts_remaining_for_tests(self):
- item = self.getitem("def test_func(): pass")
- # setup a session with one host
- session = AsyncSession(item._config)
- host1 = Host("localhost")
- host1.node = MockNode()
- session.addhost(host1)
-
- # setup a HostDown event
- ev = repevent.HostDown(host1, None)
- session.queue.put(ev)
-
- # call the loop which should come back
- # because it doesn't have any hosts left
- exitstatus = session.loop([item])
- dumpqueue(session.queue)
- assert exitstatus == session.EXIT_NOHOSTS
-
- def test_hostdown_causes_reschedule_pending(self):
- item = self.getitem("def test_func(): pass")
-
- # setup a session with two hosts
- session = AsyncSession(item._config)
- host1 = Host("localhost")
- host1.node = MockNode()
- session.addhost(host1)
- host2 = Host("localhost")
- host2.node = MockNode()
- session.addhost(host2)
-
- # have one test pending for a host that goes down
- session.host2pending[host1].append(item)
- ev = repevent.HostDown(host1, None)
- session.queue.put(ev)
-
- loopstate = LoopState([])
- session.loop_once(loopstate)
-
- assert loopstate.colitems == [item]
-
- def test_event_propagation(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
-
- ev = repevent.HostDown(Host("localhost"), None)
- events = [] ; session.bus.subscribe(events.append)
- session.queue.put(ev)
- py.test.raises(ValueError, "session.loop_once(LoopState([]))")
- assert events[0] == ev
-
- def runthrough(self, item):
- session = AsyncSession(item._config)
- host1 = Host("localhost")
- host1.node = MockNode()
- session.addhost(host1)
- loopstate = LoopState([item])
-
- session.queue.put(repevent.NOP())
- session.loop_once(loopstate)
-
- assert host1.node.sent == [[item]]
- ev = run(item)
- session.queue.put(ev)
- session.loop_once(loopstate)
- assert loopstate.shuttingdown
- session.queue.put(repevent.HostDown(host1, None))
- session.loop_once(loopstate)
- dumpqueue(session.queue)
- return session, loopstate.exitstatus
-
- def test_exit_completed_tests_ok(self):
- item = self.getitem("def test_func(): pass")
- session, exitstatus = self.runthrough(item)
- assert exitstatus == session.EXIT_OK
-
- def test_exit_completed_tests_fail(self):
- item = self.getitem("def test_func(): 0/0")
- session, exitstatus = self.runthrough(item)
- assert exitstatus == session.EXIT_TESTSFAILED
-
- def test_exit_on_first_failing(self):
- modcol = self.getmodulecol("""
- def test_fail():
- assert 0
- def test_pass():
- pass
- """)
- modcol._config.option.exitfirst = True
- session = AsyncSession(modcol._config)
- host1 = Host("localhost")
- host1.node = MockNode()
- session.addhost(host1)
- items = basic_collect_report(modcol).result
-
- # trigger testing - this sends tests to host1
- session.triggertesting(items)
-
- # run tests ourselves and produce reports
- ev1 = run(items[0])
- ev2 = run(items[1])
- session.queue.put(ev1) # a failing one
- session.queue.put(ev2)
- # now call the loop
- loopstate = LoopState(items)
- session.loop_once(loopstate)
- assert loopstate.testsfailed
- assert loopstate.shuttingdown
-
- def test_shuttingdown_filters_events(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
- host = Host("localhost")
- session.addhost(host)
- loopstate = LoopState([])
- loopstate.shuttingdown = True
- l = []
- session.bus.subscribe(l.append)
- session.queue.put(run(item))
- session.loop_once(loopstate)
- assert not l
- ev = repevent.HostDown(host)
- session.queue.put(ev)
- session.loop_once(loopstate)
- assert l == [ev]
-
- def test_filteritems(self):
- modcol = self.getmodulecol("""
- def test_fail():
- assert 0
- def test_pass():
- pass
- """)
- session = AsyncSession(modcol._config)
-
- modcol._config.option.keyword = modcol.name
- dsel = session.filteritems([modcol])
- assert dsel == [modcol]
- items = [modcol.join(x) for x in modcol.listdir()]
- remaining = session.filteritems(items)
- assert remaining == []
- ev = session.queue.get(block=False)
- assert isinstance(ev, repevent.Deselected)
- assert ev.items == items
-
- modcol._config.option.keyword = "test_fail"
- remaining = session.filteritems(items)
- assert remaining == [items[0]]
- ev = session.queue.get(block=False)
- assert isinstance(ev, repevent.Deselected)
- assert ev.items == [items[1]]
-
- def test_hostdown_shutdown_after_completion(self):
- item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
-
- host = Host("localhost")
- host.node = MockNode()
- session.addhost(host)
- session.senditems([item])
- session.queue.put(run(item))
- loopstate = LoopState([])
- session.loop_once(loopstate)
- assert host.node._shutdown is True
- assert loopstate.exitstatus is None, "loop did not wait for hostdown"
- assert loopstate.shuttingdown
- session.queue.put(repevent.HostDown(host, None))
- session.loop_once(loopstate)
- assert loopstate.exitstatus == 0
-
- def test_nopending_but_collection_remains(self):
- modcol = self.getmodulecol("""
- def test_fail():
- assert 0
- def test_pass():
- pass
- """)
- session = AsyncSession(modcol._config)
- host = Host("localhost")
- host.node = MockNode()
- session.addhost(host)
-
- colreport = basic_collect_report(modcol)
- item1, item2 = colreport.result
- session.senditems([item1])
- # host2pending will become empty when the loop sees
- # the report
- session.queue.put(run(item1))
-
- # but we have a collection pending
- session.queue.put(colreport)
-
- loopstate = LoopState([])
- session.loop_once(loopstate)
- assert loopstate.exitstatus is None, "loop did not care for collection report"
- assert not loopstate.colitems
- session.loop_once(loopstate)
- assert loopstate.colitems == colreport.result
- assert loopstate.exitstatus is None, "loop did not care for colitems"
Deleted: /py/branch/event/py/test2/dist/testing/test_async_functional.py
==============================================================================
--- /py/branch/event/py/test2/dist/testing/test_async_functional.py Sun Aug 10 17:40:02 2008
+++ (empty file)
@@ -1,122 +0,0 @@
-
-""" Tests various aspects of dist, like ssh hosts setup/teardown
-"""
-
-import py
-from py.__.test2 import repevent
-from py.__.test2.dist.async import AsyncSession
-from py.__.test2.dist.hostmanage import HostManager, Host
-from basetest import BasicRsessionTest, DirSetup
-from py.__.test2.testing import suptest
-
-def eventreader(session):
- queue = py.std.Queue.Queue()
- session.bus.subscribe(queue.put)
- def readevent(eventtype=repevent.ItemTestReport, timeout=2.0):
- events = []
- while 1:
- try:
- ev = queue.get(timeout=timeout)
- except py.std.Queue.Empty:
- print "seen events", events
- raise IOError("did not see %r events" % (eventtype))
- else:
- if isinstance(ev, eventtype):
- #print "other events seen", events
- return ev
- events.append(ev)
- return readevent
-
-class TestAsyncFunctional(suptest.InlineCollection):
- def test_dist_no_disthost(self):
- config = self.parseconfig(self.tmpdir, '-d')
- py.test2.raises(SystemExit, "config.initsession()")
-
- def test_session_eventlog_dist(self):
- self.makepyfile(conftest="dist_hosts=['localhost']\n")
- eventlog = self.tmpdir.join("mylog")
- config = self.parseconfig(self.tmpdir, '-d', '--eventlog=%s' % eventlog)
- session = config.initsession()
- session.bus.notify(repevent.SessionStart(session))
- s = eventlog.read()
- assert s.find("SessionStart") != -1
-
- def test_dist_some_tests(self):
- self.makepyfile(conftest="dist_hosts=['localhost']\n")
- p1 = self.makepyfile(test_one="""
- def test_1():
- pass
- def test_x():
- import py
- py.test2.skip("aaa")
- def test_fail():
- assert 0
- """)
- config = self.parseconfig('-d', p1)
- dist = AsyncSession(config)
- readevent = eventreader(dist)
- dist.main()
- ev = readevent(repevent.ItemTestReport)
- assert ev.passed
- ev = readevent(repevent.ItemTestReport)
- assert ev.skipped
- ev = readevent(repevent.ItemTestReport)
- assert ev.failed
- # see that the host is really down
- ev = readevent(repevent.HostDown)
- assert ev.host.hostname == "localhost"
- ev = readevent(repevent.SessionFinish)
-
- def test_distribution_rsync_roots_example(self):
- py.test.skip("testing for root rsync needs rework")
- destdir = py.test2.ensuretemp("example_dist_destdir")
- subdir = "sub_example_dist"
- sourcedir = self.tmpdir.mkdir("source")
- sourcedir.ensure(subdir, "conftest.py").write(py.code.Source("""
- dist_hosts = ["localhost:%s"]
- dist_rsync_roots = ["%s", "../py"]
- """ % (destdir, tmpdir.join(subdir), )))
- tmpdir.ensure(subdir, "__init__.py")
- tmpdir.ensure(subdir, "test_one.py").write(py.code.Source("""
- def test_1():
- pass
- def test_2():
- assert 0
- def test_3():
- raise ValueError(23)
- def test_4(someargs):
- pass
- def test_5():
- assert __file__ != '%s'
- #def test_6():
- # import py
- # assert py.__file__ != '%s'
- """ % (tmpdir.join(subdir), py.__file__)))
- destdir.join("py").mksymlinkto(py.path.local(py.__file__).dirpath())
- config = py.test2.config._reparse([tmpdir.join(subdir)])
- assert config.topdir == tmpdir
- assert not tmpdir.join("__init__.py").check()
- dist = AsyncSession(config)
- sorter = suptest.events_from_session(dist)
- testevents = sorter.get(repevent.ItemTestReport)
- assert len([x for x in testevents if x.passed]) == 2
- assert len([x for x in testevents if x.failed]) == 3
- assert len([x for x in testevents if x.skipped]) == 0
-
- def test_nice_level(self):
- """ Tests if nice level behaviour is ok """
- self.makepyfile(conftest="""
- dist_hosts=['localhost']
- dist_nicelevel = 10
- """)
- p1 = self.makepyfile(test_nice="""
- def test_nice():
- import os
- assert os.nice(0) == 10
- """)
- config = self.parseconfig('-d', p1)
- session = config.initsession()
- events = suptest.events_from_session(session)
- ev = events.getreport('test_nice')
- assert ev.passed
-
Copied: py/branch/event/py/test2/dist/testing/test_dsession.py (from r57168, py/branch/event/py/test2/dist/testing/test_async.py)
==============================================================================
--- py/branch/event/py/test2/dist/testing/test_async.py (original)
+++ py/branch/event/py/test2/dist/testing/test_dsession.py Sun Aug 10 17:40:02 2008
@@ -1,5 +1,5 @@
from py.__.test2.testing.suptest import InlineCollection
-from py.__.test2.dist.async import AsyncSession, LoopState
+from py.__.test2.dist.dsession import DSession, LoopState
from py.__.test2.dist.hostmanage import Host
from py.__.test2.runner import basic_collect_report
from py.__.test2 import repevent
@@ -23,11 +23,11 @@
while queue.qsize():
print queue.get()
-class TestAsyncSession(InlineCollection):
+class TestDSession(InlineCollection):
def test_add_remove_host(self):
item = self.getitem("def test_func(): pass")
rep = run(item)
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host = Host("localhost")
assert not session.hosts
session.addhost(host)
@@ -40,7 +40,7 @@
def test_senditems_removeitems(self):
item = self.getitem("def test_func(): pass")
rep = run(item)
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host = Host("localhost")
host.node = MockNode()
session.addhost(host)
@@ -56,7 +56,7 @@
def test_func():
pass
""")
- session = AsyncSession(modcol._config)
+ session = DSession(modcol._config)
session.triggertesting([modcol])
rep = session.queue.get(block=False)
assert isinstance(rep, repevent.CollectionReport)
@@ -64,7 +64,7 @@
def test_triggertesting_item(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host1 = Host("localhost")
host1.node = MockNode()
host2 = Host("localhost")
@@ -84,7 +84,7 @@
def test_keyboardinterrupt(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
def raise_(): raise KeyboardInterrupt()
session.queue.get = raise_
exitstatus = session.loop([])
@@ -92,7 +92,7 @@
def test_internalerror(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
def raise_(): raise ValueError()
session.queue.get = raise_
exitstatus = session.loop([])
@@ -100,7 +100,7 @@
def test_rescheduleevent(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host1 = Host("localhost")
host1.node = MockNode()
session.addhost(host1)
@@ -124,7 +124,7 @@
def test_no_hosts_remaining_for_tests(self):
item = self.getitem("def test_func(): pass")
# setup a session with one host
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host1 = Host("localhost")
host1.node = MockNode()
session.addhost(host1)
@@ -143,7 +143,7 @@
item = self.getitem("def test_func(): pass")
# setup a session with two hosts
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host1 = Host("localhost")
host1.node = MockNode()
session.addhost(host1)
@@ -163,7 +163,7 @@
def test_event_propagation(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
ev = repevent.HostDown(Host("localhost"), None)
events = [] ; session.bus.subscribe(events.append)
@@ -172,7 +172,7 @@
assert events[0] == ev
def runthrough(self, item):
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host1 = Host("localhost")
host1.node = MockNode()
session.addhost(host1)
@@ -209,7 +209,7 @@
pass
""")
modcol._config.option.exitfirst = True
- session = AsyncSession(modcol._config)
+ session = DSession(modcol._config)
host1 = Host("localhost")
host1.node = MockNode()
session.addhost(host1)
@@ -231,7 +231,7 @@
def test_shuttingdown_filters_events(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host = Host("localhost")
session.addhost(host)
loopstate = LoopState([])
@@ -253,7 +253,7 @@
def test_pass():
pass
""")
- session = AsyncSession(modcol._config)
+ session = DSession(modcol._config)
modcol._config.option.keyword = modcol.name
dsel = session.filteritems([modcol])
@@ -274,7 +274,7 @@
def test_hostdown_shutdown_after_completion(self):
item = self.getitem("def test_func(): pass")
- session = AsyncSession(item._config)
+ session = DSession(item._config)
host = Host("localhost")
host.node = MockNode()
@@ -297,7 +297,7 @@
def test_pass():
pass
""")
- session = AsyncSession(modcol._config)
+ session = DSession(modcol._config)
host = Host("localhost")
host.node = MockNode()
session.addhost(host)
Copied: py/branch/event/py/test2/dist/testing/test_functional_dsession.py (from r57165, py/branch/event/py/test2/dist/testing/test_async_functional.py)
==============================================================================
--- py/branch/event/py/test2/dist/testing/test_async_functional.py (original)
+++ py/branch/event/py/test2/dist/testing/test_functional_dsession.py Sun Aug 10 17:40:02 2008
@@ -4,7 +4,7 @@
import py
from py.__.test2 import repevent
-from py.__.test2.dist.async import AsyncSession
+from py.__.test2.dist.dsession import DSession
from py.__.test2.dist.hostmanage import HostManager, Host
from basetest import BasicRsessionTest, DirSetup
from py.__.test2.testing import suptest
@@ -53,7 +53,7 @@
assert 0
""")
config = self.parseconfig('-d', p1)
- dist = AsyncSession(config)
+ dist = DSession(config)
readevent = eventreader(dist)
dist.main()
ev = readevent(repevent.ItemTestReport)
@@ -96,7 +96,7 @@
config = py.test2.config._reparse([tmpdir.join(subdir)])
assert config.topdir == tmpdir
assert not tmpdir.join("__init__.py").check()
- dist = AsyncSession(config)
+ dist = DSession(config)
sorter = suptest.events_from_session(dist)
testevents = sorter.get(repevent.ItemTestReport)
assert len([x for x in testevents if x.passed]) == 2
Modified: py/branch/event/py/test2/repevent.py
==============================================================================
--- py/branch/event/py/test2/repevent.py (original)
+++ py/branch/event/py/test2/repevent.py Sun Aug 10 17:40:02 2008
@@ -100,7 +100,7 @@
self.roots = roots
class HostDown(BaseEvent):
- def __init__(self, host, error):
+ def __init__(self, host, error=None):
self.host = host
self.error = error
Modified: py/branch/event/py/test2/testing/test_config.py
==============================================================================
--- py/branch/event/py/test2/testing/test_config.py (original)
+++ py/branch/event/py/test2/testing/test_config.py Sun Aug 10 17:40:02 2008
@@ -194,7 +194,7 @@
def test_sessionname_dist(self):
config = py.test2.config._reparse([self.tmpdir, '--dist'])
- assert config._getsessionname() == 'AsyncSession'
+ assert config._getsessionname() == 'DSession'
def test_session_eventlog(self):
eventlog = self.tmpdir.join("test_session_eventlog")
@@ -213,7 +213,7 @@
for x in 'startserver runbrowser rest'.split():
config = py.test2.config._reparse([self.tmpdir, '--dist', '--%s' % x])
- assert config._getsessionname() == 'AsyncSession'
+ assert config._getsessionname() == 'DSession'
def test_implied_different_sessions(self):
config = py.test2.config._reparse([self.tmpdir, '--looponfailing'])
@@ -221,7 +221,7 @@
config = py.test2.config._reparse([self.tmpdir, '--exec=x'])
assert config._getsessionname() == 'RemoteTerminalSession'
config = py.test2.config._reparse([self.tmpdir, '--dist', '--exec=x'])
- assert config._getsessionname() == 'AsyncSession'
+ assert config._getsessionname() == 'DSession'
def test_sessionname_lookup_custom(self):
self.tmpdir.join("conftest.py").write(py.code.Source("""
More information about the pytest-commit
mailing list