draining pipes simultaneously

bockman at virgilio.it bockman at virgilio.it
Wed Mar 5 05:34:14 EST 2008


On 5 Mar, 10:33, "Dmitry Teslenko" <dtesle... at gmail.com> wrote:
> Hello!
> Here's my implementation of a function that executes some command and
> drains stdout/stderr invoking other functions for every line of
> command output:
>
> def __execute2_drain_pipe(queue, pipe):
>         for line in pipe:
>                 queue.put(line)
>         return
>
> def execute2(command, out_filter = None, err_filter = None):
>         p = subprocess.Popen(command , shell=True, stdin = subprocess.PIPE, \
>                 stdout = subprocess.PIPE, stderr = subprocess.PIPE, \
>                 env = os.environ)
>
>         qo = Queue.Queue()
>         qe = Queue.Queue()
>
>         to = threading.Thread(target = __execute2_drain_pipe, \
>                 args = (qo, p.stdout))
>         to.start()
>         time.sleep(0)
>         te = threading.Thread(target = __execute2_drain_pipe, \
>                 args = (qe, p.stderr))
>         te.start()
>
>         while to.isAlive() or te.isAlive():
>                 try:
>                         line = qo.get()
>                         if out_filter:
>                                 out_filter(line)
>                         qo.task_done()
>                 except Queue.Empty:
>                         pass
>
>                 try:
>                         line = qe.get()
>                         if err_filter:
>                                 err_filter(line)
>                         qe.task_done()
>                 except Queue.Empty:
>                         pass
>
>         to.join()
>         te.join()
>         return p.wait()
>
> Problem is my implementation is buggy and function hungs when there's
> empty stdout/stderr. Can I have your feedback?

The Queue.get method blocks by default, so when one of the two streams
produces no output your loop waits forever on that queue and never
reaches the except Queue.Empty branch. The documentation is not 100%
clear about that (maybe it should show the full Python signature of
the method, which would make the default values self-evident), but if
you run help(Queue.Queue) in a Python shell you will see it.
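To check the default without reading the help text, something like the following should work. It is only a small sketch and assumes Python 3, where the module is named queue rather than Queue; the variable names are mine.

```python
import inspect
import queue

# Show the full signature of get(); the defaults reveal that it blocks.
signature = inspect.signature(queue.Queue.get)
print(signature)

# With a timeout, get() on an empty queue raises queue.Empty
# instead of waiting forever.
q = queue.Queue()
try:
    q.get(timeout=0.1)
    timed_out = False
except queue.Empty:
    timed_out = True
print("timed out:", timed_out)
```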

Hence, try using a timeout or a non-blocking get (but with a
non-blocking get you should add a delay in the loop, or you will poll
the queues at maximum speed and maybe prevent the other threads from
accessing them).
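A rough sketch of the timeout variant, not a drop-in replacement for your function. It assumes Python 3 (queue instead of Queue, is_alive instead of isAlive), drops the stdin and env arguments for brevity, and adds a poll_timeout parameter and a _drain_pipe helper name of my own. Lines arrive as bytes because the pipes are opened in binary mode.

```python
import queue
import subprocess
import threading


def _drain_pipe(q, pipe):
    # Forward every line from the pipe to the queue until EOF.
    for line in pipe:
        q.put(line)
    pipe.close()


def execute2(command, out_filter=None, err_filter=None, poll_timeout=0.1):
    p = subprocess.Popen(command, shell=True,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # One (queue, thread, filter) triple per stream.
    channels = []
    for pipe, line_filter in ((p.stdout, out_filter), (p.stderr, err_filter)):
        q = queue.Queue()
        t = threading.Thread(target=_drain_pipe, args=(q, pipe))
        t.start()
        channels.append((q, t, line_filter))

    # Keep going while a reader is still running or lines are still queued.
    while any(t.is_alive() or not q.empty() for q, t, _ in channels):
        for q, t, line_filter in channels:
            try:
                # Wait at most poll_timeout seconds, then try the other queue.
                line = q.get(timeout=poll_timeout)
            except queue.Empty:
                continue
            if line_filter:
                line_filter(line)

    for q, t, _ in channels:
        t.join()
    return p.wait()
```

For example, execute2("echo hello; echo oops 1>&2", out.append, err.append) with two empty lists should fill out and err with one line each, and a command that prints nothing returns instead of hanging. The timeout also acts as the delay in the loop, so no separate sleep is needed.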

Ciao
-----
FB



