[Jython-checkins] jython: Fix logic on when to shutdown Netty child workers.

jim.baker jython-checkins at python.org
Tue Sep 23 00:13:16 CEST 2014


https://hg.python.org/jython/rev/a33353c0ac25
changeset:   7389:a33353c0ac25
user:        Jim Baker <jim.baker at rackspace.com>
date:        Mon Sep 22 16:13:01 2014 -0600
summary:
  Fix logic on when to shutdown Netty child workers.

Do not shut down the Netty child worker group thread pool for a listening
server socket unless the server socket is also closed. Note that it is
perfectly possible for an accepted child socket to have a lifetime
past that of the listening socket (especially perhaps in tests), but
the socket code was not also checking if the server socket itself was
still open. This meant that subsequent incoming child connections
would fail with a RejectedExecutionException error.

Other socket fixes include no longer using the root handler for socket
debug logging, and allowing any object that is an Iterable for
select lists.

files:
  Lib/_socket.py          |  18 +++++++++++-------
  Lib/test/test_socket.py |  10 ++++++++++
  2 files changed, 21 insertions(+), 7 deletions(-)


diff --git a/Lib/_socket.py b/Lib/_socket.py
--- a/Lib/_socket.py
+++ b/Lib/_socket.py
@@ -9,7 +9,7 @@
 import sys
 import time
 import _google_ipaddr_r234
-from collections import namedtuple, Sequence
+from collections import namedtuple, Iterable
 from contextlib import contextmanager
 from functools import partial, wraps
 from itertools import chain
@@ -54,7 +54,10 @@
 
 def _debug():
     FORMAT = '%(asctime)-15s %(threadName)s %(levelname)s %(funcName)s %(message)s %(sock)s'
-    logging.basicConfig(format=FORMAT, level=logging.DEBUG)
+    debug_sh = logging.StreamHandler()
+    debug_sh.setFormatter(logging.Formatter(FORMAT))
+    log.addHandler(debug_sh)
+    log.setLevel(level=logging.DEBUG)
 
 # _debug()  # UNCOMMENT to get logging of socket activity
 
@@ -575,6 +578,8 @@
 
     def initChannel(self, child_channel):
         child = ChildSocket(self.parent_socket)
+        log.debug("Initializing child %s", extra={"sock": self.parent_socket})
+
         child.proto = IPPROTO_TCP
         child._init_client_mode(child_channel)
 
@@ -1366,8 +1371,6 @@
         self._ensure_post_connect()
         return super(ChildSocket, self).setblocking(mode)
 
-    # FIXME FIXME FIXME other ops.
-
     def close(self):
         self._ensure_post_connect()
         super(ChildSocket, self).close()
@@ -1376,8 +1379,9 @@
         if self.accepted:
             with self.parent_socket.open_lock:
                 self.parent_socket.accepted_children -= 1
-                if self.parent_socket.accepted_children == 0:
-                    log.debug("Shutting down child group for parent socket=%s", self.parent_socket, extra={"sock": self})
+                if self.parent_socket.open_count == 0 and self.parent_socket.accepted_children == 0:
+                    log.debug("Shutting down child group for parent socket=%s accepted_children=%s",
+                              self.parent_socket, self.parent_socket.accepted_children, extra={"sock": self})
                     self.parent_socket.child_group.shutdownGracefully(0, 100, TimeUnit.MILLISECONDS)
 
     def shutdown(self, how):
@@ -1402,7 +1406,7 @@
 
 def select(rlist, wlist, xlist, timeout=None):
     for lst in (rlist, wlist, xlist):
-        if not isinstance(lst, Sequence):
+        if not isinstance(lst, Iterable):
             raise TypeError("arguments 1-3 must be sequences")
     if not(timeout is None or isinstance(timeout, Number)):
         raise TypeError("timeout must be a float or None")
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -4,6 +4,7 @@
 from test import test_support
 
 import errno
+import gc
 import jarray
 import Queue
 import platform
@@ -128,7 +129,16 @@
         self.client_ready.wait()
 
     def _assert_no_pending_threads(self, group, msg):
+        # Ensure __del__ finalizers are called on sockets. Two things to note:
+        # 1. It takes two collections for finalization to run.
+        # 2. gc.collect() is only advisory to the JVM, never mandatory. Still 
+        #    it usually seems to happen under light load.
+        gc.collect()
+        time.sleep(0.1)
+        gc.collect()
+
         # Wait up to one second for there not to be pending threads
+
         for i in xrange(10):
             pending_threads = _check_threadpool_for_pending_threads(group)
             if len(pending_threads) == 0:

-- 
Repository URL: https://hg.python.org/jython


More information about the Jython-checkins mailing list