asyncio subprocess PIPE output lost

yuzhichang yuzhichang at gmail.com
Tue Aug 19 03:39:07 EDT 2014


Hi all,
    I'm new to asyncio, which was introduced in Python 3.4. I created two tasks, each of which pings a host. I noticed that some lines of output are lost (see "error: found icmp_seq gap!" below). If I run only one task, the problem never occurs.
    Do you have any idea?
    Thanks!
Zhichang

zhichyu at cto-team-6:~$ python3 asyn2.py
got line: b'PING tyr.global.tektronix.net (10.250.163.252) 56(84) bytes of data.\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=2 ttl=55 time=204 ms\n'
got line: b'PING babylon.rich.tek.com (10.250.155.123) 56(84) bytes of data.\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=2 ttl=55 time=205 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=3 ttl=55 time=205 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=3 ttl=55 time=205 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=4 ttl=55 time=196 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=4 ttl=55 time=198 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=5 ttl=55 time=181 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=5 ttl=55 time=182 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=6 ttl=55 time=180 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=6 ttl=55 time=179 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=7 ttl=55 time=193 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=7 ttl=55 time=196 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=8 ttl=55 time=185 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=8 ttl=55 time=187 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=9 ttl=55 time=213 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=9 ttl=55 time=211 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=10 ttl=55 time=192 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=10 ttl=55 time=192 ms\n'
got line: b'64 bytes from tyr.global.tektronix.net (10.250.163.252): icmp_seq=11 ttl=55 time=210 ms\n'
got line: b'64 bytes from babylon.rich.tek.com (10.250.155.123): icmp_seq=12 ttl=55 time=210 ms\n'
error: found icmp_seq gap!

zhichyu at cto-team-6:~$ cat asyn2.py
#!/usr/bin/env python3.4

'''
refers to 
* http://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python/20697159#20697159
* 18.5.3.5.1. Example: Parallel execution of tasks
'''
import asyncio
import sys
import signal
import time
import re
from asyncio.subprocess import PIPE
from contextlib import closing
from functools import partial

# Extracts the sequence number from a ping reply line such as
# b'64 bytes from host (10.0.0.1): icmp_seq=2 ttl=55 time=204 ms\n'.
# A raw bytes literal is used so that ``\d`` reaches the regex engine
# untouched instead of being parsed as an (invalid) string escape.
icmp_seq = re.compile(br'icmp_seq=(?P<seq>\d+)')

@asyncio.coroutine
def ping(*args, max_line_size=1<<20, timeout=None, loop=None):
    """Run a command and watch its stdout for gaps in ``icmp_seq`` numbers.

    Every line read from the child's stdout is echoed with a ``got line:``
    prefix.  Reading stops when the child closes stdout, when two consecutive
    ``icmp_seq`` values are not exactly one apart, or after roughly 1000
    seconds of loop time.  The child is then sent SIGTERM (if it is still
    running) and its remaining stdout/stderr is collected and printed.

    Args:
        *args: program and arguments, passed to
            ``asyncio.create_subprocess_exec``.
        max_line_size: buffer limit (in bytes) for the child's stream readers.
        timeout: seconds to wait for the child's final output during
            shutdown; ``None`` waits indefinitely.
        loop: event loop to run on; defaults to ``asyncio.get_event_loop()``.

    Returns:
        The child's return code.

    Raises:
        asyncio.TimeoutError: if collecting the final output exceeds
            ``timeout``.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    with_timeout = partial(asyncio.wait_for, timeout=timeout, loop=loop)

    # start child process
    process = yield from asyncio.create_subprocess_exec(
        *args, stdout=PIPE, stderr=PIPE, limit=max_line_size, loop=loop)
    try:
        seq = 0  # last icmp_seq seen; 0 means "none yet"
        begin = loop.time()
        while True:
            # read line (sequence of bytes ending with b'\n') asynchronously
            line = yield from process.stdout.readline()
            if not line:
                # readline() yields b'' at EOF (child exited or closed its
                # stdout); without this check the loop would spin on empty
                # reads until the time limit below expires.
                break
            print("got line:", line)
            sys.stdout.flush()
            m = icmp_seq.search(line)
            if m:
                seq2 = int(m.group('seq'))
                if seq != 0 and seq2 != seq + 1:
                    # NOTE(review): stopping the loop aborts every task
                    # running on it, and the cleanup in ``finally`` below
                    # needs the loop to keep running in order to finish.
                    # Kept to preserve the script's stop-on-first-gap
                    # behaviour -- confirm this is really what is wanted.
                    loop.stop()
                    print('error: found icmp_seq gap!')
                    break
                seq = seq2
            if loop.time() - begin > 1000:
                break
    finally:
        # Only signal a child that is still running; it may already have
        # exited on its own (e.g. unknown host), which is how EOF is reached.
        if process.returncode is None:
            try:
                process.send_signal(signal.SIGTERM)
            except ProcessLookupError:
                # The child exited between the check and the signal.
                pass
        (stdoutdata, stderrdata) = yield from with_timeout(process.communicate())
        print("final stdout:", stdoutdata)
        print("final stderr:", stderrdata)
        rc = process.returncode
    return rc

#NOTE: use ProactorEventLoop on Windows as shown in the previous example

# ``asyncio.async`` was superseded by ``asyncio.ensure_future`` and cannot
# even be parsed on Python 3.7+, where ``async`` is a keyword.  Look the
# function up by name so the script works on both old and new interpreters.
_schedule = getattr(asyncio, "ensure_future", None) or getattr(asyncio, "async")

with closing(asyncio.get_event_loop()) as loop:
    # Some lines will be lost if running multiple tasks. Why?
    task1 = _schedule(ping("ping", "tyr.global.tektronix.net", timeout=5, loop=loop), loop=loop)
    task2 = _schedule(ping("ping", "babylon.rich.tek.com", timeout=5, loop=loop), loop=loop)
    tasks = [task1, task2]
    # asyncio.wait() returns a (done, pending) pair of task sets, not an exit
    # status, so the per-task return codes are read from the tasks instead.
    loop.run_until_complete(asyncio.wait(tasks))
    return_codes = [task.result() for task in tasks]

# Exit with the first non-zero child return code, or 0 if every child
# reported success.
rc = next((code for code in return_codes if code), 0)
sys.exit(rc)



More information about the Python-list mailing list