[Jython-checkins] jython: Make select and socket tests more robust

jim.baker jython-checkins at python.org
Fri Jan 9 23:20:06 CET 2015


https://hg.python.org/jython/rev/68b3da7765c9
changeset:   7524:68b3da7765c9
user:        Jim Baker <jim.baker at rackspace.com>
date:        Fri Jan 09 15:18:05 2015 -0700
summary:
  Make select and socket tests more robust

Better handle TIME_WAIT issues when sockets are repeatedly reused in
the select and socket tests. Also map a java.net.SocketException whose
message starts with "Address already in use" to EADDRINUSE, as raised
when a client socket binds to a local address.

files:
  Lib/_socket.py              |  39 ++++++++++++++++++++----
  Lib/ssl.py                  |   8 +---
  Lib/test/test_select.py     |  10 ++++-
  Lib/test/test_select_new.py |  26 ++++++++++------
  Lib/test/test_socket.py     |  38 ++++++++++++++++++++---
  Lib/test/test_support.py    |  25 ++++++++++++++++
  6 files changed, 114 insertions(+), 32 deletions(-)


diff --git a/Lib/_socket.py b/Lib/_socket.py
--- a/Lib/_socket.py
+++ b/Lib/_socket.py
@@ -275,6 +275,8 @@
         return _add_exception_attrs(
             error(errno.EAFNOSUPPORT, 
                   'Address family not supported by protocol family: See http://wiki.python.org/jython/NewSocketModule#IPV6_address_support'))
+    if exc.message.startswith('Address already in use'):
+        return error(errno.EADDRINUSE, 'Address already in use')
     return _unmapped_exception(exc)
 
 
@@ -546,7 +548,8 @@
                 result = self._handle_poll(partial(self.queue.poll, timeout_in_ns, TimeUnit.NANOSECONDS))
                 if result:
                     return result
-                timeout = timeout - (time.time() - started)
+                timeout -= time.time() - started
+                log.debug("Spurious wakeup, retrying with timeout=%s", timeout, extra={"sock": "*"})
             return []
 
 
@@ -764,7 +767,8 @@
         elif self.timeout:
             self._handle_timeout(future.await, reason)
             if not future.isSuccess():
-                log.exception("Got this failure %s during %s", future.cause(), reason, extra={"sock": self})
+                log.debug("Got this failure %s during %s", future.cause(), reason, extra={"sock": self})
+                print "Got this failure %s during %s (%s)" % (future.cause(), reason, self)
                 raise future.cause()
             return future
         else:
@@ -916,10 +920,10 @@
         self.child_handler = ChildSocketHandler(self)
         b.childHandler(self.child_handler)
 
-        future = b.bind(self.bind_addr.getAddress(), self.bind_addr.getPort())
-        self._handle_channel_future(future, "listen")
+        self.bind_future = b.bind(self.bind_addr.getAddress(), self.bind_addr.getPort())
+        self._handle_channel_future(self.bind_future, "listen")
         self.bind_timestamp = time.time()
-        self.channel = future.channel()
+        self.channel = self.bind_future.channel()
         log.debug("Bound server socket to %s", self.bind_addr, extra={"sock": self})
 
     def accept(self):
@@ -1060,6 +1064,23 @@
         else:
             return False
 
+    def _pending(self):
+        # Used by ssl.py for an undocumented function used in tests
+        # and of course some user code. Note that with Netty,
+        # readableBytes() in incoming or incoming_head are guaranteed
+        # to be plaintext because of the way pipelines work.  However
+        # this is a terrible function to call because it's trying to
+        # do something synchronous in the async setting of sockets.
+        if self.socket_type == CLIENT_SOCKET or self.socket_type == DATAGRAM_SOCKET:
+            if self.incoming_head is not None:
+                pending = self.incoming_head.readableBytes()
+            else:
+                pending = 0
+            for msg in self.incoming:
+                pending += msg.readableBytes()
+            return pending
+        return 0
+
     def _writable(self):
         return self.channel and self.channel.isActive() and self.channel.isWritable()
 
