[Jython-checkins] jython: Change socket.connect_ex to report intermediate connection states. Fixes #2428.

jim.baker jython-checkins at python.org
Sat Nov 14 09:17:55 EST 2015


https://hg.python.org/jython/rev/962002f7d96b
changeset:   7809:962002f7d96b
user:        Jim Baker <jim.baker at rackspace.com>
date:        Sat Nov 14 07:17:46 2015 -0700
summary:
  Change socket.connect_ex to report intermediate connection states. Fixes #2428.

A major usage of socket.connect_ex is to poll the state of a
connection in progress. The proper reported sequence for a successful
connection is zero or one EINPROGRESS, zero or more EALREADY, followed
by EISCONN. In addition, extant code assumes that the poll will take a
nonnegible amount of time (~ 1-2 ms) or will fail in exhausting a busy
loop. With this fix, Jython gets CPython-compliant semantics.

files:
  Lib/_socket.py             |  58 +++++++++++------
  Lib/test/test_socket_jy.py |  85 ++++++++++++++++++++++++++
  2 files changed, 121 insertions(+), 22 deletions(-)


diff --git a/Lib/_socket.py b/Lib/_socket.py
--- a/Lib/_socket.py
+++ b/Lib/_socket.py
@@ -28,7 +28,8 @@
 from java.util import NoSuchElementException
 from java.util.concurrent import (
     ArrayBlockingQueue, CopyOnWriteArrayList, CountDownLatch, LinkedBlockingQueue,
-    RejectedExecutionException, ThreadFactory, TimeUnit)
+    ExecutionException, RejectedExecutionException, ThreadFactory,
+    TimeoutException, TimeUnit)
 from java.util.concurrent.atomic import AtomicBoolean, AtomicLong
 from javax.net.ssl import SSLPeerUnverifiedException, SSLException
 
@@ -852,7 +853,7 @@
 
         if self.bind_addr:
             log.debug("Connect %s to %s", self.bind_addr, addr, extra={"sock": self})
-            bind_future = bootstrap.bind(self.bind_addr)
+            bind_future = bootstrap.bind(self.bind_addr).sync()
             self._handle_channel_future(bind_future, "local bind")
             self.channel = bind_future.channel()
         else:
@@ -888,16 +889,39 @@
             log.debug("Completed connection to %s", addr, extra={"sock": self})
 
     def connect_ex(self, addr):
+        was_connecting = self.connected  # actually means self.connecting if
+                                         # not blocking
         if not self.connected:
             try:
                 self.connect(addr)
             except error as e:
                 return e.errno
         if not self.connect_future.isDone():
-            return errno.EINPROGRESS
+            if was_connecting:
+                try:
+                    # Timing is based on CPython and was empirically
+                    # guestimated. Of course this means user code is
+                    # polling, so the the best we can do is wait like
+                    # this in supposedly nonblocking mode without
+                    # completely busy waiting!
+                    self.connect_future.get(1500, TimeUnit.MICROSECONDS)
+                except ExecutionException:
+                    # generally raised if closed; pick up the state
+                    # when testing for success
+                    pass
+                except TimeoutException:
+                    # more than 1.5ms, will report EALREADY below
+                    pass
+
+        if not self.connect_future.isDone():
+            if was_connecting:
+                return errno.EALREADY
+            else:
+                return errno.EINPROGRESS
         elif self.connect_future.isSuccess():
             return errno.EISCONN
         else:
+            print self.connect_future.cause()
             return errno.ENOTCONN
 
     # SERVER METHODS
@@ -1241,27 +1265,17 @@
                 raise error(errno.ENOTCONN, "Socket is not connected")
             else:
                 return _socktuple(self.bind_addr)
-        # Netty 4 currently races between bind to ephemeral port and the availability
-        # of the local address for the channel. Poll to work around this issue.
-        while True:
-            local_addr = self.channel.localAddress()
-            if local_addr:
-                if hasattr(self, "bind_future"):
-                    if self.bind_future.isDone():
-                        break
-                else:
-                    break
-            if time.time() - self.bind_timestamp > 1:
-                # Presumably after a second something is completely wrong,
-                # so punt
-                raise error(errno.ENOTCONN, "Socket is not connected")
-            log.debug("Poll for local address", extra={"sock": self})
-            time.sleep(0.01)  # completely arbitrary
+        if hasattr(self, "bind_future"):
+            self.bind_future.sync()
+        local_addr = self.channel.localAddress()
         if local_addr.getAddress().isAnyLocalAddress():
