[Python-ideas] solving multi-core Python

Trent Nelson trent at snakebite.org
Thu Jun 25 06:59:04 CEST 2015


On Tue, Jun 23, 2015 at 11:01:24PM -0600, Eric Snow wrote:
> On Sun, Jun 21, 2015 at 5:41 AM, Sturla Molden <sturla.molden at gmail.com> wrote:
> > From the perspective of software design, it would be good it the CPython
> > interpreter provided an environment instead of using global objects. It
> > would mean that all functions in the C API would need to take the
> > environment pointer as their first variable, which will be a major rewrite.
> > It would also allow the "one interpreter per thread" design similar to tcl
> > and .NET application domains.
>
> While perhaps a worthy goal, I don't know that it fits in well with my
> goals.  I'm aiming for an improved multi-core story with a minimum of
> change in the interpreter.

This slide and the following two are particularly relevant:

    https://speakerdeck.com/trent/parallelism-and-concurrency-with-python?slide=4

I elicit three categories of contemporary problems where efficient
use of multiple cores would be desirable:

    1)  Computationally-intensive work against large data sets (the
        traditional "parallel" HPC/science/engineering space, and
        lately, to today's "Big Data" space).

    2a) Serving tens/hundreds of thousands of network clients with
        non-trivial computation required per-request (i.e. more than
        just buffer copying between two sockets); best example being
        the modern day web server, or:

    2b) Serving far fewer clients, but striving for the lowest latency
        possible in an environment with "maximum permitted latency"
        restrictions (or percentile targets, 99s etc).

In all three problem domains, there is a clear inflection point at
which multiple cores would overtake a single core in either:

    1)    Reducing the overall computation time.

    2a|b) Serving a greater number of clients (or being able to perform
          more complex computation per request) before hitting maximum
          permitted latency limits.

For PyParallel, I focused on 2a and 2b.  More specifically, a TCP/IP
socket server that had the ability to dynamically adjust its behavior
(low latency vs concurrency vs throughput[1]), whilst maintaining
optimal usage of underlying hardware[2].  That is: given sufficient
load, you should be able to saturate all I/O channels (network and
disk), or all cores, or both, with *useful* work.  (The next step
after saturation is sustained saturation (given sufficient load),
which can be even harder to achieve, as you need to factor in latencies
for "upcoming I/O" ahead of time if your computation is driven by
the results of a disk read (or database cursor fetch).)

(Sturla commented on the "import-DDoS" that you can run into on POSIX
 systems, which is a good example.  You're saturating your underlying
 hardware, sure, but you're not doing useful work -- it's important
 to distinguish the two.)

    Dynamically adjusting behavior based on low latency vs
    concurrency vs throughput:
        [1]: https://speakerdeck.com/trent/pyparallel-how-we-removed-the-gil-and-exploited-all-cores?slide=115
             https://speakerdeck.com/trent/pyparallel-how-we-removed-the-gil-and-exploited-all-cores?slide=120

    Optimal hardware use:
        [2]: https://speakerdeck.com/trent/parallelism-and-concurrency-with-python?slide=6

So, with the focus of PyParallel established (socket server that
could exploit all cores), my hypothesis was that I could find a
new way of doing things that was more performant than the status
quo.  (In particular, I wanted to make sure I had an answer for
"why not just use multiprocessing?" -- which is an important
question.)

    https://speakerdeck.com/trent/parallelism-and-concurrency-with-python?slide=22

So, I also made the decision to leverage threads for parallelism and
not processes+IPC, which it sounds like you're leaning toward as
well.  Actually, other than the subinterpreter implementation aspect,
everything you've described is basically on par with PyParallel, more
or less.

Now, going back to your original comment:

> While perhaps a worthy goal, I don't know that it fits in well with my
> goals.  I'm aiming for an improved multi-core story with a minimum of
> change in the interpreter.

That last sentence is very vague as multi-core means different things to
different people.  What is the problem domain you're going to try and
initially target?  Computationally-intensive parallel workloads like in
1), or the network I/O-driven socket server stuff like in 2a/2b?

I'd argue it should be the latter.  Reason being is that you'll rarely
see the former problem tackled solely by pure Python -- e.g. Python may
be gluing everything together, but the actual computation will be handled
by something like NumPy/Numba/Fortran/Cython or custom C stuff, and, as
Sturla's mentioned, OpenMP and MPI usually gets involved to manage the
parallel aspect.