@@ -1217,13 +1238,17 @@
         while True:
             local_addr = self.channel.localAddress()
             if local_addr:
-                break
+                if hasattr(self, "bind_future"):
+                    if self.bind_future.isDone():
+                        break
+                else:
+                    break
             if time.time() - self.bind_timestamp > 1:
                 # Presumably after a second something is completely wrong,
                 # so punt
                 raise error(errno.ENOTCONN, "Socket is not connected")
             log.debug("Poll for local address", extra={"sock": self})
-            time.sleep(0.1)  # completely arbitrary
+            time.sleep(0.01)  # completely arbitrary
         if local_addr.getAddress().isAnyLocalAddress():
             # Netty 4 will default to an IPv6 "any" address from a channel even if it was originally bound to an IPv4 "any" address
             # so, as a workaround, let's construct a new "any" address using the port information gathered above
diff --git a/Lib/ssl.py b/Lib/ssl.py
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -26,8 +26,7 @@
     SSL_ERROR_WANT_CONNECT,
     SSL_ERROR_EOF,
     SSL_ERROR_INVALID_ERROR_CODE,
-    error as socket_error,
-    CLIENT_SOCKET, DATAGRAM_SOCKET)
+    error as socket_error)
 from _sslcerts import _get_ssl_context
 
 from java.text import SimpleDateFormat
@@ -204,10 +203,7 @@
     def pending(self):
         # undocumented function, used by some tests
         # see also http://bugs.python.org/issue21430
-        if self._sock.socket_type == CLIENT_SOCKET or self._sock.socket_type == DATAGRAM_SOCKET:
-            if self._sock.incoming_head is not None:
-                return self._sock.incoming_head.readableBytes()
-        return 0
+        return self._sock._pending()
 
     def _readable(self):
         return self._sock._readable()
diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py
--- a/Lib/test/test_select.py
+++ b/Lib/test/test_select.py
@@ -140,6 +140,7 @@
     def testSocketRegisteredBeforeConnected(self):
         pass
 
+    @test_support.retry(Exception)
     def _testSocketRegisteredBeforeConnected(self):
         timeout = 1000 # milliseconds
         poll_object = select.poll()
@@ -147,14 +148,14 @@
         poll_object.register(self.cli, select.POLLOUT)
         result_list = poll_object.poll(timeout)
         result_sockets = [r[0] for r in result_list]
-        self.failUnless(self.cli in result_sockets, "Unconnected client socket should be selectable")
+        self.assertIn(self.cli, result_sockets, "Unconnected client socket should be selectable")
         # Now connect the socket, but DO NOT register it again
         self.cli.setblocking(0)
         self.cli.connect( (self.HOST, self.PORT) )
         # Now poll again, to check that the poll object has recognised that the socket is now connected
         result_list = poll_object.poll(timeout)
         result_sockets = [r[0] for r in result_list]
-        self.failUnless(self.cli in result_sockets, "Connected client socket should have been selectable")
+        self.assertIn(self.cli, result_sockets, "Connected client socket should have been selectable")
 
     def testSelectOnSocketFileno(self):
         pass
@@ -178,7 +179,10 @@
 
 def test_main():
 
