[Jython-checkins] jython: Upgrading SocketServer to 2.7. Step 2: Add in the cpython 2.7.2 modules.

alan.kennedy jython-checkins at python.org
Sun Apr 1 17:22:16 CEST 2012


http://hg.python.org/jython/rev/7690b912322c
changeset:   6516:7690b912322c
user:        Alan Kennedy <alan at xhaus.com>
date:        Sun Apr 01 13:44:21 2012 +0100
summary:
  Upgrading SocketServer to 2.7. Step 2: Add in the cpython 2.7.2 modules.

files:
  Lib/SocketServer.py           |  716 ++++++++++++++++++++++
  Lib/test/test_socketserver.py |  284 ++++++++
  2 files changed, 1000 insertions(+), 0 deletions(-)


diff --git a/Lib/SocketServer.py b/Lib/SocketServer.py
new file mode 100644
--- /dev/null
+++ b/Lib/SocketServer.py
@@ -0,0 +1,716 @@
+"""Generic socket server classes.
+
+This module tries to capture the various aspects of defining a server:
+
+For socket-based servers:
+
+- address family:
+        - AF_INET{,6}: IP (Internet Protocol) sockets (default)
+        - AF_UNIX: Unix domain sockets
+        - others, e.g. AF_DECNET are conceivable (see <socket.h>)
+- socket type:
+        - SOCK_STREAM (reliable stream, e.g. TCP)
+        - SOCK_DGRAM (datagrams, e.g. UDP)
+
+For request-based servers (including socket-based):
+
+- client address verification before further looking at the request
+        (This is actually a hook for any processing that needs to look
+         at the request before anything else, e.g. logging)
+- how to handle multiple requests:
+        - synchronous (one request is handled at a time)
+        - forking (each request is handled by a new process)
+        - threading (each request is handled by a new thread)
+
+The classes in this module favor the server type that is simplest to
+write: a synchronous TCP/IP server.  This is bad class design, but
+saves some typing.  (There's also the issue that a deep class hierarchy
+slows down method lookups.)
+
+There are five classes in an inheritance diagram, four of which represent
+synchronous servers of four types:
+
+        +------------+
+        | BaseServer |
+        +------------+
+              |
+              v
+        +-----------+        +------------------+
+        | TCPServer |------->| UnixStreamServer |
+        +-----------+        +------------------+
+              |
+              v
+        +-----------+        +--------------------+
+        | UDPServer |------->| UnixDatagramServer |
+        +-----------+        +--------------------+
+
+Note that UnixDatagramServer derives from UDPServer, not from
+UnixStreamServer -- the only difference between an IP and a Unix
+stream server is the address family, which is simply repeated in both
+unix server classes.
+
+Forking and threading versions of each type of server can be created
+using the ForkingMixIn and ThreadingMixIn mix-in classes.  For
+instance, a threading UDP server class is created as follows:
+
+        class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
+
+The Mix-in class must come first, since it overrides a method defined
+in UDPServer! Setting the various member variables also changes
+the behavior of the underlying server mechanism.
+
+To implement a service, you must derive a class from
+BaseRequestHandler and redefine its handle() method.  You can then run
+various versions of the service by combining one of the server classes
+with your request handler class.
+
+The request handler class must be different for datagram or stream
+services.  This can be hidden by using the request handler
+subclasses StreamRequestHandler or DatagramRequestHandler.
+
+Of course, you still have to use your head!
+
+For instance, it makes no sense to use a forking server if the service
+contains state in memory that can be modified by requests (since the
+modifications in the child process would never reach the initial state
+kept in the parent process and passed to each child).  In this case,
+you can use a threading server, but you will probably have to use
+locks to prevent two requests that arrive nearly simultaneously from
+applying conflicting changes to the server state.
+
+On the other hand, if you are building e.g. an HTTP server, where all
+data is stored externally (e.g. in the file system), a synchronous
+class will essentially render the service "deaf" while one request is
+being handled -- which may be for a very long time if a client is slow
+to read all the data it has requested.  Here a threading or forking
+server is appropriate.
+
+In some cases, it may be appropriate to process part of a request
+synchronously, but to finish processing in a forked child depending on
+the request data.  This can be implemented by using a synchronous
+server and doing an explicit fork in the request handler class
+handle() method.
+
+Another approach to handling multiple simultaneous requests in an
+environment that supports neither threads nor fork (or where these are
+too expensive or inappropriate for the service) is to maintain an
+explicit table of partially finished requests and to use select() to
+decide which request to work on next (or whether to handle a new
+incoming request).  This is particularly important for stream services
+where each client can potentially be connected for a long time (if
+threads or subprocesses cannot be used).
+
+Future work:
+- Standard classes for Sun RPC (which uses either UDP or TCP)
+- Standard mix-in classes to implement various authentication
+  and encryption schemes
+- Standard framework for select-based multiplexing
+
+XXX Open problems:
+- What to do with out-of-band data?
+
+BaseServer:
+- split generic "request" functionality out into BaseServer class.
+  Copyright (C) 2000  Luke Kenneth Casson Leighton <lkcl at samba.org>
+
+  example: read entries from a SQL database (requires overriding
+  get_request() to return a table entry from the database).
+  entry is processed by a RequestHandlerClass.
+
+"""
+
+# Author of the BaseServer patch: Luke Kenneth Casson Leighton
+
+# XXX Warning!
+# There is a test suite for this module, but it cannot be run by the
+# standard regression test.
+# To run it manually, run Lib/test/test_socketserver.py.
+
+__version__ = "0.4"
+
+
+import socket
+import select
+import sys
+import os
+try:
+    import threading
+except ImportError:
+    import dummy_threading as threading
+
+__all__ = ["TCPServer","UDPServer","ForkingUDPServer","ForkingTCPServer",
+           "ThreadingUDPServer","ThreadingTCPServer","BaseRequestHandler",
+           "StreamRequestHandler","DatagramRequestHandler",
+           "ThreadingMixIn", "ForkingMixIn"]
+if hasattr(socket, "AF_UNIX"):
+    __all__.extend(["UnixStreamServer","UnixDatagramServer",
+                    "ThreadingUnixStreamServer",
+                    "ThreadingUnixDatagramServer"])
+
+class BaseServer:
+
+    """Base class for server classes.
+
+    Methods for the caller:
+
+    - __init__(server_address, RequestHandlerClass)
+    - serve_forever(poll_interval=0.5)
+    - shutdown()
+    - handle_request()  # if you do not use serve_forever()
+    - fileno() -> int   # for select()
+
+    Methods that may be overridden:
+
+    - server_bind()
+    - server_activate()
+    - get_request() -> request, client_address
+    - handle_timeout()
+    - verify_request(request, client_address)
+    - server_close()
+    - process_request(request, client_address)
+    - shutdown_request(request)
+    - close_request(request)
+    - handle_error(request, client_address)
+
+    Methods for derived classes:
+
+    - finish_request(request, client_address)
+
+    Class variables that may be overridden by derived classes or
+    instances:
+
+    - timeout
+    - address_family
+    - socket_type
+    - allow_reuse_address
+
+    Instance variables:
+
+    - RequestHandlerClass
+    - socket
+
+    """
+
+    timeout = None
+
+    def __init__(self, server_address, RequestHandlerClass):
+        """Constructor.  May be extended, do not override."""
+        self.server_address = server_address
+        self.RequestHandlerClass = RequestHandlerClass
+        self.__is_shut_down = threading.Event()
+        self.__shutdown_request = False
+
+    def server_activate(self):
+        """Called by constructor to activate the server.
+
+        May be overridden.
+
+        """
+        pass
+
+    def serve_forever(self, poll_interval=0.5):
+        """Handle one request at a time until shutdown.
+
+        Polls for shutdown every poll_interval seconds. Ignores
+        self.timeout. If you need to do periodic tasks, do them in
+        another thread.
+        """
+        self.__is_shut_down.clear()
+        try:
+            while not self.__shutdown_request:
+                # XXX: Consider using another file descriptor or
+                # connecting to the socket to wake this up instead of
+                # polling. Polling reduces our responsiveness to a
+                # shutdown request and wastes cpu at all other times.
+                r, w, e = select.select([self], [], [], poll_interval)
+                if self in r:
+                    self._handle_request_noblock()
+        finally:
+            self.__shutdown_request = False
+            self.__is_shut_down.set()
+
+    def shutdown(self):
+        """Stops the serve_forever loop.
+
+        Blocks until the loop has finished. This must be called while
+        serve_forever() is running in another thread, or it will
+        deadlock.
+        """
+        self.__shutdown_request = True
+        self.__is_shut_down.wait()
+
+    # The distinction between handling, getting, processing and
+    # finishing a request is fairly arbitrary.  Remember:
+    #
+    # - handle_request() is the top-level call.  It calls
+    #   select, get_request(), verify_request() and process_request()
+    # - get_request() is different for stream or datagram sockets
+    # - process_request() is the place that may fork a new process
+    #   or create a new thread to finish the request
+    # - finish_request() instantiates the request handler class;
+    #   this constructor will handle the request all by itself
+
+    def handle_request(self):
+        """Handle one request, possibly blocking.
+
+        Respects self.timeout.
+        """
+        # Support people who used socket.settimeout() to escape
+        # handle_request before self.timeout was available.
+        timeout = self.socket.gettimeout()
+        if timeout is None:
+            timeout = self.timeout
+        elif self.timeout is not None:
+            timeout = min(timeout, self.timeout)
+        fd_sets = select.select([self], [], [], timeout)
+        if not fd_sets[0]:
+            self.handle_timeout()
+            return
+        self._handle_request_noblock()
+
+    def _handle_request_noblock(self):
+        """Handle one request, without blocking.
+
+        I assume that select.select has returned that the socket is
+        readable before this function was called, so there should be
+        no risk of blocking in get_request().
+        """
+        try:
+            request, client_address = self.get_request()
+        except socket.error:
+            return
+        if self.verify_request(request, client_address):
+            try:
+                self.process_request(request, client_address)
+            except:
+                self.handle_error(request, client_address)
+                self.shutdown_request(request)
+
+    def handle_timeout(self):
+        """Called if no new request arrives within self.timeout.
+
+        Overridden by ForkingMixIn.
+        """
+        pass
+
+    def verify_request(self, request, client_address):
+        """Verify the request.  May be overridden.
+
+        Return True if we should proceed with this request.
+
+        """
+        return True
+
+    def process_request(self, request, client_address):
+        """Call finish_request.
+
+        Overridden by ForkingMixIn and ThreadingMixIn.
+
+        """
+        self.finish_request(request, client_address)
+        self.shutdown_request(request)
+
+    def server_close(self):
+        """Called to clean-up the server.
+
+        May be overridden.
+
+        """
+        pass
+
+    def finish_request(self, request, client_address):
+        """Finish one request by instantiating RequestHandlerClass."""
+        self.RequestHandlerClass(request, client_address, self)
+
+    def shutdown_request(self, request):
+        """Called to shutdown and close an individual request."""
+        self.close_request(request)
+
+    def close_request(self, request):
+        """Called to clean up an individual request."""
+        pass
+
+    def handle_error(self, request, client_address):
+        """Handle an error gracefully.  May be overridden.
+
+        The default is to print a traceback and continue.
+
+        """
+        print '-'*40
+        print 'Exception happened during processing of request from',
+        print client_address
+        import traceback
+        traceback.print_exc() # XXX But this goes to stderr!
+        print '-'*40
+
+
+class TCPServer(BaseServer):
+
+    """Base class for various socket-based server classes.
+
+    Defaults to synchronous IP stream (i.e., TCP).
+
+    Methods for the caller:
+
+    - __init__(server_address, RequestHandlerClass, bind_and_activate=True)
+    - serve_forever(poll_interval=0.5)
+    - shutdown()
+    - handle_request()  # if you don't use serve_forever()
+    - fileno() -> int   # for select()
+
+    Methods that may be overridden:
+
+    - server_bind()
+    - server_activate()
+    - get_request() -> request, client_address
+    - handle_timeout()
+    - verify_request(request, client_address)
+    - process_request(request, client_address)
+    - shutdown_request(request)
+    - close_request(request)
+    - handle_error(request, client_address)
+
+    Methods for derived classes:
+
+    - finish_request(request, client_address)
+
+    Class variables that may be overridden by derived classes or
+    instances:
+
+    - timeout
+    - address_family
+    - socket_type
+    - request_queue_size (only for stream sockets)
+    - allow_reuse_address
+
+    Instance variables:
+
+    - server_address
+    - RequestHandlerClass
+    - socket
+
+    """
+
+    address_family = socket.AF_INET
+
+    socket_type = socket.SOCK_STREAM
+
+    request_queue_size = 5
+
+    allow_reuse_address = False
+
+    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True):
+        """Constructor.  May be extended, do not override."""
+        BaseServer.__init__(self, server_address, RequestHandlerClass)
+        self.socket = socket.socket(self.address_family,
+                                    self.socket_type)
+        if bind_and_activate:
+            self.server_bind()
+            self.server_activate()
+
+    def server_bind(self):
+        """Called by constructor to bind the socket.
+
+        May be overridden.
+
+        """
+        if self.allow_reuse_address:
+            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.socket.bind(self.server_address)
+        self.server_address = self.socket.getsockname()
+
+    def server_activate(self):
+        """Called by constructor to activate the server.
+
+        May be overridden.
+
+        """
+        self.socket.listen(self.request_queue_size)
+
+    def server_close(self):
+        """Called to clean-up the server.
+
+        May be overridden.
+
+        """
+        self.socket.close()
+
+    def fileno(self):
+        """Return socket file number.
+
+        Interface required by select().
+
+        """
+        return self.socket.fileno()
+
+    def get_request(self):
+        """Get the request and client address from the socket.
+
+        May be overridden.
+
+        """
+        return self.socket.accept()
+
+    def shutdown_request(self, request):
+        """Called to shutdown and close an individual request."""
+        try:
+            #explicitly shutdown.  socket.close() merely releases
+            #the socket and waits for GC to perform the actual close.
+            request.shutdown(socket.SHUT_WR)
+        except socket.error:
+            pass #some platforms may raise ENOTCONN here
+        self.close_request(request)
+
+    def close_request(self, request):
+        """Called to clean up an individual request."""
+        request.close()
+
+
+class UDPServer(TCPServer):
+
+    """UDP server class."""
+
+    allow_reuse_address = False
+
+    socket_type = socket.SOCK_DGRAM
+
+    max_packet_size = 8192
+
+    def get_request(self):
+        data, client_addr = self.socket.recvfrom(self.max_packet_size)
+        return (data, self.socket), client_addr
+
+    def server_activate(self):
+        # No need to call listen() for UDP.
+        pass
+
+    def shutdown_request(self, request):
+        # No need to shutdown anything.
+        self.close_request(request)
+
+    def close_request(self, request):
+        # No need to close anything.
+        pass
+
+class ForkingMixIn:
+
+    """Mix-in class to handle each request in a new process."""
+
+    timeout = 300
+    active_children = None
+    max_children = 40
+
+    def collect_children(self):
+        """Internal routine to wait for children that have exited."""
+        if self.active_children is None: return
+        while len(self.active_children) >= self.max_children:
+            # XXX: This will wait for any child process, not just ones
+            # spawned by this library. This could confuse other
+            # libraries that expect to be able to wait for their own
+            # children.
+            try:
+                pid, status = os.waitpid(0, 0)
+            except os.error:
+                pid = None
+            if pid not in self.active_children: continue
+            self.active_children.remove(pid)
+
+        # XXX: This loop runs more system calls than it ought
+        # to. There should be a way to put the active_children into a
+        # process group and then use os.waitpid(-pgid) to wait for any
+        # of that set, but I couldn't find a way to allocate pgids
+        # that couldn't collide.
+        for child in self.active_children:
+            try:
+                pid, status = os.waitpid(child, os.WNOHANG)
+            except os.error:
+                pid = None
+            if not pid: continue
+            try:
+                self.active_children.remove(pid)
+            except ValueError, e:
+                raise ValueError('%s. x=%d and list=%r' % (e.message, pid,
+                                                           self.active_children))
+
+    def handle_timeout(self):
+        """Wait for zombies after self.timeout seconds of inactivity.
+
+        May be extended, do not override.
+        """
+        self.collect_children()
+
+    def process_request(self, request, client_address):
+        """Fork a new subprocess to process the request."""
+        self.collect_children()
+        pid = os.fork()
+        if pid:
+            # Parent process
+            if self.active_children is None:
+                self.active_children = []
+            self.active_children.append(pid)
+            self.close_request(request) #close handle in parent process
+            return
+        else:
+            # Child process.
+            # This must never return, hence os._exit()!
+            try:
+                self.finish_request(request, client_address)
+                self.shutdown_request(request)
+                os._exit(0)
+            except:
+                try:
+                    self.handle_error(request, client_address)
+                    self.shutdown_request(request)
+                finally:
+                    os._exit(1)
+
+
+class ThreadingMixIn:
+    """Mix-in class to handle each request in a new thread."""
+
+    # Decides how threads will act upon termination of the
+    # main process
+    daemon_threads = False
+
+    def process_request_thread(self, request, client_address):
+        """Same as in BaseServer but as a thread.
+
+        In addition, exception handling is done here.
+
+        """
+        try:
+            self.finish_request(request, client_address)
+            self.shutdown_request(request)
+        except:
+            self.handle_error(request, client_address)
+            self.shutdown_request(request)
+
+    def process_request(self, request, client_address):
+        """Start a new thread to process the request."""
+        t = threading.Thread(target = self.process_request_thread,
+                             args = (request, client_address))
+        if self.daemon_threads:
+            t.setDaemon (1)
+        t.start()
+
+
+class ForkingUDPServer(ForkingMixIn, UDPServer): pass
+class ForkingTCPServer(ForkingMixIn, TCPServer): pass
+
+class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
+class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
+
+if hasattr(socket, 'AF_UNIX'):
+
+    class UnixStreamServer(TCPServer):
+        address_family = socket.AF_UNIX
+
+    class UnixDatagramServer(UDPServer):
+        address_family = socket.AF_UNIX
+
+    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): pass
+
+    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): pass
+
+class BaseRequestHandler:
+
+    """Base class for request handler classes.
+
+    This class is instantiated for each request to be handled.  The
+    constructor sets the instance variables request, client_address
+    and server, and then calls the handle() method.  To implement a
+    specific service, all you need to do is to derive a class which
+    defines a handle() method.
+
+    The handle() method can find the request as self.request, the
+    client address as self.client_address, and the server (in case it
+    needs access to per-server information) as self.server.  Since a
+    separate instance is created for each request, the handle() method
+    can define arbitrary other instance variables.
+
+    """
+
+    def __init__(self, request, client_address, server):
+        self.request = request
+        self.client_address = client_address
+        self.server = server
+        self.setup()
+        try:
+            self.handle()
+        finally:
+            self.finish()
+
+    def setup(self):
+        pass
+
+    def handle(self):
+        pass
+
+    def finish(self):
+        pass
+
+
+# The following two classes make it possible to use the same service
+# class for stream or datagram servers.
+# Each class sets up these instance variables:
+# - rfile: a file object from which the request is read
+# - wfile: a file object to which the reply is written
+# When the handle() method returns, wfile is flushed properly
+
+
+class StreamRequestHandler(BaseRequestHandler):
+
+    """Define self.rfile and self.wfile for stream sockets."""
+
+    # Default buffer sizes for rfile, wfile.
+    # We default rfile to buffered because otherwise it could be
+    # really slow for large data (a getc() call per byte); we make
+    # wfile unbuffered because (a) often after a write() we want to
+    # read and we need to flush the line; (b) big writes to unbuffered
+    # files are typically optimized by stdio even when big reads
+    # aren't.
+    rbufsize = -1
+    wbufsize = 0
+
+    # A timeout to apply to the request socket, if not None.
+    timeout = None
+
+    # Disable nagle algorithm for this socket, if True.
+    # Use only when wbufsize != 0, to avoid small packets.
+    disable_nagle_algorithm = False
+
+    def setup(self):
+        self.connection = self.request
+        if self.timeout is not None:
+            self.connection.settimeout(self.timeout)
+        if self.disable_nagle_algorithm:
+            self.connection.setsockopt(socket.IPPROTO_TCP,
+                                       socket.TCP_NODELAY, True)
+        self.rfile = self.connection.makefile('rb', self.rbufsize)
+        self.wfile = self.connection.makefile('wb', self.wbufsize)
+
+    def finish(self):
+        if not self.wfile.closed:
+            self.wfile.flush()
+        self.wfile.close()
+        self.rfile.close()
+
+
+class DatagramRequestHandler(BaseRequestHandler):
+
+    # XXX Regrettably, I cannot get this working on Linux;
+    # s.recvfrom() doesn't return a meaningful client address.
+
+    """Define self.rfile and self.wfile for datagram sockets."""
+
+    def setup(self):
+        try:
+            from cStringIO import StringIO
+        except ImportError:
+            from StringIO import StringIO
+        self.packet, self.socket = self.request
+        self.rfile = StringIO(self.packet)
+        self.wfile = StringIO()
+
+    def finish(self):
+        self.socket.sendto(self.wfile.getvalue(), self.client_address)
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_socketserver.py
@@ -0,0 +1,284 @@
+"""
+Test suite for SocketServer.py.
+"""
+
+import contextlib
+import imp
+import os
+import select
+import signal
+import socket
+import tempfile
+import unittest
+import SocketServer
+
+import test.test_support
+from test.test_support import reap_children, reap_threads, verbose
+try:
+    import threading
+except ImportError:
+    threading = None
+
+test.test_support.requires("network")
+
+TEST_STR = "hello world\n"
+HOST = test.test_support.HOST
+
+HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
+HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
+
+def signal_alarm(n):
+    """Call signal.alarm when it exists (i.e. not on Windows)."""
+    if hasattr(signal, 'alarm'):
+        signal.alarm(n)
+
+def receive(sock, n, timeout=20):
+    r, w, x = select.select([sock], [], [], timeout)
+    if sock in r:
+        return sock.recv(n)
+    else:
+        raise RuntimeError, "timed out on %r" % (sock,)
+
+if HAVE_UNIX_SOCKETS:
+    class ForkingUnixStreamServer(SocketServer.ForkingMixIn,
+                                  SocketServer.UnixStreamServer):
+        pass
+
+    class ForkingUnixDatagramServer(SocketServer.ForkingMixIn,
+                                    SocketServer.UnixDatagramServer):
+        pass
+
+
+@contextlib.contextmanager
+def simple_subprocess(testcase):
+    pid = os.fork()
+    if pid == 0:
+        # Don't throw an exception; it would be caught by the test harness.
+        os._exit(72)
+    yield None
+    pid2, status = os.waitpid(pid, 0)
+    testcase.assertEqual(pid2, pid)
+    testcase.assertEqual(72 << 8, status)
+
+
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SocketServerTest(unittest.TestCase):
+    """Test all socket servers."""
+
+    def setUp(self):
+        signal_alarm(20)  # Kill deadlocks after 20 seconds.
+        self.port_seed = 0
+        self.test_files = []
+
+    def tearDown(self):
+        signal_alarm(0)  # Didn't deadlock.
+        reap_children()
+
+        for fn in self.test_files:
+            try:
+                os.remove(fn)
+            except os.error:
+                pass
+        self.test_files[:] = []
+
+    def pickaddr(self, proto):
+        if proto == socket.AF_INET:
+            return (HOST, 0)
+        else:
+            # XXX: We need a way to tell AF_UNIX to pick its own name
+            # like AF_INET provides port==0.
+            dir = None
+            if os.name == 'os2':
+                dir = '\socket'
+            fn = tempfile.mktemp(prefix='unix_socket.', dir=dir)
+            if os.name == 'os2':
+                # AF_UNIX socket names on OS/2 require a specific prefix
+                # which can't include a drive letter and must also use
+                # backslashes as directory separators
+                if fn[1] == ':':
+                    fn = fn[2:]
+                if fn[0] in (os.sep, os.altsep):
+                    fn = fn[1:]
+                if os.sep == '/':
+                    fn = fn.replace(os.sep, os.altsep)
+                else:
+                    fn = fn.replace(os.altsep, os.sep)
+            self.test_files.append(fn)
+            return fn
+
+    def make_server(self, addr, svrcls, hdlrbase):
+        class MyServer(svrcls):
+            def handle_error(self, request, client_address):
+                self.close_request(request)
+                self.server_close()
+                raise
+
+        class MyHandler(hdlrbase):
+            def handle(self):
+                line = self.rfile.readline()
+                self.wfile.write(line)
+
+        if verbose: print "creating server"
+        server = MyServer(addr, MyHandler)
+        self.assertEqual(server.server_address, server.socket.getsockname())
+        return server
+
+    @unittest.skipUnless(threading, 'Threading required for this test.')
+    @reap_threads
+    def run_server(self, svrcls, hdlrbase, testfunc):
+        server = self.make_server(self.pickaddr(svrcls.address_family),
+                                  svrcls, hdlrbase)
+        # We had the OS pick a port, so pull the real address out of
+        # the server.
+        addr = server.server_address
+        if verbose:
+            print "server created"
+            print "ADDR =", addr
+            print "CLASS =", svrcls
+        t = threading.Thread(
+            name='%s serving' % svrcls,
+            target=server.serve_forever,
+            # Short poll interval to make the test finish quickly.
+            # Time between requests is short enough that we won't wake
+            # up spuriously too many times.
+            kwargs={'poll_interval':0.01})
+        t.daemon = True  # In case this function raises.
+        t.start()
+        if verbose: print "server running"
+        for i in range(3):
+            if verbose: print "test client", i
+            testfunc(svrcls.address_family, addr)
+        if verbose: print "waiting for server"
+        server.shutdown()
+        t.join()
+        if verbose: print "done"
+
+    def stream_examine(self, proto, addr):
+        s = socket.socket(proto, socket.SOCK_STREAM)
+        s.connect(addr)
+        s.sendall(TEST_STR)
+        buf = data = receive(s, 100)
+        while data and '\n' not in buf:
+            data = receive(s, 100)
+            buf += data
+        self.assertEqual(buf, TEST_STR)
+        s.close()
+
+    def dgram_examine(self, proto, addr):
+        s = socket.socket(proto, socket.SOCK_DGRAM)
+        s.sendto(TEST_STR, addr)
+        buf = data = receive(s, 100)
+        while data and '\n' not in buf:
+            data = receive(s, 100)
+            buf += data
+        self.assertEqual(buf, TEST_STR)
+        s.close()
+
+    def test_TCPServer(self):
+        self.run_server(SocketServer.TCPServer,
+                        SocketServer.StreamRequestHandler,
+                        self.stream_examine)
+
+    def test_ThreadingTCPServer(self):
+        self.run_server(SocketServer.ThreadingTCPServer,
+                        SocketServer.StreamRequestHandler,
+                        self.stream_examine)
+
+    if HAVE_FORKING:
+        def test_ForkingTCPServer(self):
+            with simple_subprocess(self):
+                self.run_server(SocketServer.ForkingTCPServer,
+                                SocketServer.StreamRequestHandler,
+                                self.stream_examine)
+
+    if HAVE_UNIX_SOCKETS:
+        def test_UnixStreamServer(self):
+            self.run_server(SocketServer.UnixStreamServer,
+                            SocketServer.StreamRequestHandler,
+                            self.stream_examine)
+
+        def test_ThreadingUnixStreamServer(self):
+            self.run_server(SocketServer.ThreadingUnixStreamServer,
+                            SocketServer.StreamRequestHandler,
+                            self.stream_examine)
+
+        if HAVE_FORKING:
+            def test_ForkingUnixStreamServer(self):
+                with simple_subprocess(self):
+                    self.run_server(ForkingUnixStreamServer,
+                                    SocketServer.StreamRequestHandler,
+                                    self.stream_examine)
+
+    def test_UDPServer(self):
+        self.run_server(SocketServer.UDPServer,
+                        SocketServer.DatagramRequestHandler,
+                        self.dgram_examine)
+
+    def test_ThreadingUDPServer(self):
+        self.run_server(SocketServer.ThreadingUDPServer,
+                        SocketServer.DatagramRequestHandler,
+                        self.dgram_examine)
+
+    if HAVE_FORKING:
+        def test_ForkingUDPServer(self):
+            with simple_subprocess(self):
+                self.run_server(SocketServer.ForkingUDPServer,
+                                SocketServer.DatagramRequestHandler,
+                                self.dgram_examine)
+
+    # Alas, on Linux (at least) recvfrom() doesn't return a meaningful
+    # client address so this cannot work:
+
+    # if HAVE_UNIX_SOCKETS:
+    #     def test_UnixDatagramServer(self):
+    #         self.run_server(SocketServer.UnixDatagramServer,
+    #                         SocketServer.DatagramRequestHandler,
+    #                         self.dgram_examine)
+    #
+    #     def test_ThreadingUnixDatagramServer(self):
+    #         self.run_server(SocketServer.ThreadingUnixDatagramServer,
+    #                         SocketServer.DatagramRequestHandler,
+    #                         self.dgram_examine)
+    #
+    #     if HAVE_FORKING:
+    #         def test_ForkingUnixDatagramServer(self):
+    #             self.run_server(SocketServer.ForkingUnixDatagramServer,
+    #                             SocketServer.DatagramRequestHandler,
+    #                             self.dgram_examine)
+
+    @reap_threads
+    def test_shutdown(self):
+        # Issue #2302: shutdown() should always succeed in making
+        # another thread leave serve_forever().
+        class MyServer(SocketServer.TCPServer):
+            pass
+
+        class MyHandler(SocketServer.StreamRequestHandler):
+            pass
+
+        threads = []
+        for i in range(20):
+            s = MyServer((HOST, 0), MyHandler)
+            t = threading.Thread(
+                name='MyServer serving',
+                target=s.serve_forever,
+                kwargs={'poll_interval':0.01})
+            t.daemon = True  # In case this function raises.
+            threads.append((t, s))
+        for t, s in threads:
+            t.start()
+            s.shutdown()
+        for t, s in threads:
+            t.join()
+
+
+def test_main():
+    if imp.lock_held():
+        # If the import lock is held, the threads will hang
+        raise unittest.SkipTest("can't run when import lock is held")
+
+    test.test_support.run_unittest(SocketServerTest)
+
+if __name__ == "__main__":
+    test_main()
+    signal_alarm(3)  # Shutdown shouldn't take more than 3 seconds.
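
The module docstring in the changeset above describes building a service by subclassing a request handler, combining a mix-in with a server class (mix-in listed first), and stopping serve_forever() from another thread with shutdown(). The following is a minimal sketch of that pattern and is not part of the commit. The names EchoHandler, ThreadedEchoServer and echo_once are hypothetical, and the import fallback to socketserver is an assumption added so the sketch also runs on Python 3.

```python
import socket
import threading

try:
    import SocketServer  # module name used in this changeset (Python 2 / Jython 2.7)
except ImportError:
    import socketserver as SocketServer  # assumption: Python 3 name for the same module


class EchoHandler(SocketServer.StreamRequestHandler):
    """Hypothetical handler: read one line and write it back."""

    def handle(self):
        line = self.rfile.readline()
        self.wfile.write(line)


class ThreadedEchoServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """The mix-in comes first so its process_request() overrides TCPServer's."""
    daemon_threads = True
    allow_reuse_address = True


def echo_once(message):
    """Serve on an OS-chosen port, send one line, and return the reply."""
    server = ThreadedEchoServer(("127.0.0.1", 0), EchoHandler)
    worker = threading.Thread(target=server.serve_forever,
                              kwargs={"poll_interval": 0.01})
    worker.daemon = True
    worker.start()
    try:
        client = socket.create_connection(server.server_address)
        try:
            client.sendall(message)
            reply = b""
            # Keep reading until the newline-terminated echo is complete.
            while not reply.endswith(b"\n"):
                chunk = client.recv(100)
                if not chunk:
                    break
                reply += chunk
        finally:
            client.close()
    finally:
        # shutdown() blocks until serve_forever() exits, so it is called
        # from this thread rather than from the serving thread.
        server.shutdown()
        server.server_close()
        worker.join()
    return reply


if __name__ == "__main__":
    print(echo_once(b"hello world\n"))
```

Binding to port 0 and reading the real address back from server.server_address mirrors what the test suite in this changeset does in pickaddr() and run_server().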

-- 
Repository URL: http://hg.python.org/jython

