[issue25593] _sock_connect_cb can be called twice resulting in InvalidStateError

Alexander Mohr report at bugs.python.org
Wed Nov 11 18:26:10 EST 2015


Alexander Mohr added the comment:

Perhaps I'm doing something really stupid, but I was able to reproduce the two issues I'm having with the following sample script. If you leave the monkey patch disabled, you get the InvalidStateError; if you enable it, you get the ServerDisconnect errors that I'm currently seeing, which I work around with retries.  Ideas?

import asyncio
import aiohttp
import multiprocessing
import aiohttp.server
import logging
import traceback

# Monkey patching
import asyncio.selector_events

# http://bugs.python.org/issue25593
# Optional workaround for http://bugs.python.org/issue25593: wrap the private
# BaseSelectorEventLoop._sock_connect_cb so that a second invocation after the
# future has already been resolved is ignored instead of raising
# InvalidStateError.  Deliberately disabled (`if False`) to reproduce the
# original failure; flip to `if True` to activate the patch.
if False:
    # Keep a reference to the original callback so the wrapper can delegate.
    orig_sock_connect_cb = asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb
    def _sock_connect_cb(self, fut, sock, address):
        # NOTE(review): assumes the selector can fire this callback again after
        # `fut` is already done; skipping avoids set_result() on a done future.
        if fut.done(): return
        return orig_sock_connect_cb(self, fut, sock, address)
    asyncio.selector_events.BaseSelectorEventLoop._sock_connect_cb = _sock_connect_cb


class HttpRequestHandler(aiohttp.server.ServerHttpProtocol):
    """Minimal test handler: answer every request with ``<h1>It Works!</h1>``.

    A 0.5 s sleep before the body is written keeps each connection open
    long enough to exercise the client-side connect/keep-alive races this
    script reproduces.
    """

    @asyncio.coroutine
    def handle_request(self, message, payload):
        # Build the body first so Content-Length is derived from it instead
        # of being hard-coded ('18' in the original) — the header can no
        # longer drift out of sync if the body changes.
        body = b'<h1>It Works!</h1>'
        response = aiohttp.Response(self.writer, 200, http_version=message.version)
        response.add_header('Content-Type', 'text/html')
        response.add_header('Content-Length', str(len(body)))
        response.send_headers()
        yield from asyncio.sleep(0.5)
        response.write(body)
        yield from response.write_eof()


def process_worker(q):
    """Consume URLs from multiprocessing queue *q* and GET each one.

    Bridges the blocking multiprocessing queue into an asyncio queue via
    ``run_in_executor``, with 100 concurrent fetch workers sharing one
    aiohttp ClientSession.  Runs forever; intended to be the target of a
    ``multiprocessing.Process``.
    """
    loop = asyncio.get_event_loop()
    #loop.set_debug(True)
    connector = aiohttp.TCPConnector(force_close=False, keepalive_timeout=8, use_dns_cache=True)
    session = aiohttp.ClientSession(connector=connector)
    async_queue = asyncio.Queue(100)

    @asyncio.coroutine
    def async_worker(session, async_queue):
        # Fetch URLs forever; per-request errors are logged and the worker
        # keeps going.
        while True:
            try:
                print("blocking on asyncio queue get")
                url = yield from async_queue.get()
                print("unblocking on asyncio queue get")
                print("get aqueue size:", async_queue.qsize())
                response = yield from session.request('GET', url)
                try:
                    data = yield from response.read()
                    print(data)
                finally:
                    # Always release the connection back to the pool.
                    yield from response.wait_for_close()
            except Exception:
                # Narrowed from a bare `except:` so SystemExit and
                # KeyboardInterrupt can still stop the process.
                traceback.print_exc()

    def producer(q):
        # Blocking get; runs in the default executor thread, not the loop.
        print("blocking on multiprocessing queue get")
        obj2 = q.get()
        print("unblocking on multiprocessing queue get")
        print("get qempty:", q.empty())
        return obj2

    def worker_done(f):
        # Done-callback: surface any exception that killed a worker task.
        try:
            f.result()
            print("worker exited")
        except Exception:
            # Narrowed from a bare `except:`; see async_worker above.
            traceback.print_exc()

    workers = []
    for i in range(100):
        t = asyncio.ensure_future(async_worker(session, async_queue))
        t.add_done_callback(worker_done)
        workers.append(t)

    @asyncio.coroutine
    def doit():
        # Move one item from the blocking queue into the asyncio queue.
        print("start producer")
        obj = yield from loop.run_in_executor(None, producer, q)
        print("finish producer")

        print("blocking on asyncio queue put")
        yield from async_queue.put(obj)
        print("unblocking on asyncio queue put")
        print("put aqueue size:", async_queue.qsize())

    while True:
        loop.run_until_complete(doit())


def server():
    """Run a blocking aiohttp test server on 0.0.0.0:8080 forever."""
    event_loop = asyncio.get_event_loop()
    #event_loop.set_debug(True)

    # Each incoming connection gets its own debug handler with a 75 s
    # keep-alive, so client connection reuse can be exercised.
    def make_handler():
        return HttpRequestHandler(debug=True, keep_alive=75)

    event_loop.run_until_complete(
        event_loop.create_server(make_handler, '0.0.0.0', '8080'))
    event_loop.run_forever()


if __name__ == '__main__':
    # Work queue shared between this producer and the client process.
    q = multiprocessing.Queue(100)

    log_proc = multiprocessing.log_to_stderr()
    log_proc.setLevel(logging.DEBUG)

    # Spawn one client (consumer) process and one server process.
    for target, args in ((process_worker, (q,)), (server, ())):
        multiprocessing.Process(target=target, args=args).start()

    # Feed the same URL forever; q.put blocks once the queue holds 100 items.
    while True:
        print("blocking on multiprocessing queue put")
        q.put("http://0.0.0.0:8080")
        print("unblocking on multiprocessing queue put")

        print("put qempty:", q.empty())

----------

_______________________________________
Python tracker <report at bugs.python.org>
<http://bugs.python.org/issue25593>
_______________________________________


More information about the Python-bugs-list mailing list