[py-svn] r10907 - in py/dist/py: . test/tkinter test/tkinter/testing test/tkinter/testing/data

jan at codespeak.net jan at codespeak.net
Wed Apr 20 12:10:45 CEST 2005


Author: jan
Date: Wed Apr 20 12:10:45 2005
New Revision: 10907

Added:
   py/dist/py/test/tkinter/backend.py
   py/dist/py/test/tkinter/testing/data/
   py/dist/py/test/tkinter/testing/data/__init__.py
   py/dist/py/test/tkinter/testing/data/filetest.py
   py/dist/py/test/tkinter/testing/test_backend.py
   py/dist/py/test/tkinter/testing/test_tixgui.py
   py/dist/py/test/tkinter/tixgui.py
Modified:
   py/dist/py/__init__.py
   py/dist/py/test/tkinter/util.py
Log:
Complete rewrite of the Tkinter Frontend
uses the new channel callback
start with py.test --tkinter



Modified: py/dist/py/__init__.py
==============================================================================
--- py/dist/py/__init__.py	(original)
+++ py/dist/py/__init__.py	Wed Apr 20 12:10:45 2005
@@ -16,7 +16,7 @@
     # for customization of collecting/running tests
     'test.Session'           : ('./test/session.py', 'Session'),
     'test.TerminalSession'   : ('./test/terminal/terminal.py', 'TerminalSession'),
-    'test.TkinterSession'    : ('./test/tkinter/tkgui.py', 'TkinterSession'),
+    'test.TkinterSession'    : ('./test/tkinter/tixgui.py', 'TixSession'),
     'test.collect.Collector' : ('./test/collect.py', 'Collector'),
     'test.collect.Directory' : ('./test/collect.py', 'Directory'),
     'test.collect.Module'    : ('./test/collect.py', 'Module'),

Added: py/dist/py/test/tkinter/backend.py
==============================================================================
--- (empty file)
+++ py/dist/py/test/tkinter/backend.py	Wed Apr 20 12:10:45 2005
@@ -0,0 +1,142 @@
+'''Backend for running tests and creating a Repository of testreports.'''
+import py
+import repository
+import util
+
+Null = util.Null
+
+class TestRepository(repository.Repository):
+    '''stores only TestReport'''
+    disabled = True
+    ReportClass = util.TestReport2
+    failed_id = [repr(ReportClass.Status.Failed())]
+    skipped_id = [repr(ReportClass.Status.Skipped())]    
+
+    def __init__(self):
+        super(TestRepository, self).__init__()
+        self.add([], self.ReportClass())
+        failedreport = self.ReportClass()
+        failedreport.full_id = self.failed_id
+        failedreport.label = 'Failed Tests'
+        self.add(self.failed_id, failedreport)
+        skippedreport = self.ReportClass()
+        skippedreport.full_id = self.skipped_id
+        skippedreport.label = 'Skipped Tests'
+        self.add(self.skipped_id, skippedreport)
+
+    def root_status(self):
+        root_status = self.ReportClass.Status.NotExecuted()
+        if len(self.keys()) > 2:
+            root_status = self.ReportClass.Status.Passed()
+        if len(self.find_children(self.skipped_id)):
+            root_status = self.ReportClass.Status.Skipped()
+        if len(self.find_children(self.failed_id)):
+            root_status = self.ReportClass.Status.Failed()
+        return root_status
+
+    def delete_all(self, key):
+        super(TestRepository, self).delete_all(key)
+        new_repos = TestRepository()
+        for new_key in new_repos.keys():
+            if not self.haskey(new_key):
+                self.add(new_key, new_repos.find(new_key))
+
+    def delete(self, key):
+        super(TestRepository, self).delete(key)
+        new_repos = TestRepository()
+        if new_repos.haskey(key):
+            self.add(key, new_repos.find(key))
+
+    def add_report(self, report):
+        if not report.full_id:
+            self.add([], report)
+            return
+        if report.error_report:
+            if report.status == self.ReportClass.Status.Failed():
+                self.add(self.failed_id + [report.id], report)
+            elif report.status == self.ReportClass.Status.Skipped():
+                self.add(self.skipped_id + [report.id], report)
+        self.add(report.full_id, report)
+
+    def add_report_from_channel(self, report_str):
+        report = self.ReportClass.fromChannel(report_str)
+        self.add_report(report)
+        return report.full_id[:]
+
+class RepositoryBackend:
+
+    def __init__(self, config = Null()):
+        self.testrepository = TestRepository()
+        self.channel = Null()
+        self.queue = py.std.Queue.Queue()
+        self._message_callback = Null()
+        self._messages_callback = Null()
+        self.config = config
+
+    def running(self):
+        '''are there tests running?'''
+        if self.channel:
+            return not self.channel.isclosed()
+        return False
+    running = property(running)
+
+    def shutdown(self):
+        if self.running:
+            self.channel.close()
+    
+    def get_repository(self):
+        '''return the repository'''
+        return self.testrepository
+
+    def set_message_callback(self, callback = Null()):
+        self._message_callback = callback
+
+    def set_messages_callback(self, callback = Null()):
+        self._messages_callback = callback
+
+    def update(self):
+        """Check if there is something new in the queue."""
+        changed_report_ids = []
+        while self.queue.qsize():
+            try:
+                report_str = self.queue.get(0)
+                id = report_str
+                if report_str is not None:
+                    id = self.testrepository.add_report_from_channel(report_str) 
+                changed_report_ids.append(id)
+                self._message_callback(id)
+            except py.std.Queue.Empty:
+                pass
+        self._messages_callback(changed_report_ids)
+
+    def start_tests(self, config = None, args = [], tests = []):
+        if config is None:
+            config = self.config
+        self.testrepository = TestRepository()
+        gateway = py.execnet.PopenGateway(config.option.executable) 
+        self.channel = gateway.newchannel(receiver = self.queue.put)
+        gateway.remote_exec(channel = self.channel, source = '''
+        import py
+        from py.__impl__.test.tkinter.backend import remote
+
+        args, tests = channel.receive()
+        remote(channel, tests = tests, args = args)
+        ''')
+        self.channel.send((args, tests))
+                
+def remote(channel, tests = [], args = []):
+    import py
+    from py.__impl__.test.tkinter.guisession import GuiSession
+    from py.__impl__.test.terminal.remote import getfailureitems
+    
+    config, testfiles = py.test.Config.parse(args)
+    if tests: 
+        cols = getfailureitems(tests)
+    else:
+        cols = testfiles
+    print 'cols:', cols
+    #session = config.getsessionclass()(config)
+    session = GuiSession(config = config, channel=channel) 
+    session.shouldclose = channel.isclosed
+    session.main(cols)
+    

