Async subprocess context manager

Peter Sutton petersutton2009 at gmail.com
Sat Aug 31 09:36:50 EDT 2019


Hi all,

First time posting! I need an async context manager that ensures a
Process has finished before it `__exit__()`s, either by graceful or
forceful termination, and regardless of cancellation. I've put my code
at the bottom.

I'm relatively new to asyncio, so I'm looking for feedback on any
aspect of the code: not only correctness, but also things like whether
I'm handling cancellation correctly, or whether there is a more
idiomatic way to go about this.

Thanks,


Peter.


```
from __future__ import annotations
import asyncio
import time
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional


async def _wait_deferring_cancel(proc: asyncio.subprocess.Process,
                                 timeout: Optional[float]):
    """Wait for *proc* to exit, postponing any cancellation requests.

    Args:
        proc: The process to wait on.
        timeout: Total number of seconds to wait, or ``None`` to wait
            until the process exits.

    Returns:
        A ``(exited, cancelled)`` pair of booleans: whether the process
        had exited by the time the wait ended, and whether a
        ``CancelledError`` was intercepted while waiting.
    """
    # A monotonic clock keeps the budget immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    cancelled = False
    while True:
        if deadline is None:
            remaining = None
        else:
            remaining = max(0.0, deadline - time.monotonic())
        try:
            await asyncio.wait_for(proc.wait(), remaining)
        except asyncio.CancelledError:
            # Remember the request and keep waiting; the caller re-raises
            # once cleanup is complete.
            cancelled = True
        except asyncio.TimeoutError:
            # With an exhausted budget wait_for() may time out without
            # giving wait() a chance to run, so consult returncode too.
            return proc.returncode is not None, cancelled
        else:
            return True, cancelled


@asynccontextmanager
async def terminating(proc: asyncio.subprocess.Process, timeout: float) \
        -> AsyncIterator[asyncio.subprocess.Process]:
    """Ensure *proc* has been stopped and waited for when the block exits.

    On exit the process is sent ``terminate()`` and given up to *timeout*
    seconds to finish. If it is still running after that, it is sent
    ``kill()`` and waited for without a time limit. Cancellation arriving
    during this cleanup is deferred until the cleanup has finished.

    Args:
        proc: The subprocess to manage; it is yielded unchanged.
        timeout: Seconds to wait after ``terminate()`` before escalating
            to ``kill()``.

    Yields:
        The same *proc* object.

    Raises:
        asyncio.CancelledError: If a cancellation was intercepted while
            waiting for the process to exit.
    """
    try:
        yield proc
    finally:
        cancelled = False
        try:
            proc.terminate()
        except ProcessLookupError:
            # Nothing left to signal, so there is nothing to escalate.
            exited = True
        else:
            exited, cancelled = await _wait_deferring_cancel(proc, timeout)
            if exited:
                print('Terminated')
        if not exited:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            else:
                # The wait must actually be awaited; otherwise the block
                # would exit before the killed process has been reaped.
                _, kill_cancelled = await _wait_deferring_cancel(proc, None)
                cancelled = cancelled or kill_cancelled
                print('Killed')
        if cancelled:
            # Deliver the cancellation that was postponed during cleanup.
            raise asyncio.CancelledError()


async def test(sleep: float) -> None:
    """Spawn ``sleep 10`` in a shell and hold it open for *sleep* seconds.

    The child is wrapped in ``terminating`` with a one-second timeout, so
    it is stopped when the ``async with`` block is left.
    """
    child = await asyncio.create_subprocess_shell('sleep 10')
    async with terminating(child, timeout=1):
        # Stand-in for real work done while the child is running.
        await asyncio.sleep(sleep)

async def main() -> None:
    """Run ``test`` once to completion and once with cancellation."""
    # Scenario 1: the body finishes on its own and the child is cleaned up.
    await test(1)

    # Scenario 2: cancel the task while it is inside the managed block.
    task = asyncio.create_task(test(1))
    # A task cancelled before its first step never runs any of its code,
    # so give it time to spawn the child and enter the ``async with`` body
    # (which lasts 1 second) before requesting cancellation.
    await asyncio.sleep(0.5)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected: the cancellation propagates out of the task.
        pass


# Script entry point: run both demo scenarios in main() to completion.
asyncio.run(main())
```



More information about the Python-list mailing list