[Jython-checkins] jython: Make select and socket tests more robust
jim.baker
jython-checkins at python.org
Fri Jan 9 23:20:06 CET 2015
https://hg.python.org/jython/rev/68b3da7765c9
changeset: 7524:68b3da7765c9
user: Jim Baker <jim.baker at rackspace.com>
date: Fri Jan 09 15:18:05 2015 -0700
summary:
Make select and socket tests more robust
Better handle TIME_WAIT issues when sockets are repeatedly reused in
the select and socket tests. Also map a java.net.SocketException whose
message starts with "Address already in use" to errno.EADDRINUSE, as
raised when a client socket binds to a local address already in use.
files:
Lib/_socket.py | 39 ++++++++++++++++++++----
Lib/ssl.py | 8 +---
Lib/test/test_select.py | 10 ++++-
Lib/test/test_select_new.py | 26 ++++++++++------
Lib/test/test_socket.py | 38 ++++++++++++++++++++---
Lib/test/test_support.py | 25 ++++++++++++++++
6 files changed, 114 insertions(+), 32 deletions(-)
diff --git a/Lib/_socket.py b/Lib/_socket.py
--- a/Lib/_socket.py
+++ b/Lib/_socket.py
@@ -275,6 +275,8 @@
return _add_exception_attrs(
error(errno.EAFNOSUPPORT,
'Address family not supported by protocol family: See http://wiki.python.org/jython/NewSocketModule#IPV6_address_support'))
+ if exc.message.startswith('Address already in use'):
+ return error(errno.EADDRINUSE, 'Address already in use')
return _unmapped_exception(exc)
@@ -546,7 +548,8 @@
result = self._handle_poll(partial(self.queue.poll, timeout_in_ns, TimeUnit.NANOSECONDS))
if result:
return result
- timeout = timeout - (time.time() - started)
+ timeout -= time.time() - started
+ log.debug("Spurious wakeup, retrying with timeout=%s", timeout, extra={"sock": "*"})
return []
@@ -764,7 +767,8 @@
elif self.timeout:
self._handle_timeout(future.await, reason)
if not future.isSuccess():
- log.exception("Got this failure %s during %s", future.cause(), reason, extra={"sock": self})
+ log.debug("Got this failure %s during %s", future.cause(), reason, extra={"sock": self})
+ print "Got this failure %s during %s (%s)" % (future.cause(), reason, self)
raise future.cause()
return future
else:
@@ -916,10 +920,10 @@
self.child_handler = ChildSocketHandler(self)
b.childHandler(self.child_handler)
- future = b.bind(self.bind_addr.getAddress(), self.bind_addr.getPort())
- self._handle_channel_future(future, "listen")
+ self.bind_future = b.bind(self.bind_addr.getAddress(), self.bind_addr.getPort())
+ self._handle_channel_future(self.bind_future, "listen")
self.bind_timestamp = time.time()
- self.channel = future.channel()
+ self.channel = self.bind_future.channel()
log.debug("Bound server socket to %s", self.bind_addr, extra={"sock": self})
def accept(self):
@@ -1060,6 +1064,23 @@
else:
return False
+ def _pending(self):
+ # Used by ssl.py for an undocumented function used in tests
+ # and of course some user code. Note that with Netty,
+ # readableBytes() in incoming or incoming_head are guaranteed
+ # to be plaintext because of the way pipelines work. However
+ # this is a terrible function to call because it's trying to
+ # do something synchronous in the async setting of sockets.
+ if self.socket_type == CLIENT_SOCKET or self.socket_type == DATAGRAM_SOCKET:
+ if self.incoming_head is not None:
+ pending = self.incoming_head.readableBytes()
+ else:
+ pending = 0
+ for msg in self.incoming:
+ pending += msg.readableBytes()
+ return pending
+ return 0
+
def _writable(self):
return self.channel and self.channel.isActive() and self.channel.isWritable()
@@ -1217,13 +1238,17 @@
while True:
local_addr = self.channel.localAddress()
if local_addr:
- break
+ if hasattr(self, "bind_future"):
+ if self.bind_future.isDone():
+ break
+ else:
+ break
if time.time() - self.bind_timestamp > 1:
# Presumably after a second something is completely wrong,
# so punt
raise error(errno.ENOTCONN, "Socket is not connected")
log.debug("Poll for local address", extra={"sock": self})
- time.sleep(0.1) # completely arbitrary
+ time.sleep(0.01) # completely arbitrary
if local_addr.getAddress().isAnyLocalAddress():
# Netty 4 will default to an IPv6 "any" address from a channel even if it was originally bound to an IPv4 "any" address
# so, as a workaround, let's construct a new "any" address using the port information gathered above
diff --git a/Lib/ssl.py b/Lib/ssl.py
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -26,8 +26,7 @@
SSL_ERROR_WANT_CONNECT,
SSL_ERROR_EOF,
SSL_ERROR_INVALID_ERROR_CODE,
- error as socket_error,
- CLIENT_SOCKET, DATAGRAM_SOCKET)
+ error as socket_error)
from _sslcerts import _get_ssl_context
from java.text import SimpleDateFormat
@@ -204,10 +203,7 @@
def pending(self):
# undocumented function, used by some tests
# see also http://bugs.python.org/issue21430
- if self._sock.socket_type == CLIENT_SOCKET or self._sock.socket_type == DATAGRAM_SOCKET:
- if self._sock.incoming_head is not None:
- return self._sock.incoming_head.readableBytes()
- return 0
+ return self._sock._pending()
def _readable(self):
return self._sock._readable()
diff --git a/Lib/test/test_select.py b/Lib/test/test_select.py
--- a/Lib/test/test_select.py
+++ b/Lib/test/test_select.py
@@ -140,6 +140,7 @@
def testSocketRegisteredBeforeConnected(self):
pass
+ @test_support.retry(Exception)
def _testSocketRegisteredBeforeConnected(self):
timeout = 1000 # milliseconds
poll_object = select.poll()
@@ -147,14 +148,14 @@
poll_object.register(self.cli, select.POLLOUT)
result_list = poll_object.poll(timeout)
result_sockets = [r[0] for r in result_list]
- self.failUnless(self.cli in result_sockets, "Unconnected client socket should be selectable")
+ self.assertIn(self.cli, result_sockets, "Unconnected client socket should be selectable")
# Now connect the socket, but DO NOT register it again
self.cli.setblocking(0)
self.cli.connect( (self.HOST, self.PORT) )
# Now poll again, to check that the poll object has recognised that the socket is now connected
result_list = poll_object.poll(timeout)
result_sockets = [r[0] for r in result_list]
- self.failUnless(self.cli in result_sockets, "Connected client socket should have been selectable")
+ self.assertIn(self.cli, result_sockets, "Connected client socket should have been selectable")
def testSelectOnSocketFileno(self):
pass
@@ -178,7 +179,10 @@
def test_main():
- tests = [TestSelectInvalidParameters, TestSelectClientSocket, TestPollClientSocket, ThreadedPollClientSocket,
+ tests = [TestSelectInvalidParameters,
+ TestSelectClientSocket,
+ TestPollClientSocket,
+ ThreadedPollClientSocket,
TestJythonSelect]
suites = [unittest.makeSuite(klass, 'test') for klass in tests]
test_support._run_suite(unittest.TestSuite(suites))
diff --git a/Lib/test/test_select_new.py b/Lib/test/test_select_new.py
--- a/Lib/test/test_select_new.py
+++ b/Lib/test/test_select_new.py
@@ -10,7 +10,7 @@
import socket
import select
-SERVER_ADDRESS = ("localhost", 54321)
+SERVER_ADDRESS = ("localhost", 0)
DATA_CHUNK_SIZE = 1000
DATA_CHUNK = "." * DATA_CHUNK_SIZE
@@ -22,9 +22,7 @@
# The fundamental problem is that there is no reliable way to fill a socket with bytes
# To address this for running on Netty, we arbitrarily send 10000 bytes
-# zero select timeout fails these tests on cpython (on windows 2003 anyway);
-# on Jython with Netty it will result in flaky test runs
-SELECT_TIMEOUT = 0.001
+SELECT_TIMEOUT = 0
READ_TIMEOUT = 5
class AsynchronousServer:
@@ -35,6 +33,7 @@
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind(SERVER_ADDRESS)
self.server_socket.listen(5)
+ self.server_addr = self.server_socket.getsockname()
try:
self.server_socket.accept()
except socket.error:
@@ -142,16 +141,18 @@
class AsynchronousClient(AsynchronousHandler):
- def __init__(self):
+ def __init__(self, server_addr):
+ self.server_addr = server_addr
AsynchronousHandler.__init__(self, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
self.connected = 0
def start_connect(self):
- result = self.socket.connect_ex(SERVER_ADDRESS)
+ result = self.socket.connect_ex(self.server_addr)
if result == errno.EISCONN:
self.connected = True
else:
- assert result == errno.EINPROGRESS
+ assert result == errno.EINPROGRESS, \
+ "connect_ex returned %s (%s)" % (result, errno.errorcode.get(result, "Unknown errno"))
def finish_connect(self):
if self.connected:
@@ -168,9 +169,10 @@
class TestSelectOnAccept(unittest.TestCase):
def setUp(self):
self.server = AsynchronousServer()
- self.client = AsynchronousClient()
+ self.client = AsynchronousClient(self.server.server_addr)
self.handler = None
+ @test_support.retry(Exception)
def testSelectOnAccept(self):
self.server.verify_not_acceptable()
self.client.start_connect()
@@ -186,9 +188,10 @@
self.server.close()
class TestSelect(unittest.TestCase):
+ @test_support.retry(Exception)
def setUp(self):
self.server = AsynchronousServer()
- self.client = AsynchronousClient()
+ self.client = AsynchronousClient(self.server.server_addr)
self.client.start_connect()
self.handler = self.server.accept()
self.client.finish_connect()
@@ -198,19 +201,21 @@
self.handler.close()
self.server.close()
+ @test_support.retry(Exception)
def testClientOut(self):
self.client.verify_only_writable()
self.handler.verify_only_writable()
written = self.client.write()
self.handler.verify_readable()
-
+
self.handler.read(written/2)
self.handler.verify_readable()
self.handler.read(written/2)
self.handler.verify_not_readable()
+ @test_support.retry(Exception)
def testHandlerOut(self):
written = self.handler.write()
self.client.verify_readable()
@@ -221,6 +226,7 @@
self.client.read(written/2)
self.client.verify_not_readable()
+ @test_support.retry(Exception)
def testBothOut(self):
client_written = self.client.write()
handler_written = self.handler.write()
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -111,7 +111,16 @@
it wants the client thread to proceed. This is useful if the
server is about to execute a blocking routine that is
dependent upon the client thread during its setup routine."""
- self.server_ready.set()
+
+ def be_ready():
+ # Because of socket reuse, old server sockets may still be
+ # accepting client connections as they get shutdown, but
+ # before they accept with the new server socket.
+ #
+ # Avoid race by ensuring accept is started before clients
+ # attempt to connect.
+ self.server_ready.set()
+ threading.Timer(0.1, be_ready).start()
def _setUp(self):
self.server_ready = threading.Event()
@@ -1342,10 +1351,20 @@
def testNonBlockingConnect(self):
# Testing non-blocking connect
- conn, addr = self.serv.accept()
+ # this can potentially race with the client, so we need to loop
+ while True:
+ read, write, err = select.select([self.serv], [], [], 0.1)
+ if read or write or err:
+ break
+ if self.serv in read:
+ conn, addr = self.serv.accept()
+ conn.close()
+ else:
+ self.fail("Error trying to do accept after select: server socket was not in 'read'able list")
def _testNonBlockingConnect(self):
# Testing non-blocking connect
+ time.sleep(0.1)
self.cli.setblocking(0)
result = self.cli.connect_ex((self.HOST, self.PORT))
while True:
@@ -1366,6 +1385,7 @@
def _testConnectWithLocalBind(self):
# Testing blocking connect with local bind
cli_port = self.PORT - 1
+ start = time.time()
while True:
# Keep trying until a local port is available
self.cli.settimeout(1)
@@ -1378,12 +1398,17 @@
# previous test run). reset the client socket and try
# again
self.failUnlessEqual(se[0], errno.EADDRINUSE)
+ print "Got an error in connect, will retry", se
try:
self.cli.close()
except socket.error:
pass
self.clientSetUp()
cli_port -= 1
+ # Make sure we have no tests currently holding open this socket
+ test_support.gc_collect()
+ if time.time() - start > 5:
+ self.fail("Timed out after 5 seconds")
bound_host, bound_port = self.cli.getsockname()
self.failUnlessEqual(bound_port, cli_port)
@@ -1726,7 +1751,8 @@
self.failUnlessRaises(socket.timeout, raise_timeout,
"TCP socket recv failed to generate a timeout exception (TCP)")
- def estSendTimeout(self):
+ @unittest.skipIf(test_support.is_jython, "This test takes a very long time")
+ def testSendTimeout(self):
def raise_timeout(*args, **kwargs):
cli_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
cli_sock.connect( (self.HOST, self.PORT) )
@@ -1889,7 +1915,7 @@
# Maybe no IPv6 configured on the test machine.
return
ipv6_address_tuple = addrinfo[0][4]
- self.failUnless (ipv6_address_tuple[0] in ["::1", "0:0:0:0:0:0:0:1"])
+ self.assertIn(ipv6_address_tuple[0], ["::1", "0:0:0:0:0:0:0:1"])
self.failUnlessEqual(ipv6_address_tuple[1], 80)
self.failUnlessEqual(ipv6_address_tuple[2], 0)
# Can't have an expectation for scope
@@ -1900,8 +1926,8 @@
self.failUnlessRaises(IndexError, lambda: ipv6_address_tuple[4])
# These str/repr tests may fail on some systems: the scope element of the tuple may be non-zero
# In this case, we'll have to change the test to use .startswith() or .split() to exclude the scope element
- self.failUnless(str(ipv6_address_tuple) in ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
- self.failUnless(repr(ipv6_address_tuple) in ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
+ self.assertIn(str(ipv6_address_tuple), ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
+ self.assertIn(repr(ipv6_address_tuple), ["('::1', 80, 0, 0)", "('0:0:0:0:0:0:0:1', 80, 0, 0)"])
def testNonIntPort(self):
hostname = "localhost"
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -1441,3 +1441,28 @@
"""
stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
return stderr
+
+def retry(exceptions, tries=6, delay=3, backoff=1.2):
+ # modified from https://wiki.python.org/moin/PythonDecoratorLibrary#Retry
+ def deco_retry(f):
+
+ def wrapper(*args, **kwds):
+ mtries, mdelay = tries, delay
+ while mtries > 1:
+ try:
+ return f(*args, **kwds)
+ except exceptions as e:
+ print "Got %s, retrying in %.2f seconds..." % (str(e), mdelay)
+ # FIXME resource cleanup continues to be an issue
+ # in terms of tests we use from CPython. This only
+ # represents a bandaid - useful as it might be -
+ # and it should be revisited.
+ gc_collect()
+ time.sleep(mdelay)
+ mtries -= 1
+ mdelay *= backoff
+ return f(*args, **kwds)
+
+ return wrapper
+
+ return deco_retry
--
Repository URL: https://hg.python.org/jython
More information about the Jython-checkins
mailing list