Added: py/dist/py/test/tkinter/testing/data/__init__.py
==============================================================================

Added: py/dist/py/test/tkinter/testing/data/filetest.py
==============================================================================
--- (empty file)
+++ py/dist/py/test/tkinter/testing/data/filetest.py	Wed Apr 20 12:10:45 2005
@@ -0,0 +1,7 @@
+
+def test_one(): 
+    assert 42 == 42
+
+class TestClass(object): 
+    def test_method_one(self): 
+        assert 42 == 42 

Added: py/dist/py/test/tkinter/testing/test_backend.py
==============================================================================
--- (empty file)
+++ py/dist/py/test/tkinter/testing/test_backend.py	Wed Apr 20 12:10:45 2005
@@ -0,0 +1,104 @@
+import py
+from py.__impl__.test.tkinter import backend
+
+RepositoryBackend = backend.RepositoryBackend
+TestRepository = backend.TestRepository
+
+datadir = py.magic.autopath().dirpath('data')
+from cStringIO import StringIO 
+
+
+class Test_TestRepository:
+    
+    def check_repository_has_failed_and_skipped_folder(self, repos):
+        assert repos.find([repr(TestRepository.ReportClass.Status.Failed())])
+        assert repos.find([repr(TestRepository.ReportClass.Status.Skipped())])
+
+    def test_repository_has_failed_and_skipped_folder(self):
+        repos = TestRepository()
+        self.check_repository_has_failed_and_skipped_folder(repos)
+
+    def test_repository_has_failed_and_skipped_folder_after_delete_all(self):
+        repos = TestRepository()
+        self.check_repository_has_failed_and_skipped_folder(repos)
+        repos.delete_all([])
+        self.check_repository_has_failed_and_skipped_folder(repos)
+
+    def test_repository_has_failed_and_skipped_folder_after_delete(self):
+        repos = TestRepository()
+        self.check_repository_has_failed_and_skipped_folder(repos)
+        repos.delete(TestRepository.failed_id)  # same repr()-based key the repository registers
+        self.check_repository_has_failed_and_skipped_folder(repos)
+        repos.delete(TestRepository.failed_id)  # deleting twice must still leave the folder in place
+        self.check_repository_has_failed_and_skipped_folder(repos)
+
+    def test_add_report_from_channel(self):
+        full_id = ['start', 'next', 'end']
+        report = TestRepository.ReportClass()
+        report.full_id = full_id
+
+        repos = TestRepository()
+        id = repos.add_report_from_channel(report.to_channel())
+        assert id == full_id
+        assert repos.haskey(full_id)
+    
+class TestRepositoryBackend:
+
+    def setup_method(self, method):
+        self.backend = RepositoryBackend()
+
+    def test_get_repository(self):
+        assert isinstance(self.backend.get_repository(), TestRepository)
+
+    def test_running_property(self):
+        # setup_method provides a fresh backend on which start_tests was never called
+        assert not self.backend.running
+
+    def test_update_callback(self):
+        l = []
+        self.backend.set_message_callback(l.append)
+        self.backend.queue.put(None)
+        self.backend.update()
+        assert len(l) == 1
+        assert l[0] is None
+
+    def test_processs_messeges_callback(self):
+        l = []
+        self.backend.set_messages_callback(l.append)  # batch callback, not the per-message one
+        self.backend.queue.put(None)
+        self.backend.update()
+        assert len(l) == 1  # update() reports all changed ids in a single call
+        assert l[0] == [None]
+
+    def test_start_tests(self):
+        config, args = py.test.Config.parse([])
+        self.backend.start_tests(config = config,
+                                 args = [str(datadir / 'filetest.py')],
+                                 tests = [])
+        while self.backend.running:
+            self.backend.update()
+        repos = self.backend.get_repository()
+        print repos.keys()
+        print repos.find()
+        assert repos.find(['py',
+                           'test',
+                           'tkinter',
+                           'testing',
+                           'data',
+                           'filetest.py',
+                           'TestClass'])
+        
+def test_remote():
+    class ChannelMock:
+        def __init__(self):
+            self.sendlist = []
+        def send(self, value):
+            self.sendlist.append(value)
+        def isclosed(self):
+            return False
+        
+    channel = ChannelMock()
+    backend.remote(channel, args = [str(datadir / 'filetest.py')], tests = [])
+    #py.std.pprint.pprint(channel.sendlist)
+    assert channel.sendlist
+    

