[Python-checkins] r71377 - python/trunk/Lib/test/test_telnetlib.py

jack.diederich python-checkins at python.org
Tue Apr 7 22:22:59 CEST 2009


Author: jack.diederich
Date: Tue Apr  7 22:22:59 2009
New Revision: 71377

Log:
eliminate more race conditions in telnetlib tests

Modified:
   python/trunk/Lib/test/test_telnetlib.py

Modified: python/trunk/Lib/test/test_telnetlib.py
==============================================================================
--- python/trunk/Lib/test/test_telnetlib.py	(original)
+++ python/trunk/Lib/test/test_telnetlib.py	Tue Apr  7 22:22:59 2009
@@ -13,8 +13,8 @@
 def server(evt, serv, dataq=None):
     """ Open a tcp server in three steps
         1) set evt to true to let the parent know we are ready
-        2) [optional] write all the data in dataq to the socket
-           terminate when dataq.get() returns EOF_sigil
+        2) [optional] if dataq is given, write the list of data from dataq.get()
+           to the socket.
         3) set evt to true to let the parent know we're done
     """
     serv.listen(5)
@@ -23,15 +23,19 @@
         conn, addr = serv.accept()
         if dataq:
             data = ''
-            new_data = dataq.get(True, 0.5)
-            while new_data is not EOF_sigil:
-                if type(new_data) == str:
-                    data += new_data
-                elif type(new_data) in [int, float]:
+            done = False
+            for new_data in dataq.get(True, 0.5):
+                if not done:
+                    dataq.task_done()
+                    done = True
+                if new_data == EOF_sigil:
+                    break
+                if type(new_data) in [int, float]:
                     time.sleep(new_data)
+                else:
+                    data += new_data
                 written = conn.send(data)
                 data = data[written:]
-                new_data = dataq.get(True, 0.5)
     except socket.timeout:
         pass
     finally:
@@ -98,7 +102,7 @@
 
 def _read_setUp(self):
     # the blocking constant should be tuned!
-    self.blocking_timeout = 0.0
+    self.blocking_timeout = 0.3
     self.evt = threading.Event()
     self.dataq = Queue.Queue()
     self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -114,19 +118,10 @@
     self.evt.wait()
     self.thread.join()
 
-
 class ReadTests(TestCase):
     setUp = _read_setUp
     tearDown = _read_tearDown
 
-    def _test_blocking(self, func):
-        start = time.time()
-        self.dataq.put(self.blocking_timeout)
-        self.dataq.put(EOF_sigil)
-        data = func()
-        low, high = wibble_float(self.blocking_timeout)
-        self.assertTrue(time.time() - start >= low)
-
     def test_read_until_A(self):
         """
         read_until(expected, [timeout])
@@ -134,23 +129,19 @@
           hit (default is no timeout); may block.
         """
         want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
-        for item in want:
-            self.dataq.put(item)
+        self.dataq.put(want)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         data = telnet.read_until('match')
         self.assertEqual(data, ''.join(want[:-2]))
 
     def test_read_until_B(self):
         # test the timeout - it does NOT raise socket.timeout
         want = ['hello', self.blocking_timeout, EOF_sigil]
-        for item in want:
-            self.dataq.put(item)
+        self.dataq.put(want)
         telnet = telnetlib.Telnet(HOST, self.port)
-        start = time.time()
-        timeout = self.blocking_timeout / 2
-        data = telnet.read_until('not seen', timeout)
-        low, high = wibble_float(timeout)
-        self.assertTrue(low <= time.time() - high)
+        self.dataq.join()
+        data = telnet.read_until('not seen')
         self.assertEqual(data, want[0])
 
     def test_read_all_A(self):
@@ -159,16 +150,31 @@
           Read all data until EOF; may block.
         """
         want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
-        for item in want:
-            self.dataq.put(item)
+        self.dataq.put(want)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         data = telnet.read_all()
         self.assertEqual(data, ''.join(want[:-1]))
         return
 
+    def _test_blocking(self, func):
+        self.dataq.put([self.blocking_timeout, EOF_sigil])
+        self.dataq.join()
+        start = time.time()
+        data = func()
+        low, high = wibble_float(self.blocking_timeout)
+        self.assertTrue(low <= time.time() - start)
+
     def test_read_all_B(self):
         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
 
+    def test_read_all_C(self):
+        self.dataq.put([EOF_sigil])
+        telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
+        telnet.read_all()
+        telnet.read_all() # shouldn't raise
+
     def test_read_some_A(self):
         """
         read_some()