For the I/O-driven socket server stuff, though, you already have this
nice delineation of what would be run serially versus what would be
ideal to run in parallel:

    import datrie
    import numpy as np
    import pyodbc
    import async
    from collections import defaultdict
    from async.http.server import (
        router,
        make_routes,
        HttpServer,
        RangedRequest,
    )

    # Tell PyParallel to invoke the tp_dealloc method explicitly
    # for these classes when rewinding a heap after a parallel
    # callback has finished.  (Implementation detail: this toggles
    # the Py_TPFLAGS_PX_DEALLOC flag in the TypeObject's tp_flags;
    # when PyParallel intercepts PyObject_NEW/INIT (init_object),
    # classes (PyTypeObject *tp) with this flag set will be tracked
    # in a linked-list that is local to the parallel context being
    # used to service this client.  When the context has its heaps
    # rewound back to the initial state at the time of the snapshot,
    # it will call tp_dealloc() explicitly against all objects of
    # this type that were encountered.)
    async.register_dealloc(pyodbc.Connection)
    async.register_dealloc(pyodbc.Cursor)
    async.register_dealloc(pyodbc.Row)

    # Load 29 million titles.  RSS += ~9.5GB.
    TITLES = datrie.Trie.load('titles.trie')
    # Load 15 million 64-bit offsets. RSS += ~200MB.
    OFFSETS = np.load('offsets.npy')
    XML = 'enwiki-20150205-pages-articles.xml'

    class WikiServer(HttpServer):
        # All of these methods are automatically invoked in
        # parallel.  HttpServer implements a data_received()
        # method which prepares the request object and then
        # calls the relevant method depending on the URL, e.g.
        # http://localhost/user/foo will call the user(request,
        # name='foo').  If we want to "write" to the client,
        # we return a bytes, bytearray or unicode object from
        # our callback (that is, we don't expose a socket.write()
        # to the user).
        #
        # Just before the PyParallel machinery invokes the
        # callback (via a simple PyObject_CallObject), though,
        # it takes a snapshot of its current state, such that
        # the exact state can be rolled back to (termed a socket
        # "rewind") when this callback is complete.  If we don't
        # return a sendable object back, this rewind happens
        # immediately, and then we go straight into a read call.
        # If we do return something sendable, we send it.  When
        # that send completes, *then* we do the rewind, then we
        # issue the next read/recv call.
        #
        # This approach is particularly well suited to parallel
        # callback execution because none of the objects we create
        # as part of the callback are needed when the callback
        # completes.  No garbage can accumulate because nothing
        # can live longer than that callback.  That obviates the
        # need for two things: reference counting against any object
        # in a parallel context, and garbage collection.  Those
        # things are useful for the main thread, but not parallel
        # contexts.
        #
        # What if you do want to keep something around after
        # the callback?  If it's a simple scalar type, the
        # following will work:
        #   class Server:
        #       name = None
        #       @route
        #       def set_name(self, request, name):
        #           self.name = name.upper()
        #           ^^^^^^^^^ we intercept that setattr and make
        #                     a copy of (the result of) name.upper()
        #                     using memory allocation from a different
        #                     heap that persists as long as the client
        #                     stays connnected.  (There's actually
        #                     support for alternatively persisting
        #                     the entire heap that the object was
        #                     allocated from, which we could use if
        #                     we were persisting complex, external,
        #                     or container types where simply doing
        #                     a memcpy() of a *base + size_t wouldn't
        #                     be feasible.  However, I haven't wired
        #                     up this logic to the socket context
        #                     logic yet.)
        #       @route
        #       def name(self, request):
        #           return json_serialization(request, self.name)
        #                                              ^^^^^^^^^
        #                                   This will return whatever
        #                                   was set in the call above.
        #                                   Once the client disconnects,
        #                                   the value disappears.
        #
        #       (Actually I think if you wanted to persist the object
        #        for the lifetime of the server, you could probably
        #        do `request.transport.parent.name = xyz`; or at least,
        #        if that doesn't currently work, the required mechanics
        #        definitely exist, so it would just need to be wired
        #        up.)
        #
        # If you want to keep an object around past the lifetime of
        # the connected client and the server, then send it to the main
        # thread where it can be tracked like a normal Python object:
        #
        # USERS = async.dict()
        #         ^^^^^^^^^^^^ shortcut for:
        #                           foo = {}
        #                           async.protect(foo)
        #                      or just:
        #                           foo = async.protect({})
        #         (On the backend, this instruments[3] the object such
        #          that PyParallel can intercept setattr/setitem and
        #          getattr/getitem calls and "do stuff"[4], depending
        #          on the context.)

