[Python-ideas] Better integration of multiprocessing with asyncio

Dan O'Reilly oreilldf at gmail.com
Sun Jul 27 05:34:29 CEST 2014


Right, this is the same approach I've used myself. For example, the
AsyncProcessQueue in my earlier example was implemented like this:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager, cpu_count


def AsyncProcessQueue(maxsize=0):
    # Manager().Queue() gives us a proxy object that can be pickled and
    # shared with worker processes.
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._executor = self._get_executor()
        self._cancelled_join = False

    def __getstate__(self):
        # Copy the dict so pickling doesn't clobber this process's executor;
        # executors themselves can't be pickled.
        self_dict = self.__dict__.copy()
        self_dict['_executor'] = None
        return self_dict

    def _get_executor(self):
        return ThreadPoolExecutor(max_workers=cpu_count())

    def __setstate__(self, self_dict):
        self_dict['_executor'] = self._get_executor()
        self.__dict__.update(self_dict)

    def __getattr__(self, name):
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" %
                                    (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        # Run the blocking put() in the thread pool so the event loop
        # stays responsive while we wait.
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor,
                                                self.put, item))

    @asyncio.coroutine
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._executor and not self._cancelled_join:
            self._executor.shutdown()
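
In case it helps, here's a rough, untested sketch of how the wrapper above
can be driven end-to-end (the worker/main names are just for illustration,
and it assumes the definitions above live in the same module):

import asyncio
from concurrent.futures import ProcessPoolExecutor

def worker(q):
    # Runs in a child process; the queue wrapper survives pickling because
    # __getstate__/__setstate__ drop and rebuild the thread executor.
    q.put(42)

@asyncio.coroutine
def main(loop):
    q = AsyncProcessQueue()
    loop.run_in_executor(ProcessPoolExecutor(), worker, q)
    item = yield from q.coro_get()  # Waits without blocking the event loop.
    print("parent got", item)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))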

I'm wondering if a complete library providing this kind of behavior for all
or some subset of multiprocessing is worth adding to the asyncio module, or
if you'd prefer users to deal with this on their own (or perhaps just
distribute something that provides this behavior as a stand-alone library).
I suppose adding asyncio-friendly methods to the existing objects in
multiprocessing is also an option, but I doubt it's desirable to add
asyncio-specific code to modules other than asyncio.
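
To make the "some subset of multiprocessing" idea a bit more concrete,
here's a hypothetical sketch of the same run_in_executor pattern applied to
a Lock (AsyncProcessLock and coro_acquire are made-up names for
illustration, not an existing API):

import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

class AsyncProcessLock(object):
    def __init__(self):
        self._lock = multiprocessing.Lock()
        # One worker thread is enough to park blocking acquire() calls.
        self._executor = ThreadPoolExecutor(max_workers=1)

    @asyncio.coroutine
    def coro_acquire(self):
        # Block in a worker thread instead of the event loop.
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor,
                                                self._lock.acquire))

    def release(self):
        # release() never blocks, so it can be called directly.
        self._lock.release()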

It also sort of sounds like some of the work that's gone on in Billiard
would make the alternative, more complicated approach you mentioned a
realistic possibility, at least going by this comment by Ask Solem (from
http://bugs.python.org/issue9248#msg221963):

> we have a version of multiprocessing.Pool using async IO and one pipe per
> process that drastically improves performance and also avoids the
> threads+forking issues (well, not the initial fork), but I have not yet
> adapted it to use the new asyncio module in 3.4.

I don't know the details there, though. Hopefully someone more
familiar with Billiard/multiprocessing than I am can provide some
additional information.





On Sat, Jul 26, 2014 at 10:39 PM, Guido van Rossum <guido at python.org> wrote:

> I actually know very little about multiprocessing (have never used it) but
> I imagine the way you normally interact with multiprocessing is using
> synchronous calls that talk to the subprocesses and their work queues and
> so on, right?
>
> In the asyncio world you would put that work in a thread and then use
> run_in_executor() with a thread executor -- the thread would then be
> managing the subprocesses and talking to them. While you are waiting for
> that thread to complete, your other coroutines will still work.
>
> Unless you want to rewrite the communication and process management as
> coroutines, but that sounds like a lot of work.
>
>
> On Sat, Jul 26, 2014 at 1:59 PM, Dan O'Reilly <oreilldf at gmail.com> wrote:
>
>> I think it would be helpful for folks using the asyncio module to be able
>> to make non-blocking calls to objects in the multiprocessing module more
>> easily. While some use-cases for using multiprocessing can be replaced with
>> ProcessPoolExecutor/run_in_executor, there are others that cannot; more
>> advanced usages of multiprocessing.Pool aren't supported by
>> ProcessPoolExecutor (initializer/initargs, contexts, etc.), and other
>> multiprocessing classes like Lock and Queue have blocking methods that
>> could be made into coroutines.
>>
>> Consider this (extremely contrived, but use your imagination) example of
>> an asyncio-friendly Queue:
>>
>> import asyncio
>> import time
>> from concurrent.futures import ProcessPoolExecutor
>>
>> def do_proc_work(q, val, val2):
>>     time.sleep(3)  # Imagine this is some expensive CPU work.
>>     ok = val + val2
>>     print("Passing {} to parent".format(ok))
>>     q.put(ok) # The Queue can be used with the normal blocking API, too.
>>     item = q.get()
>>     print("got {} back from parent".format(item))
>>
>> @asyncio.coroutine
>> def do_some_async_io_task():
>>     # Imagine there's some kind of asynchronous I/O
>>     # going on here that utilizes asyncio.
>>     yield from asyncio.sleep(5)
>>
>> @asyncio.coroutine
>> def do_work(q):
>>     loop.run_in_executor(ProcessPoolExecutor(),
>>                          do_proc_work, q, 1, 2)
>>     yield from do_some_async_io_task()
>>     # Non-blocking get that won't affect our io_task.
>>     item = yield from q.coro_get()
>>     print("Got {} from worker".format(item))
>>     item = item + 25
>>     yield from q.coro_put(item)
>>
>>
>> if __name__ == "__main__":
>>     # This is our new asyncio-friendly version of multiprocessing.Queue.
>>     q = AsyncProcessQueue()
>>     loop = asyncio.get_event_loop()
>>     loop.run_until_complete(do_work(q))
>>
>> I have seen some rumblings about a desire to do this kind of integration
>> on the bug tracker (http://bugs.python.org/issue10037#msg162497 and
>> http://bugs.python.org/issue9248#msg221963) though that discussion is
>> specifically tied to merging the enhancements from the Billiard library
>> into multiprocessing.Pool. Are there still plans to do that? If so, should
>> asyncio integration with multiprocessing be rolled into those plans, or
>> does it make sense to pursue it separately?
>>
>> Even more generally, do people think this kind of integration is a good
>> idea to begin with? I know using asyncio is primarily about *avoiding* the
>> headaches of concurrent threads/processes, but there will always be cases
>> where CPU-intensive work is required in a primarily I/O-bound application.
>> The easier it is for developers to handle those use-cases, the better, IMO.
>>
>> Note that the same sort of integration could be done with the threading
>> module, though I think there's a fairly limited use-case for that; in most
>> cases where you'd want to use threads over processes, you could probably
>> just use non-blocking I/O instead.
>>
>> Thanks,
>> Dan
>>
>>
>
>
>
> --
> --Guido van Rossum (python.org/~guido)
>