@@ -176,19 +182,20 @@
         """
         # test 'at least one byte'
         want = ['x' * 500, EOF_sigil]
-        for item in want:
-            self.dataq.put(item)
+        self.dataq.put(want)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         data = telnet.read_all()
         self.assertTrue(len(data) >= 1)
 
     def test_read_some_B(self):
         # test EOF
-        self.dataq.put(EOF_sigil)
+        self.dataq.put([EOF_sigil])
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         self.assertEqual('', telnet.read_some())
 
-    def test_read_all_C(self):
+    def test_read_some_C(self):
         self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
 
     def _test_read_any_eager_A(self, func_name):
@@ -200,26 +207,24 @@
         # this never blocks so it should return eat part in turn
         want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil]
         expects = want[0] + want[2]
-        for item in want:
-            self.dataq.put(item)
+        self.dataq.put(want)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         func = getattr(telnet, func_name)
-        time.sleep(self.blocking_timeout/10)
         data = ''
         while True:
             try:
                 data += func()
                 self.assertTrue(expects.startswith(data))
-                time.sleep(self.blocking_timeout)
             except EOFError:
                 break
         self.assertEqual(expects, data)
 
     def _test_read_any_eager_B(self, func_name):
         # test EOF
-        self.dataq.put(EOF_sigil)
-        time.sleep(self.blocking_timeout / 10)
+        self.dataq.put([EOF_sigil])
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         func = getattr(telnet, func_name)
         self.assertRaises(EOFError, func)
 
@@ -238,14 +243,14 @@
 
     def _test_read_any_lazy_A(self, func_name):
         want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil]
-        for item in want:
-            self.dataq.put(item)
+        self.dataq.put(want)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         func = getattr(telnet, func_name)
         self.assertEqual('', func())
         data = ''
         while True:
-            time.sleep(self.blocking_timeout)
+            time.sleep(0.0)
             try:
                 telnet.fill_rawq()
                 data += func()
@@ -257,10 +262,11 @@
         return data, want[1]
 
     def _test_read_any_lazy_B(self, func_name):
-        self.dataq.put(EOF_sigil)
+        self.dataq.put([EOF_sigil])
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         func = getattr(telnet, func_name)
-        time.sleep(self.blocking_timeout/10)
+        time.sleep(0.0)
         telnet.fill_rawq()
         self.assertRaises(EOFError, func)
 
@@ -298,12 +304,11 @@
     def _test_command(self, data):
         """ helper for testing IAC + cmd """
         self.setUp()
-        for item in data:
-            self.dataq.put(item)
+        self.dataq.put(data)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         nego = nego_collector()
         telnet.set_option_negotiation_callback(nego.do_nego)
-        time.sleep(self.blocking_timeout/10)
         txt = telnet.read_all()
         cmd = nego.seen
         self.assertTrue(len(cmd) > 0) # we expect at least one command
@@ -314,8 +319,9 @@
 
     def test_IAC_commands(self):
         # reset our setup
-        self.dataq.put(EOF_sigil)
+        self.dataq.put([EOF_sigil])
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         self.tearDown()
 
         for cmd in self.cmds:
@@ -324,6 +330,7 @@
             self._test_command([tl.IAC + cmd, EOF_sigil])
         # all at once
         self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
+        self.assertEqual('', telnet.read_sb_data())
 
     def test_SB_commands(self):
         # RFC 855, subnegotiations portion
@@ -334,16 +341,16 @@
                 tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
                 EOF_sigil,
                ]
-        for item in send:
-            self.dataq.put(item)
+        self.dataq.put(send)
         telnet = telnetlib.Telnet(HOST, self.port)
+        self.dataq.join()
         nego = nego_collector(telnet.read_sb_data)
         telnet.set_option_negotiation_callback(nego.do_nego)
-        time.sleep(self.blocking_timeout/10)
         txt = telnet.read_all()
         self.assertEqual(txt, '')
         want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
         self.assertEqual(nego.sb_seen, want_sb_data)
+        self.assertEqual('', telnet.read_sb_data())
 
 def test_main(verbose=None):
     test_support.run_unittest(GeneralTests, ReadTests, OptionTests)


More information about the Python-checkins mailing list