-    tests = [TestSelectInvalidParameters, TestSelectClientSocket, TestPollClientSocket, ThreadedPollClientSocket,
+    tests = [TestSelectInvalidParameters,
+             TestSelectClientSocket,
+             TestPollClientSocket,
+             ThreadedPollClientSocket,
              TestJythonSelect]
     suites = [unittest.makeSuite(klass, 'test') for klass in tests]
     test_support._run_suite(unittest.TestSuite(suites))
diff --git a/Lib/test/test_select_new.py b/Lib/test/test_select_new.py
--- a/Lib/test/test_select_new.py
+++ b/Lib/test/test_select_new.py
@@ -10,7 +10,7 @@
 import socket
 import select
 
-SERVER_ADDRESS = ("localhost", 54321)
+SERVER_ADDRESS = ("localhost", 0)
 
 DATA_CHUNK_SIZE = 1000
 DATA_CHUNK = "." * DATA_CHUNK_SIZE
@@ -22,9 +22,7 @@
 # The fundamental problem is that there is no reliable way to fill a socket with bytes
 # To address this for running on Netty, we arbitrarily send 10000 bytes
 
-# zero select timeout fails these tests on cpython (on windows 2003 anyway);
-# on Jython with Netty it will result in flaky test runs
-SELECT_TIMEOUT = 0.001
+SELECT_TIMEOUT = 0
 READ_TIMEOUT = 5
 
 class AsynchronousServer:
@@ -35,6 +33,7 @@
         self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         self.server_socket.bind(SERVER_ADDRESS)
         self.server_socket.listen(5)
+        self.server_addr = self.server_socket.getsockname()
         try:
             self.server_socket.accept()
         except socket.error:
@@ -142,16 +141,18 @@
 
 class AsynchronousClient(AsynchronousHandler):
 
-    def __init__(self):
+    def __init__(self, server_addr):
+        self.server_addr = server_addr
         AsynchronousHandler.__init__(self, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
         self.connected = 0
 
     def start_connect(self):
-        result = self.socket.connect_ex(SERVER_ADDRESS)
+        result = self.socket.connect_ex(self.server_addr)
         if result == errno.EISCONN:
             self.connected = True
         else:
-            assert result == errno.EINPROGRESS
+            assert result == errno.EINPROGRESS, \
+                "connect_ex returned %s (%s)" % (result, errno.errorcode.get(result, "Unknown errno"))
 
     def finish_connect(self):
         if self.connected:
@@ -168,9 +169,10 @@
 class TestSelectOnAccept(unittest.TestCase):
     def setUp(self):
         self.server = AsynchronousServer()
-        self.client = AsynchronousClient()
+        self.client = AsynchronousClient(self.server.server_addr)
         self.handler = None
 
+    @test_support.retry(Exception)
     def testSelectOnAccept(self):
         self.server.verify_not_acceptable()
         self.client.start_connect()
@@ -186,9 +188,10 @@
         self.server.close()
 
 class TestSelect(unittest.TestCase):
+    @test_support.retry(Exception)
     def setUp(self):
         self.server = AsynchronousServer()
-        self.client = AsynchronousClient()
+        self.client = AsynchronousClient(self.server.server_addr)
         self.client.start_connect()
         self.handler = self.server.accept()
         self.client.finish_connect()
@@ -198,19 +201,21 @@
         self.handler.close()
         self.server.close()
 
+    @test_support.retry(Exception)
     def testClientOut(self):
         self.client.verify_only_writable()
         self.handler.verify_only_writable()
 
         written = self.client.write()
         self.handler.verify_readable()
-
+            
         self.handler.read(written/2)
         self.handler.verify_readable()
 
         self.handler.read(written/2)
         self.handler.verify_not_readable()
 
+    @test_support.retry(Exception)
     def testHandlerOut(self):
         written = self.handler.write()
         self.client.verify_readable()
@@ -221,6 +226,7 @@
         self.client.read(written/2)
         self.client.verify_not_readable()
 
+    @test_support.retry(Exception)
     def testBothOut(self):
         client_written = self.client.write()
         handler_written = self.handler.write()
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -111,7 +111,16 @@
         it wants the client thread to proceed. This is useful if the
         server is about to execute a blocking routine that is
         dependent upon the client thread during its setup routine."""
-        self.server_ready.set()
+
+        def be_ready():
+            # Because of socket reuse, old server sockets may still be
+            # accepting client connections while they shut down, i.e.
+            # before the new server socket has started to accept.
+            #
+            # Avoid this race by ensuring accept is started before
+            # clients attempt to connect.
+            self.server_ready.set()
+        threading.Timer(0.1, be_ready).start()
 
     def _setUp(self):
         self.server_ready = threading.Event()
@@ -1342,10 +1351,20 @@
 
     def testNonBlockingConnect(self):
         # Testing non-blocking connect
-        conn, addr = self.serv.accept()
+        # this can potentially race with the client, so we need to loop
+        while True:
+            read, write, err = select.select([self.serv], [], [], 0.1)
+            if read or write or err:
+                break
+        if self.serv in read:
+            conn, addr = self.serv.accept()
+            conn.close()
+        else:
+            self.fail("Error trying to do accept after select: server socket was not in 'read'able list")
 
     def _testNonBlockingConnect(self):
         # Testing non-blocking connect
+        time.sleep(0.1)
         self.cli.setblocking(0)
         result = self.cli.connect_ex((self.HOST, self.PORT))
         while True:
@@ -1366,6 +1385,7 @@
     def _testConnectWithLocalBind(self):
         # Testing blocking connect with local bind
         cli_port = self.PORT - 1
+        start = time.time()
         while True:
             # Keep trying until a local port is available
             self.cli.settimeout(1)
@@ -1378,12 +1398,17 @@
                 # previous test run). reset the client socket and try
                 # again
                 self.failUnlessEqual(se[0], errno.EADDRINUSE)
+                print "Got an error in connect, will retry", se
                 try:
                     self.cli.close()
                 except socket.error:
                     pass
                 self.clientSetUp()
                 cli_port -= 1
+            # Make sure we have no tests currently holding open this socket
+            test_support.gc_collect()
+            if time.time() - start > 5:
+                self.fail("Timed out after 5 seconds")
         bound_host, bound_port = self.cli.getsockname()
         self.failUnlessEqual(bound_port, cli_port)
 
@@ -1726,7 +1751,8 @@
         self.failUnlessRaises(socket.timeout, raise_timeout,
                               "TCP socket recv failed to generate a timeout exception (TCP)")
 
-    def estSendTimeout(self):
+    @unittest.skipIf(test_support.is_jython, "This test takes a very long time")
+    def testSendTimeout(self):
         def raise_timeout(*args, **kwargs):
             cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             cli_sock.connect( (self.HOST, self.PORT) )
@@ -1889,7 +1915,7 @@
             # Maybe no IPv6 configured on the test machine.
             return
         ipv6_address_tuple = addrinfo[0][4]
-        self.failUnless     (ipv6_address_tuple[0] in ["::1", "0:0:0:0:0:0:0:1"])
+        self.assertIn(ipv6_address_tuple[0], ["::1", "0:0:0:0:0:0:0:1"])
         self.failUnlessEqual(ipv6_address_tuple[1], 80)
         self.failUnlessEqual(ipv6_address_tuple[2], 0)
         # Can't have an expectation for scope
@@ -1900,8 +1926,8 @@
         self.failUnlessRaises(IndexError, lambda: ipv6_address_tuple[4])
         # These str/repr tests may fail on some systems: the scope element of the tuple may be non-zero
         # In this case, we'll have to change the test to use .startswith() or .split() to exclude the scope element
-        self.failUnless(str(ipv6_address_tuple) in ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
-        self.failUnless(repr(ipv6_address_tuple) in ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
+        self.assertIn(str(ipv6_address_tuple), ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
+        self.assertIn(repr(ipv6_address_tuple), ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
 
     def testNonIntPort(self):
         hostname = "localhost"
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -1441,3 +1441,28 @@
     """
     stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
     return stderr
+
+def retry(exceptions, tries=6, delay=3, backoff=1.2):
+    # modified from https://wiki.python.org/moin/PythonDecoratorLibrary#Retry
+    def deco_retry(f):
+
+        def wrapper(*args, **kwds):
+            mtries, mdelay = tries, delay
+            while mtries > 1:
+                try:
+                    return f(*args, **kwds)
+                except exceptions as e:
+                    print "Got %s, retrying in %.2f seconds..." % (str(e), mdelay)
+                    # FIXME resource cleanup continues to be an issue
+                    # in terms of tests we use from CPython. This only
+                    # represents a bandaid - useful as it might be -
+                    # and it should be revisited.
+                    gc_collect()
+                    time.sleep(mdelay)
+                    mtries -= 1
+                    mdelay *= backoff
+            return f(*args, **kwds)
+
+        return wrapper
+
+    return deco_retry

-- 
Repository URL: https://hg.python.org/jython


More information about the Jython-checkins mailing list