[3]: https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-1796
[4]: https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-1632

        #
        # class MyServer(HttpServer):
        #   @route
        #   ^^^^^^ Ignore the mechanics of this, it's just a helper
        #          decorator I used to translate a HTTP GET for
        #          /login/foo to a function call of `login(name='foo')`.
        #          (see the bowls of async.http.server for details).
        #   def login(self, request, name):
        #       @call_from_main_thread
        #       def _save_name(n):
        #           USERS[n] = async.rdtsc()
        #           return len(USERS)
        #       count = _save_name(name)
        #       return json_serialization(request, {'count': count})
        #
        # The @call_from_main_thread decorator will enqueue a work
        # item to the main thread, and then wait on the main thread's
        # response.  The main thread executes the callback and notifies
        # the parallel thread that the call has been completed and the
        # return value (in this case the value of `len(USERS)`).  The
        # parallel thread resumes and finishes the client request.
        # Note that this will implicitly serialize execution; any number
        # of parallel requests can submit main thread work, but the
        # main thread can only call them one at a time.  So, you'd
        # usually try and avoid this, or at least remove it from your
        # application's hot code path.

        connect_string = None
        all_users_sql = 'select * from user'
        one_user_sql = 'select * from user where login = ?'

        secret_key = None

        @route
        def wiki(self, request, name):
            # http://localhost/wiki/Python: name = Python
            if name not in TITLES:
                self.error(request, 404)

            # log(n) lookup against a trie with 29 million keys.
            offset = TITLES[name][0]
            # log(n) binary search against a numpy array with 15
            # million int64s.
            ix = OFFSETS.searchsorted(offset, side='right')
            # OFFSETS[ix] = what's the offset after this?
            (start, end) = (ix-7, OFFSETS[ix]-11)
            # -7, +11 = adjust for the fact that all of the offsets
            # were calculated against the '<' of '<title>Foo</title>'.
            range_request = '%d-%d' % (start, end)
            request.range = RangedRequest(range_request)
            request.response.content_type = 'text/xml; charset=utf-8'
            return self.sendfile(request, XML)

        @route
        def users(self, request):
            # ODBC driver managers that implement connection pooling
            # behind the scenes play very nicely with our
            # pyodbc.connect() call here, returning a connection
            # from the pool (when able) without blocking.
            con = pyodbc.connect(self.connect_string)

            # The next three odbc calls would all block (in the
            # traditional sense), so this current thread would
            # not be able to serve any other requests whilst
            # waiting for completion -- however, this is far
            # less of a problem for PyParallel than single-threaded
            # land as other threads will keep servicing requests
            # in the mean time.  (ODBC 3.8/SQL Server 2012/Windows 8
            # did introduce async notification, such that we could
            # request that an event be set when the cursor/query/call
            # has completed, which we'd tie in to PyParallel by
            # submitting a threadpool wait (much like we do for async
            # DNS lookup[5], also added in Windows 8), however, it was
            # going to require a bit of modification to the pyodbc
            # module to support the async calling style, so, all the
            # calls stay synchronous for now.)

[5]: https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-7616

            cur = con.cursor()
            cur.execute(self.all_users_sql)
            return json_serialization(request, cur.fetchall())

        @route
        def user(self, request, login):
            con = pyodbc.connect(self.connect_string)
            cur = con.cursor()
            cur.execute(self.one_user_sql, (login,))
            return json_serialization(request, cur.fetchall())

        @route
        def set_secret_key(self, request, key):
            # http://localhost/set_secret_key/foobar
            # An example of persisting a scalar for the lifetime
            # of the thread (that is, until it disconects or EOFs).
            try:
                self.secret_key = [ key, ]
            except ValueError:
                # This would be hit, because we've got guards in place
                # to assess the "clonability" of an object at this
                # point[6].  (Ok, after reviewing the code, we don't,
                # but at least we'd crash.)
[6]: https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-4944

            # However, this would work fine, essentially memcpy'ing
            # the key object at the time of assignment using a different
            # heap to the one that automatically gets reset at the end
            # of the callback.
            self.secret_key = key

        @route
        def secret_key(self, request):
            # http://localhost/secret_key -> 'foobar'
            return json_serialization(request, {'key': self.secret_key})

        @route
        def stats(self, request):
            # Handy little json representation of various system stats;
            # active parallel contexts, I/O hogs, memory load, etc.
            stats = {
                'system': dict(sys_stats()),
                'server': dict(socket_stats(request.transport.parent)),
                'memory': dict(memory_stats()),
                'contexts': dict(context_stats()),
                'elapsed': request.transport.elapsed(),
                'thread': async.thread_seq_id(),
            }
            return json_serialization(request, stats)

        @route
        def debug(self, request):
            # Don't call print() or any of the sys.std(err|out)
            # methods in a parallel context.  If you want to do some
            # poor man's debugging with print statements in lieu of not
            # being able to attach a pdb debugger (tracing is disabled
            # in parallel threads), then use async.debug().  (On
            # Windows, this writes the message to the debug stream,
            # which you'd monitor via dbgview or VS.)
            async.debug("received request: %s" % request.data)

            # Avoid repr() at the moment in parallel threads; it uses
            # PyThreadState_SetDictItem() to control recursion depths,
            # which I haven't made safe to call from a parallel context.

            # If you want to attach Visual Studio debugger at this point
            # though, you can do so via:
            async.debugbreak()
            # (That literally just generates an INT 3.)

        @route
        def shutdown(self, request):
            # Handy helper for server shutdown (stop listening on the
            # bound IP:PORT, wait for all running client callbacks to
            # complete, then return.  Totally almost works at the
            # moment[7].)
