[Python-ideas] Better integration of multiprocessing with asyncio

Guido van Rossum guido at python.org
Sun Jul 27 05:43:07 CEST 2014


I'm going to go out on a limb here and say that it feels too early to me.
First someone has to actually solve this problem well as a 3rd party
package before we can talk about adding it to the asyncio package. It
doesn't actually sound like Billiards has adapted to asyncio yet (not that
I have any idea what Billiards is -- it sounds like a fork of
multiprocessing actually?).


On Sat, Jul 26, 2014 at 8:34 PM, Dan O'Reilly <oreilldf at gmail.com> wrote:

> Right, this is the same approach I've used myself. For example, the
> AsyncProcessQueue in my example above was implemented like this:
>
> def AsyncProcessQueue(maxsize=0):
>     m = Manager()
>     q = m.Queue(maxsize=maxsize)
>     return _ProcQueue(q)
>
> class _ProcQueue(object):
>     def __init__(self, q):
>         self._queue = q
>         self._executor = self._get_executor()
>         self._cancelled_join = False
>
>     def __getstate__(self):
>         self_dict = self.__dict__
>         self_dict['_executor'] = None
>         return self_dict
>
>     def _get_executor(self):
>         return ThreadPoolExecutor(max_workers=cpu_count())
>
>     def __setstate__(self, self_dict):
>         self_dict['_executor'] = self._get_executor()
>          self.__dict__.update(self_dict)
>
>     def __getattr__(self, name):
>         if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
>                     'get', 'get_nowait', 'close']:
>             return getattr(self._queue, name)
>         else:
>             raise AttributeError("'%s' object has no attribute '%s'" %
>                                     (self.__class__.__name__, name))
>
>     @asyncio.coroutine
>     def coro_put(self, item):
>         loop = asyncio.get_event_loop()
>         return (yield from loop.run_in_executor(self._executor, self.put,
> item))
>
>     @asyncio.coroutine
>     def coro_get(self):
>         loop = asyncio.get_event_loop()
>         return (yield from loop.run_in_executor(self._executor, self.get))
>
>     def cancel_join_thread(self):
>         self._cancelled_join = True
>         self._queue.cancel_join_thread()
>
>     def join_thread(self):
>         self._queue.join_thread()
>         if self._executor and not self._cancelled_join:
>             self._executor.shutdown()
>
> I'm wondering if a complete library providing this kind of behavior for
> all or some subset of multiprocessing is worth adding to the the asyncio
> module, or if you prefer users to deal with this on their own (or perhaps
> just distribute something that provides this behavior as a stand-alone
> library). I suppose adding asyncio-friendly methods to the existing objects
> in multiprocessing is also an option, but I doubt its desirable to add
> asyncio-specific code to modules other than asyncio.
>
> It also sort of sounds like some of the work that's gone on in Billiard
> would make the alternative, more complicated approach you mentioned a
> realistic possibility, at least going by this comment by Ask Solem (from
> http://bugs.python.org/issue9248#msg221963):
>
> > we have a version of multiprocessing.Pool using async IO and one pipe per process that drastically improves performance and also avoids the threads+forking issues (well, not the initial fork), but I have not yet adapted it to use the new asyncio module in 3.4.
>
> I don't know the details there, though. Hopefully someone more familiar with Billiard/multiprocessing than I am can provide some additional information.
>
>
>
>
>
> On Sat, Jul 26, 2014 at 10:39 PM, Guido van Rossum <guido at python.org>
> wrote:
>
>> I actually know very little about multiprocessing (have never used it)
>> but I imagine the way you normally interact with multiprocessing is using a
>> synchronous calls that talk to the subprocesses and their work queues and
>> so on, right?
>>
>> In the asyncio world you would put that work in a thread and then use
>> run_in_executor() with a thread executor -- the thread would then be
>> managing the subprocesses and talking to them. While you are waiting for
>> that thread to complete your other coroutines will still work.
>>
>> Unless you want to rewrite the communication and process management as
>> coroutines, but that sounds like a lot of work.
>>
>>
>> On Sat, Jul 26, 2014 at 1:59 PM, Dan O'Reilly <oreilldf at gmail.com> wrote:
>>
>>> I think it would be helpful for folks using the asyncio module to be
>>> able to make non-blocking calls to objects in the multiprocessing module
>>> more easily. While some use-cases for using multiprocessing can be replaced
>>> with ProcessPoolExecutor/run_in_executor, there are others that cannot;
>>> more advanced usages of multiprocessing.Pool aren't supported by
>>> ProcessPoolExecutor (initializer/initargs, contexts, etc.), and other
>>> multiprocessing classes like Lock and Queue have blocking methods that
>>> could be made into coroutines.
>>>
>>> Consider this (extremely contrived, but use your imagination) example of
>>> a asyncio-friendly Queue:
>>>
>>> import asyncio
>>> import time
>>>
>>> def do_proc_work(q, val, val2):
>>>     time.sleep(3)  # Imagine this is some expensive CPU work.
>>>     ok = val + val2
>>>     print("Passing {} to parent".format(ok))
>>>     q.put(ok) # The Queue can be used with the normal blocking API, too.
>>>     item = q.get()
>>>     print("got {} back from parent".format(item))
>>>
>>> def do_some_async_io_task():
>>>     # Imagine there's some kind of asynchronous I/O
>>>     # going on here that utilizes asyncio.
>>>     asyncio.sleep(5)
>>>
>>> @asyncio.coroutine
>>> def do_work(q):
>>>     loop.run_in_executor(ProcessPoolExecutor(),
>>>                          do_proc_work, q, 1, 2)
>>>     do_some_async_io_task()
>>>     item = yield from q.coro_get() # Non-blocking get that won't affect
>>> our io_task
>>>     print("Got {} from worker".format(item))
>>>     item = item + 25
>>>     yield from q.coro_put(item)
>>>
>>>
>>> if __name__  == "__main__":
>>>     q = AsyncProcessQueue()  # This is our new asyncio-friendly version
>>> of multiprocessing.Queue
>>>     loop = asyncio.get_event_loop()
>>>     loop.run_until_complete(do_work(q))
>>>
>>> I have seen some rumblings about a desire to do this kind of integration
>>> on the bug tracker (http://bugs.python.org/issue10037#msg162497 and
>>> http://bugs.python.org/issue9248#msg221963) though that discussion is
>>> specifically tied to merging the enhancements from the Billiard library
>>> into multiprocessing.Pool. Are there still plans to do that? If so, should
>>> asyncio integration with multiprocessing be rolled into those plans, or
>>> does it make sense to pursue it separately?
>>>
>>> Even more generally, do people think this kind of integration is a good
>>> idea to begin with? I know using asyncio is primarily about *avoiding* the
>>> headaches of concurrent threads/processes, but there are always going to be
>>> cases where CPU-intensive work is going to be required in a primarily
>>> I/O-bound application. The easier it is to for developers to handle those
>>> use-cases, the better, IMO.
>>>
>>> Note that the same sort of integration could be done with the threading
>>> module, though I think there's a fairly limited use-case for that; most
>>> times you'd want to use threads over processes, you could probably just use
>>> non-blocking I/O instead.
>>>
>>> Thanks,
>>> Dan
>>>
>>>
>>> _______________________________________________
>>> Python-ideas mailing list
>>> Python-ideas at python.org
>>> https://mail.python.org/mailman/listinfo/python-ideas
>>> Code of Conduct: http://python.org/psf/codeofconduct/
>>>
>>
>>
>>
>> --
>> --Guido van Rossum (python.org/~guido)
>>
>
>


-- 
--Guido van Rossum (python.org/~guido)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mail.python.org/pipermail/python-ideas/attachments/20140726/7072bd9e/attachment.html>


More information about the Python-ideas mailing list