should "self" be changed?

Marko Rauhamaa marko at pacujo.net
Thu May 28 11:01:46 EDT 2015


Anssi Saari <as at sci.fi>:

> Do you have an example of state pattern using nested classes and
> python? With a quick look I didn't happen to find one in any language.

Here's a sampling from my mail server:

========================================================================
class SMTPServerConnection(ServerConnection):
    """Server side of one SMTP session, driven by a state machine.

    Every protocol state is a class defined inside ``__init__`` so that its
    methods can reach the connection (``conn``), the owning ``server`` and
    the peer's ``client_ip`` through the enclosing scope rather than through
    ``self``.  Once construction has finished, ``self.state`` holds an
    instance of the current state class and the ``handle_*`` methods of the
    connection delegate to it.
    """
    #:     :     :
    #:     :     :

    def __init__(self, server, sock, domain, peer):
        """Define the state classes, greet the client and enter ``IDLE``.

        ``domain`` is announced in the 220 greeting and ``peer[0]`` is used
        as the client's IP address.
        """
        super().__init__(server, sock)
        # The nested state classes use ``self`` for the state object, so the
        # connection is made available to them under a different name.
        conn = self
        client_ip = peer[0]

        class STATE:
            """Base state: every event a state does not override is a bug."""

            def __str__(self):
                return self.__class__.__name__

            def handle_command(self, cmd):
                conn.log("handle_command (unexpected): {}".format(cmd))
                # Raised explicitly: ``assert False`` is removed under
                # ``python -O`` and the event would then be ignored silently.
                raise AssertionError(
                    "{}: unexpected command".format(self))

            def handle_bad_command(self, reason):
                conn.log("handle_bad_command (unexpected): {}".format(reason))
                raise AssertionError(
                    "{}: unexpected bad command".format(self))

            def handle_eof(self):
                conn.log("eof (unexpected)")
                raise AssertionError(
                    "{}: unexpected end of file".format(self))

            def terminate(self):
                raise AssertionError(
                    "{}: unexpected terminate".format(self))

        class IDLE(STATE):
            """No mail transaction in progress."""

            def handle_command(self, cmd):
                """Dispatch one SMTP command received while idle."""
                conn.log("handle_command: {}".format(cmd))
                verb, args = conn.split_cmd(cmd)
                if verb == b'EHLO':
                    # EHLO additionally advertises the extensions.
                    self.process_ehlo_or_helo(
                        args,
                        "PIPELINING",
                        "SIZE {:d}".format(conn.MAX_MSG_SIZE),
                        "VRFY",
                        "EXPN",
                        "8BITMIME")
                elif verb == b'HELO':
                    self.process_ehlo_or_helo(args)
                elif verb in (b'NOOP', b'RSET'):
                    conn.respond(250, "OK")
                elif verb == b'HELP':
                    conn.respond(214, "No help available")
                elif verb in (b'VRFY', b'EXPN'):
                    conn.respond(550, "Access denied to you")
                elif verb == b'QUIT':
                    conn.respond(221, "{} Closing channel".format(
                            server.domain))
                    conn.shut_down()
                    conn.set_state(QUITTING)
                elif verb == b'MAIL':
                    self.handle_mail(args)
                elif verb in (b'RCPT', b'DATA'):
                    conn.respond(503, "Bad sequence of commands")
                else:
                    conn.respond(500, "Command unrecognized")

            def process_ehlo_or_helo(self, args, *capabilities):
                """Record the announced name and answer or start an SPF check.

                ``capabilities`` are the extra reply lines to send with the
                250 response (empty for HELO).
                """
                if not args:
                    conn.respond(501, "Missing parameter")
                    return
                try:
                    # may be an IP address
                    conn.helo_domain_name = args[0].decode()
                except UnicodeError:
                    conn.respond(501, "Bad encoding in parameter")
                    return
                if conn.local_client:
                    # Local clients are answered at once, without SPF.
                    conn.respond(250, server.domain, *capabilities)
                    conn.set_state(IDLE)
                    return
                ## todo: remove the "suspend" concept from mux
                conn.suspend()
                conn.log("authorize helo {} from {}".format(
                        conn.helo_domain_name, client_ip))
                # The state is switched before the query is issued, so the
                # verdict is always delivered to SPF_HELO.
                conn.set_state(SPF_HELO)

                def callback(verdict, reason):
                    conn.state.handle_spf_verdict(
                        verdict, reason, *capabilities)

                conn.spf_query = server.spf_client.authorize_helo(
                    server.host, conn.helo_domain_name, server.family,
                    client_ip, callback, xid = id(conn))

            #:     :     :
            #:     :     :

        class SPF_PENDING(STATE):
            """Behaviour shared by the states waiting for an SPF verdict."""

            def terminate(self):
                """Abandon the outstanding SPF query and close the connection."""
                conn.resume()
                conn.spf_query.cancel()
                conn.close()
                if conn.timer is not None:
                    conn.timer.cancel()
                    conn.timer = None
                conn.set_state(ZOMBIE)

        class SPF_HELO(SPF_PENDING):
            """Waiting for the SPF verdict on the HELO/EHLO name."""

            def handle_spf_verdict(self, verdict, reason, *capabilities):
                """Reject on SPF failure, otherwise send the 250 reply."""
                conn.resume()
                conn.log("verdict {} reason {}".format(verdict, reason))
                # RFC 4408 §2.5.5 calls for leniency for SoftFail
                #if verdict in [ SPFClient.FAIL, SPFClient.SOFT_FAIL ]:
                if verdict in (SPFClient.FAIL,):
                    conn.respond(550, "SPF check failed: {}".format(reason))
                    conn.set_state(IDLE)
                    return
                conn.respond(250, server.domain, *capabilities)
                conn.set_state(IDLE)

        class SPF_SENDER(SPF_PENDING):
            """Waiting for the SPF verdict on the sender."""

            def handle_spf_verdict(self, verdict, reason):
                """Reject on SPF failure, otherwise move on to RECIPIENTS."""
                conn.resume()
                conn.log("verdict {} reason {}".format(verdict, reason))
                # RFC 4408 §2.5.5 calls for leniency for SoftFail
                #if verdict in [ SPFClient.FAIL, SPFClient.SOFT_FAIL ]:
                if verdict in (SPFClient.FAIL,):
                    conn.respond(550, "SPF check failed: {}".format(reason))
                    conn.set_state(IDLE)
                    return
                # The verdict is stored on the connection for later use.
                conn.spf_sender_verdict = verdict
                conn.spf_sender_reason = reason
                conn.respond(250, "OK")
                conn.set_state(RECIPIENTS)

        class RECIPIENTS(STATE):
            """Sender accepted; RCPT and DATA commands are handled here."""

            def handle_command(self, cmd):
                """Dispatch one SMTP command received in this state."""
                conn.log("handle_command: {}".format(cmd))
                verb, args = conn.split_cmd(cmd)
                if verb in (b'EHLO', b'HELO', b'MAIL'):
                    conn.respond(503, "Bad sequence of commands")
                elif verb == b'NOOP':
                    conn.respond(250, "OK")
                elif verb == b'HELP':
                    conn.respond(214, "No help available")
                elif verb in (b'VRFY', b'EXPN'):
                    conn.respond(550, "Access denied to you")
                elif verb == b'RSET':
                    conn.set_state(IDLE)
                    conn.respond(250, "OK")
                elif verb == b'QUIT':
                    conn.respond(221, "{} Closing channel".format(
                            server.domain))
                    conn.shut_down()
                    conn.set_state(QUITTING)
                elif verb == b'RCPT':
                    self.handle_rcpt(args)
                elif verb == b'DATA':
                    self.handle_data(args)
                else:
                    conn.respond(500, "Command unrecognized")

            #:     :     :
            #:     :     :

        self.send(("220 {} Service ready\r\n".format(domain)).encode())
        self.timer = self.mux.after(self.SERVER_TIMEOUT, self.handle_timeout)
        #:     :     :
        #:     :     :
        self.state = IDLE()

    def set_state(self, state):
        """Instantiate the state class ``state`` and make it current."""
        new_state = state()
        self.log("set_state: {} -> {}".format(self.state, new_state))
        self.state = new_state

    def handle_command(self, cmd):
        """Forward a received command to the current state."""
        self.state.handle_command(cmd)

    def handle_timeout(self):
        """Timer callback: terminate the current state unless disarmed."""
        if self.timer is None:
            # The timer was cancelled in the meantime; nothing to do.
            return
        self.timer = None
        self.log("timeout")
        self.state.terminate()

    def handle_spf_verdict(self, verdict, reason, *capabilities):
        """Forward an SPF verdict to the current state."""
        self.state.handle_spf_verdict(verdict, reason, *capabilities)

    #:     :     :
    #:     :     :
========================================================================


Marko



More information about the Python-list mailing list