[7]: https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-11818
            request.transport.shutdown()

    def main():
        server = async.server('0.0.0.0', port)
        protocol = HttpServer
        protocol.connect_string = 'Driver={SQL Server}...'
        async.register(transport=server, protocol=protocol)
        ^^^^^^^^^^^^^^ this will create a special 'server' instance
                       of the protocol, which will issue the bind()
                       call.  It then creates a configurable number
                       (currently ncpu * 2) of parallel contexts
                       and triggers parallel AcceptEx() invocation
                       (you can prime "pre-accepted" sockets on Windows,
                       which removes the serialization limits of
                       accept() on POSIX).

        # If an exception occurs in a parallel thread, it is queued
        # to a special list the main thread has.  The main thread
        # checks this list each time async.run_once() is called, so,
        # we call it here just to propagate any exceptions that
        # may have already occurred (like attempting to bind to an
        # invalid IP, or submitting a protocol that had an error).
        async.run_once()
        return server
        # (This also facilitates interactive console usage whilst
        #  serving request in parallel.)

    if __name__ == '__main__':
        main()
        # Run forever.  Returns when there are no active contexts
        # or ctrl-c is pressed.
        async.run()

All of that works *today* with PyParallel.  The main thread preps
everything, does the importing, loads the huge data structures,
establishes all the code objects and then, once async.run() is called,
sits there dormant waiting for feedback from the parallel threads.

It's not perfect; I haven't focused on clean shutdown yet, so you will
100% crash if you ctrl-C it currently.  That's mainly an issue with
interpreter finalization destroying the GIL, which clears our
Py_MainThreadId, which makes all the instrumented macros like
Py_INCREF/Py_DECREF think they're in a parallel context when they're
not, which... well, you can probably guess what happens after that if
you've got 8 threads still running at the time pointer dereferencing
things that aren't what they think they are.

None of the problems are showstoppers though, it's just a matter of
prioritization and engineering effort.  My strategic priorities to date
have been:
    a) no changes to semantics of CPython API
    b) high performance
    c) real-world examples

Now, given that this has been something I've mostly worked on in my own
time, my tactical priority each development session (often started after
an 8 hour work day where I'm operating at reduced brain power) is simply:
    a) forward progress at any cost

The quickest hack I can think of that'll address the immediate problem
is the one that gets implemented.  That hack will last until it stops
working, at which point, the quickest hack I can think of to replace it
wins, and so on.  At no time do I consider the maintainability, quality
or portability of the hack -- as long as it moves the overall needle
forward, perfect; it can be made elegant later.

I think it's important to mention that, because if you're reviewing the
source code, it helps explain things like how I implemented the
persistence of an object within a client session (e.g. intercepting the
setattr/setitem and doing the alternate heap memcpy dance alluded to
above):

    https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/diffs/Objects/dictobject.c.patch?at=3.3-px#cl-28

Without that bit of code, you'll leak memory, with it, you won't.

I attacked pyodbc a few weeks ago -- it was also leaking memory
when called from parallel callbacks because tp_dealloc wasn't being
called on any of the Connection, Cursor or Row objects, so handles
that were allocated (i.e. SQLAllocHandle()) were never paired with a
SQLFreeHandle() (because we don't refcount in a parallel context, which
means there's never a Py_DECREF that hits 0, which means Py_Dealloc()
never gets called for that object (which works fine for everything
that allocates via PyObject/PyMem facilities, because we intercept those
and roll them back in bulk)), and thus, leak.

Quickest fix I could think of at the time:

    async.register_dealloc(pyodbc.Connection)
    async.register_dealloc(pyodbc.Cursor)
    async.register_dealloc(pyodbc.Row)

Which facilitates this during our interception of PyObject_NEW/INIT:

    https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-3387

Which allows us to do this for each heap...

    https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-873

....that we encounter as part of "socket rewinding":

    https://bitbucket.org/tpn/pyparallel/src/8528b11ba51003a9821ceb75683ee96ed33db28a/Python/pyparallel.c?at=3.3-px#cl-793

Absolutely horrendous hack from a software engineering perspective, but
is surprisingly effective at solving the problem.

Regards,

    Trent.


More information about the Python-ideas mailing list