[Jython-checkins] jython: Change socket.connect_ex to report intermediate connection states. Fixes #2428.
jim.baker
jython-checkins at python.org
Sat Nov 14 09:17:55 EST 2015
https://hg.python.org/jython/rev/962002f7d96b
changeset: 7809:962002f7d96b
user: Jim Baker <jim.baker at rackspace.com>
date: Sat Nov 14 07:17:46 2015 -0700
summary:
Change socket.connect_ex to report intermediate connection states. Fixes #2428.
A major usage of socket.connect_ex is to poll the state of a
connection in progress. The proper reported sequence for a successful
connection is zero or one EINPROGRESS, zero or more EALREADY, followed
by EISCONN. In addition, extant code assumes that the poll will take a
nonnegible amount of time (~ 1-2 ms) or will fail in exhausting a busy
loop. With this fix, Jython gets CPython-compliant semantics.
files:
Lib/_socket.py | 58 +++++++++++------
Lib/test/test_socket_jy.py | 85 ++++++++++++++++++++++++++
2 files changed, 121 insertions(+), 22 deletions(-)
diff --git a/Lib/_socket.py b/Lib/_socket.py
--- a/Lib/_socket.py
+++ b/Lib/_socket.py
@@ -28,7 +28,8 @@
from java.util import NoSuchElementException
from java.util.concurrent import (
ArrayBlockingQueue, CopyOnWriteArrayList, CountDownLatch, LinkedBlockingQueue,
- RejectedExecutionException, ThreadFactory, TimeUnit)
+ ExecutionException, RejectedExecutionException, ThreadFactory,
+ TimeoutException, TimeUnit)
from java.util.concurrent.atomic import AtomicBoolean, AtomicLong
from javax.net.ssl import SSLPeerUnverifiedException, SSLException
@@ -852,7 +853,7 @@
if self.bind_addr:
log.debug("Connect %s to %s", self.bind_addr, addr, extra={"sock": self})
- bind_future = bootstrap.bind(self.bind_addr)
+ bind_future = bootstrap.bind(self.bind_addr).sync()
self._handle_channel_future(bind_future, "local bind")
self.channel = bind_future.channel()
else:
@@ -888,16 +889,39 @@
log.debug("Completed connection to %s", addr, extra={"sock": self})
def connect_ex(self, addr):
+ was_connecting = self.connected # actually means self.connecting if
+ # not blocking
if not self.connected:
try:
self.connect(addr)
except error as e:
return e.errno
if not self.connect_future.isDone():
- return errno.EINPROGRESS
+ if was_connecting:
+ try:
+ # Timing is based on CPython and was empirically
+ # guestimated. Of course this means user code is
+ # polling, so the the best we can do is wait like
+ # this in supposedly nonblocking mode without
+ # completely busy waiting!
+ self.connect_future.get(1500, TimeUnit.MICROSECONDS)
+ except ExecutionException:
+ # generally raised if closed; pick up the state
+ # when testing for success
+ pass
+ except TimeoutException:
+ # more than 1.5ms, will report EALREADY below
+ pass
+
+ if not self.connect_future.isDone():
+ if was_connecting:
+ return errno.EALREADY
+ else:
+ return errno.EINPROGRESS
elif self.connect_future.isSuccess():
return errno.EISCONN
else:
+ print self.connect_future.cause()
return errno.ENOTCONN
# SERVER METHODS
@@ -1241,27 +1265,17 @@
raise error(errno.ENOTCONN, "Socket is not connected")
else:
return _socktuple(self.bind_addr)
- # Netty 4 currently races between bind to ephemeral port and the availability
- # of the local address for the channel. Poll to work around this issue.
- while True:
- local_addr = self.channel.localAddress()
- if local_addr:
- if hasattr(self, "bind_future"):
- if self.bind_future.isDone():
- break
- else:
- break
- if time.time() - self.bind_timestamp > 1:
- # Presumably after a second something is completely wrong,
- # so punt
- raise error(errno.ENOTCONN, "Socket is not connected")
- log.debug("Poll for local address", extra={"sock": self})
- time.sleep(0.01) # completely arbitrary
+ if hasattr(self, "bind_future"):
+ self.bind_future.sync()
+ local_addr = self.channel.localAddress()
if local_addr.getAddress().isAnyLocalAddress():
- # Netty 4 will default to an IPv6 "any" address from a channel even if it was originally bound to an IPv4 "any" address
- # so, as a workaround, let's construct a new "any" address using the port information gathered above
+ # Netty 4 will default to an IPv6 "any" address from a
+ # channel even if it was originally bound to an IPv4 "any"
+ # address so, as a workaround, let's construct a new "any"
+ # address using the port information gathered above
if type(self.bind_addr.getAddress()) != type(local_addr.getAddress()):
- return _socktuple(java.net.InetSocketAddress(self.bind_addr.getAddress(), local_addr.getPort()))
+ return _socktuple(java.net.InetSocketAddress(
+ self.bind_addr.getAddress(), local_addr.getPort()))
return _socktuple(local_addr)
def getpeername(self):
diff --git a/Lib/test/test_socket_jy.py b/Lib/test/test_socket_jy.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_socket_jy.py
@@ -0,0 +1,85 @@
+import errno
+import socket
+import threading
+import unittest
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+from SocketServer import ThreadingMixIn
+from test import test_support
+
+
+def start_server():
+ server_address = ('127.0.0.1', 0)
+
+ class DaemonThreadingMixIn(ThreadingMixIn):
+ daemon_threads = True
+
+ class ThreadedHTTPServer(DaemonThreadingMixIn, HTTPServer):
+ """Handle requests in a separate thread."""
+
+ # not actually going to do anything with this server, so a
+ # do-nothing handler is reasonable
+ httpd = ThreadedHTTPServer(server_address, BaseHTTPRequestHandler)
+ server_thread = threading.Thread(target=httpd.serve_forever)
+ server_thread.daemon = True
+ server_thread.start()
+ return httpd, server_thread
+
+
+class SocketConnectTest(unittest.TestCase):
+
+ def setUp(self):
+ self.httpd, self.server_thread = start_server()
+ self.address = self.httpd.server_name, self.httpd.server_port
+
+ def tearDown(self):
+ self.httpd.shutdown()
+ self.server_thread.join()
+
+ def do_nonblocking_connection(self, results, index):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ connect_errno = 0
+ connect_attempt = 0
+
+ while connect_errno != errno.EISCONN and connect_attempt < 100:
+ connect_attempt += 1
+ connect_errno = sock.connect_ex(self.address)
+ results[index].append(connect_errno)
+ sock.close()
+
+ def do_workout(self, num_threads=10):
+ connect_results = []
+ connect_threads = []
+ for i in xrange(num_threads):
+ connect_results.append([])
+ connect_threads.append(threading.Thread(
+ target=self.do_nonblocking_connection,
+ name="socket-workout-%s" % i,
+ args=(connect_results, i)))
+
+ for thread in connect_threads:
+ thread.start()
+ for thread in connect_threads:
+ thread.join()
+ return connect_results
+
+ def test_connect_ex_workout(self):
+ """Verify connect_ex states go through EINPROGRESS?, EALREADY*, EISCONN"""
+ # Tests fix for http://bugs.jython.org/issue2428; based in part on the
+ # code showing failure that was submitted with that bug
+
+ #self.httpd, self.server_thread = start_server()
+ #self.address = self.httpd.server_name, self.httpd.server_port
+ for result in self.do_workout():
+ self.assertIn(result[0], {errno.EINPROGRESS, errno.EISCONN})
+ self.assertEqual(result[-1], errno.EISCONN)
+ for code in result[1:-1]:
+ self.assertEqual(code, errno.EALREADY)
+
+
+def test_main():
+ test_support.run_unittest(SocketConnectTest)
+
+
+if __name__ == "__main__":
+ test_main()
--
Repository URL: https://hg.python.org/jython
More information about the Jython-checkins
mailing list