-            # Netty 4 will default to an IPv6 "any" address from a channel even if it was originally bound to an IPv4 "any" address
-            # so, as a workaround, let's construct a new "any" address using the port information gathered above
+            # Netty 4 will default to an IPv6 "any" address from a
+            # channel even if it was originally bound to an IPv4 "any"
+            # address so, as a workaround, let's construct a new "any"
+            # address using the port information gathered above
             if type(self.bind_addr.getAddress()) != type(local_addr.getAddress()):
-                return _socktuple(java.net.InetSocketAddress(self.bind_addr.getAddress(), local_addr.getPort()))
+                return _socktuple(java.net.InetSocketAddress(
+                    self.bind_addr.getAddress(), local_addr.getPort()))
         return _socktuple(local_addr)
 
     def getpeername(self):
diff --git a/Lib/test/test_socket_jy.py b/Lib/test/test_socket_jy.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_socket_jy.py
@@ -0,0 +1,85 @@
+import errno
+import socket
+import threading
+import unittest
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+from SocketServer import ThreadingMixIn
+from test import test_support
+
+
+def start_server():
+    server_address = ('127.0.0.1', 0)
+
+    class DaemonThreadingMixIn(ThreadingMixIn):
+        daemon_threads = True
+
+    class ThreadedHTTPServer(DaemonThreadingMixIn, HTTPServer):
+        """Handle requests in a separate thread."""
+
+    # not actually going to do anything with this server, so a
+    # do-nothing handler is reasonable
+    httpd = ThreadedHTTPServer(server_address, BaseHTTPRequestHandler)
+    server_thread = threading.Thread(target=httpd.serve_forever)
+    server_thread.daemon = True
+    server_thread.start()
+    return httpd, server_thread
+
+
+class SocketConnectTest(unittest.TestCase):
+
+    def setUp(self):
+        self.httpd, self.server_thread = start_server()
+        self.address = self.httpd.server_name, self.httpd.server_port
+
+    def tearDown(self):
+        self.httpd.shutdown()
+        self.server_thread.join()
+
+    def do_nonblocking_connection(self, results, index):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setblocking(0)
+        connect_errno = 0
+        connect_attempt = 0
+
+        while connect_errno != errno.EISCONN and connect_attempt < 100:
+            connect_attempt += 1
+            connect_errno = sock.connect_ex(self.address)
+            results[index].append(connect_errno)
+        sock.close()
+
+    def do_workout(self, num_threads=10):
+        connect_results = []
+        connect_threads = []
+        for i in xrange(num_threads):
+            connect_results.append([])
+            connect_threads.append(threading.Thread(
+                target=self.do_nonblocking_connection,
+                name="socket-workout-%s" % i,
+                args=(connect_results, i)))
+                           
+        for thread in connect_threads:
+            thread.start()
+        for thread in connect_threads:
+            thread.join()
+        return connect_results
+
+    def test_connect_ex_workout(self):
+        """Verify connect_ex states go through EINPROGRESS?, EALREADY*, EISCONN"""
+        # Tests fix for http://bugs.jython.org/issue2428; based in part on the
+        # code showing failure that was submitted with that bug
+
+        #self.httpd, self.server_thread = start_server()
+        #self.address = self.httpd.server_name, self.httpd.server_port
+        for result in self.do_workout():
+            self.assertIn(result[0], {errno.EINPROGRESS, errno.EISCONN})
+            self.assertEqual(result[-1], errno.EISCONN)
+            for code in result[1:-1]:
+                self.assertEqual(code, errno.EALREADY)
+
+
+def test_main():
+    test_support.run_unittest(SocketConnectTest)
+
+
+if __name__ == "__main__":
+    test_main()

-- 
Repository URL: https://hg.python.org/jython


More information about the Jython-checkins mailing list