Added: py/dist/py/test/tkinter/testing/test_tixgui.py
==============================================================================
--- (empty file)
+++ py/dist/py/test/tkinter/testing/test_tixgui.py	Wed Apr 20 12:10:45 2005
@@ -0,0 +1,8 @@
+
+import py
+
+def test_import_tix():
+    import Tix
+    root = Tix.Tk()
+    root.tk.eval('package require Tix')
+

Added: py/dist/py/test/tkinter/tixgui.py
==============================================================================
--- (empty file)
+++ py/dist/py/test/tkinter/tixgui.py	Wed Apr 20 12:10:45 2005
@@ -0,0 +1,265 @@
+import py
+from py.__impl__.test.tkinter import backend
+from py.__impl__.test.tkinter import util
+Null = util.Null
+
+
+import ScrolledText
+from Tkinter import PhotoImage
+import Tix
+from Tkconstants import *
+import re
+import os
+
+class StatusBar(Tix.Frame):
+
+    def __init__(self, master=None, **kw):
+        if master is None:
+            master = Tix.Tk()  # bare 'Tk' is not imported in this module
+        Tix.Frame.__init__(self, master, **kw)
+        self.labels = {}  # label name -> Tix.Label, filled in by set_label()
+
+    def set_label(self, name, text='', side=LEFT):
+        if not self.labels.has_key(name):
+            label = Tix.Label(self, bd=1, relief=SUNKEN, anchor=W)
+            label.pack(side=side)
+            self.labels[name] = label
+        else:
+            label = self.labels[name]
+        label.config(text='%s:\t%s' % (name,text))
+
+    def _get_label(self, name):
+        if not self.labels.has_key(name):
+            self.set_label(name, 'NoText')
+        return self.labels[name]
+
+    def update(self, testrepository):
+        failed = testrepository.keys(testrepository.failed_id)
+        self.set_label('Failed', str(len(failed)))
+        self._get_label('Failed').configure(bg = 'Red', fg = 'White')
+        skipped = testrepository.keys(testrepository.skipped_id)
+        self.set_label('Skipped', str(len(skipped)))
+        self._get_label('Skipped').configure(bg = 'Yellow')
+        self.set_label('Collectors',
+                                  str(len(testrepository.keys())-2))
+
+class ReportListBox(Tix.LabelFrame):
+
+    def __init__(self, *args, **kwargs):
+        Tix.LabelFrame.__init__(self, *args, **kwargs)
+        self.callback = Null()
+        self.data = {}
+        self.createwidgets()
+        self.label.configure(text = 'Idle')
+        
+    def createwidgets(self):
+        self.listbox = Tix.Listbox(self.frame,  foreground='red',
+                                   selectmode=SINGLE)
+
+        self.scrollbar = Tix.Scrollbar(self.frame, command=self.listbox.yview)
+        self.scrollbar.pack(side = RIGHT, fill = Y, anchor = N)
+        self.listbox.pack(side = LEFT, fill = BOTH, expand = YES, anchor = NW)
+        self.listbox.configure(yscrollcommand = self.scrollbar.set)
+
+    def set_callback(self, callback):
+        self.callback = callback
+        self.listbox.bind('<Double-1>', self.do_callback)
+
+    def do_callback(self, *args):
+        report_ids = [self.data[self.listbox.get(int(item))] for item in self.listbox.curselection()]
+        for report_id in report_ids:
+            self.callback(report_id)
+
+    def update_label(self, report_path):
+        label = report_path
+        if not label:
+            label = 'Idle'
+        self.label.configure(text = label)
+
+    def update_list(self, repository):
+        failed_childs = repository.find_children(repository.failed_id)
+        skipped_childs = repository.find_children(repository.skipped_id)
+        if len(failed_childs + skipped_childs) == len(self.data.keys()):
+            return
+        old_selection = [int(sel) for sel in self.listbox.curselection()]
+        self.listbox.delete(0, END)
+        self.data = {}
+        for report_id in failed_childs + skipped_childs:
+            report = repository.find(report_id)
+            label = '%s: %s' % (report.status, report.label)
+            self.data[label] = report.full_id
+            self.listbox.insert(END, label)
+        for index in old_selection:
+            try:
+                self.listbox.select_set(index)
+            except:
+                pass
+            
+        
+
+        
+
+class TixGui:
+
+    def __init__(self, parent, config):
+        self._parent = parent
+        self._config = config
+        self._should_stop = False
+        #XXX needed?
+        self._paths = []
+        self.backend = backend.RepositoryBackend(config)
+        self.createwidgets()
+        self.timer_update()
+
+    def createwidgets(self):
+        self._buttonframe = Tix.Frame(self._parent)
+        self._entry = Tix.LabelEntry(self._buttonframe, labelside = LEFT,
+                                     label = 'Tests:')
+        self._entry.pack(side = LEFT, fill = X, expand = YES)
+        self._entry.entry.configure(width=80)
+        self._entry.entry.bind('<Return>', self.start_tests)
+        self._stop = Tix.Button(self._buttonframe, text = 'Stop',
+                                command = self.stop)
+        self._stop.pack(side = RIGHT)
+        self._run = Tix.Button(self._buttonframe, text = 'Run',
+                               command = self.start_tests)
+        self._run.pack(side = RIGHT)
+        self._buttonframe.pack(side = TOP, fill = X)
+        self._reportlist = ReportListBox(self._parent, width = 400)
+        self._reportlist.pack(side = TOP, fill = BOTH, expand = YES)
+        self._reportlist.set_callback(self.show_error)
+        #self._tree = reporttree.ReportTree(self._parent, backend = self.backend)
+        #self._tree.pack(side = TOP, fill = BOTH, expand = YES)
+        self._statusbar = StatusBar(self._parent)
+        self._statusbar.pack(side=BOTTOM, fill=X)
+        self.update_status(self.backend.get_repository())
+
+
+    def show_error(self, report_id):
+        report = self.backend.get_repository().find(report_id)
+        window = Tix.Toplevel(self._parent)
+        window.title(report.label)
+        window.protocol('WM_DELETE_WINDOW', window.quit)
+        Tix.Label(window, text='%s: %s' % (report.status, report.label),
+                 foreground="red", justify=LEFT).pack(anchor=W)
+        text = ScrolledText.ScrolledText(window)
+        text.tag_config('sel', relief=FLAT)
+        text.insert(END, report.error_report)
+        if len(report.error_report.splitlines()) < 20:
+            text.config(height=len(report.error_report.splitlines()) + 5)
+        text.yview_pickplace(END)
+        text['state'] = DISABLED
+        text['cursor'] = window['cursor']
+        self.attacheditorhotspots(text)
+        text.pack(expand=1, fill=BOTH)
+        ##
+        b = Tix.Button(window, text="Close",
+                      command=window.quit)
+        b.pack(side=BOTTOM)
+        b.focus_set()
+        window.bind('<Key-Return>', lambda e, w=window: w.quit())
+        window.mainloop()
+        window.destroy()
+
+    def attacheditorhotspots(self, text):
+        # Attach clickable regions to a Text widget.
+        filelink = re.compile(r"""\[(?:testcode\s*:)?\s*(.+):(\d+)\]""")
+        skippedlink = re.compile(r"""in\s+(/.*):(\d+)\s+""")
+        lines = text.get('1.0', END).splitlines(1)
+        if not lines:
+            return
+        tagname = ''
+        start, end = 0,0
+        for index, line in enumerate(lines):
+            match = filelink.search(line)
+            if match is None:
+                match = skippedlink.search(line)
+            if match is None:
+                continue
+            file, line = match.group(1, 2)
+            start, end = match.span()
+            tagname = "ref%d" % index
+            text.tag_add(tagname,
+                         "%d.%d" % (index + 1, start),
+                         "%d.%d" % (index + 1, end))
+            text.tag_bind(tagname, "<Enter>",
+                          lambda e, n=tagname:
+                          e.widget.tag_config(n, underline=1))
+            text.tag_bind(tagname, "<Leave>",
+                          lambda e, n=tagname:
+                          e.widget.tag_config(n, underline=0))
+            text.tag_bind(tagname, "<Button-1>",
+                          lambda e, self=self, f=file, l=line:
+                          self.launch_editor(f, l))
+                    
+    def launch_editor(self, file, line):
+        editor = (py.std.os.environ.get('PYUNIT_EDITOR', None) or
+                  py.std.os.environ.get('EDITOR_REMOTE', None) or
+                  os.environ.get('EDITOR', None) or "emacsclient --no-wait ")
+                  #"emacsclient --no-wait ")
+        if editor:
+            print "%s +%s %s" % (editor, line, file)
+            #py.process.cmdexec('%s +%s %s' % (editor, line, file))
+            os.system('%s +%s %s' % (editor, line, file))
+
+    def timer_update(self):
+        self.backend.update()
+        self._parent.after(100, self.timer_update)
+
+    def start_tests(self, dont_care_event=None):
+        paths = [p.strip() for p in self._entry.entry.get().split(',') if p.strip()]  # set_paths joins with ', '
+        self.backend.set_messages_callback(self.messages_callback)
+        self.backend.set_message_callback(self.message_callback)
+        self.backend.start_tests(args = paths)
+
+    def update_status(self, repository):
+        self._statusbar.update(repository)
+        bgcolor = 'White'
+        fgcolor = 'Black'
+        root_status = repository.root_status()
+        if root_status == repository.ReportClass.Status.Failed():
+            bgcolor = 'Red'
+            fgcolor = 'White'
+        elif root_status == repository.ReportClass.Status.Skipped() :
+            bgcolor = 'Yellow'
+        self._entry.entry.configure(bg = bgcolor, fg = fgcolor)
+                
+    def messages_callback(self, report_ids):
+        if not report_ids:
+            return
+        repository = self.backend.get_repository()
+        self.update_status(repository)
+        self._reportlist.update_list(repository)
+
+    def message_callback(self, report_id):
+        if report_id is None:
+            report_label = None
+        else:
+            report_label = self.backend.get_repository().find(report_id).path
+        self._reportlist.update_label(report_label)
+        
+    def set_paths(self, paths):
+        self._paths = paths
+        self._entry.entry.insert(END, ', '.join(paths))
+
+    def stop(self):
+        self.backend.shutdown()
+        self.backend.update()
+        self.messages_callback([None])
+        self.message_callback(None)
+
+    def shutdown(self):
+        self._should_stop = True  # same attribute __init__ initialises to False
+        self.backend.shutdown()
+        py.std.sys.exit()
+
+class TixSession:
+    def __init__(self, config):
+        self.config = config
+        
+    def main(self, paths):
+        root = Tix.Tk()
+        tixgui = TixGui(root, self.config)
+        tixgui.set_paths(paths)
+        root.protocol('WM_DELETE_WINDOW', tixgui.shutdown)
+        root.mainloop()

Modified: py/dist/py/test/tkinter/util.py
==============================================================================
--- py/dist/py/test/tkinter/util.py	(original)
+++ py/dist/py/test/tkinter/util.py	Wed Apr 20 12:10:45 2005
@@ -240,6 +240,26 @@
         channel_dict.update(kwargs)
         return TestReport.fromChannel(channel_dict)
 
+class TestReport2(TestReport):
+    '''Will replace TestReport.'''   
+    
+    template = {'time' : 0,
+                'label': 'Root',
+                'id': 'Root',
+                'full_id': [],
+                'status': Status.NotExecuted(),
+                'report': 'NoReport',
+                'error_report': '',
+                'finished': False,
+                'restart_params': None, # ('',('',))
+                'path' : '',
+                'modpath': '',
+                }
+    Status = Status
+
+
+
+
 class TestFileWatcher:
     '''watches files or paths'''
     def __init__(self, *paths):



More information about the pytest-commit mailing list