
add note about lifespans in multi-thread environments #277

Open · graingert wants to merge 2 commits into main

Conversation

graingert
Contributor

No description provided.

specs/lifespan.rst (outdated review thread, resolved)
@graingert
Contributor Author

asgi.rst states that:

ASGI attempts to preserve a simple application interface, while providing an abstraction that allows for data to be sent and received at any time, and from different application threads or processes.

However, most ASGI apps seem to assume that only one lifespan task will be in progress per process, e.g.: https://fastapi.tiangolo.com/advanced/async-sql-databases/

ASGI frameworks should use a RunVar to make sure they create and tear down async resources per lifespan, not per process.

@graingert
Contributor Author

graingert commented Jun 30, 2021

It's also possible to run two event loops in one thread, e.g. two asyncio event loops using alternating .run_until_complete calls, or trio guest mode. A RunVar will also handle this scenario.
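For example (my own sketch, not code from this PR), two asyncio event loops can interleave in a single thread by alternating .run_until_complete calls, which is why "one loop per thread" cannot be assumed:

```python
# Two asyncio event loops interleaved in one thread. Neither loop is the
# thread's "current" loop in any lasting sense; work alternates between them.
import asyncio


async def step(name: str) -> str:
    await asyncio.sleep(0)  # yield to the loop once
    return name


loop_a = asyncio.new_event_loop()
loop_b = asyncio.new_event_loop()

results = []
for _ in range(2):
    # Each call drives one loop to completion, then control returns to this
    # thread and the other loop runs - all without any extra threads.
    results.append(loop_a.run_until_complete(step("loop-a")))
    results.append(loop_b.run_until_complete(step("loop-b")))

loop_a.close()
loop_b.close()
print(results)  # ['loop-a', 'loop-b', 'loop-a', 'loop-b']
```

Any per-loop state (such as a lifespan-scoped task group) set up while loop_a runs would be invisible, or worse, unusable, from loop_b.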

@graingert
Contributor Author

Alternatively, asgiref should specify that once a web server starts a lifespan task, it MUST NOT call the ASGI app from any other thread or event loop until the lifespan task terminates.

@andrewgodwin
Member

Hm, I had not envisioned that lifespan would be once-per-thread - I think that confuses things unless you're thinking only in terms of a fixed set of threads spun up at the start of the process, as opposed to launching them dynamically.

What's the problem you're trying to solve here, exactly? Lifespan events are meant to be per-process for very basic startup tasks, so I'm not sure that saying they should be thread-pinned is a reasonable response.

@graingert
Contributor Author

graingert commented Jun 30, 2021

I'm using lifespan to manage a trio Nursery via anyio. This doc change would mean that I, and quart-trio, would need to use a RunVar to store the Nursery in.

@andrewgodwin
Member

Unfortunately I'm not super familiar with trio, so if you could explain the impact of both the options on the code you're writing that would be great - what are the side effects, what would you prefer, etc?

@graingert
Contributor Author

Sure, I'll see what I can do.

@graingert
Contributor Author

Consider an ASGI app and threaded server:

from __future__ import annotations

import asyncio
import concurrent.futures
import sys
import traceback
from typing import Any, Awaitable, Callable, MutableMapping

import anyio
import anyio.abc

# Minimal ASGI type aliases so the example is self-contained
Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]
Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]


async def background_async_fn():
    print("processing")
    await anyio.sleep(1)
    print("done")


class App:
    task_group: anyio.abc.TaskGroup

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        assert scope["type"] in ("http", "lifespan")

        if scope["type"] == "lifespan":
            await self.lifespan(scope, receive, send)
            return

        await self.http(scope, receive, send)
        return

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        app = scope.get("app")
        try:
            async with anyio.create_task_group() as app.task_group:
                await send({"type": "lifespan.startup.complete"})
                await receive()
        except BaseException:
            exc_text = traceback.format_exc()
            await send({"type": "lifespan.shutdown.failed", "message": exc_text})
        else:
            await send({"type": "lifespan.shutdown.complete"})

    async def http(self, scope: Scope, receive: Receive, send: Send) -> None:
        self.task_group.start_soon(background_async_fn)
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [
                [b'content-type', b'text/plain'],
            ]
        })
        await send({
            'type': 'http.response.body',
            'body': b'Hello, world!',
        })


async def lifespan(started, done, app):
    # Minimal lifespan driver: receive() blocks until shutdown is requested,
    # send() flags startup as complete and prints any failure message.

    async def receive():
        return await done.wait()

    async def send(event, *args):
        started.set()
        print(event.get("message"))

    await app({"type": "lifespan", "app": app}, receive, send)


async def serve(app):
    done = asyncio.Event()
    started = asyncio.Event()
    task = asyncio.create_task(lifespan(started, done, app))

    async def receive():
        return {"type": "http.disconnect"}

    async def send(*args):
        pass

    await started.wait()
    await app({"type": "http", "app": app}, receive, send)
    done.set()
    await task


def asgi_tpe_server(app):
    # Five threads, each running its own event loop, all sharing one App
    with concurrent.futures.ThreadPoolExecutor(5) as tpe:
        futs = [tpe.submit(asyncio.run, serve(app)) for _ in range(5)]

        for v in concurrent.futures.as_completed(futs):
            print(v.result())


def main():
    app = App()
    asgi_tpe_server(app)
    return 0


if __name__ == "__main__":
    sys.exit(main())

When I run it I get:

None
None
processing
None
processing
None
processing
None
None
None
None
processing
processing
Traceback (most recent call last):
  File "foo.py", line 35, in lifespan
    await receive()
  File "/home/graingert/.virtualenvs/starlette/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 546, in __aexit__
    await asyncio.wait(self.cancel_scope._tasks)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 424, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/usr/lib/python3.8/asyncio/tasks.py", line 424, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/usr/lib/python3.8/asyncio/tasks.py", line 667, in ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument

None
None
None
Traceback (most recent call last):
  File "foo.py", line 35, in lifespan
    await receive()
  File "foo.py", line 60, in receive
    return await done.wait()
  File "/usr/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "foo.py", line 35, in lifespan
    await receive()
  File "/home/graingert/.virtualenvs/starlette/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 546, in __aexit__
    await asyncio.wait(self.cancel_scope._tasks)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 424, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/usr/lib/python3.8/asyncio/tasks.py", line 424, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "/usr/lib/python3.8/asyncio/tasks.py", line 667, in ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument

None
None

@graingert
Contributor Author

graingert commented Jun 30, 2021

However, I can fix the app by using an anyio.lowlevel.RunVar to store the task_group in:

from __future__ import annotations

import asyncio
import concurrent.futures
import sys
import traceback
from typing import Any, Awaitable, Callable, MutableMapping

import anyio
import anyio.abc
import anyio.lowlevel

# Minimal ASGI type aliases so the example is self-contained
Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]
Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]


async def background_async_fn():
    print("processing")
    await anyio.sleep(1)
    print("done")




class App:
    def __init__(self):
        self.task_group_runvar = anyio.lowlevel.RunVar[anyio.abc.TaskGroup]("task_group")

    @property
    def task_group(self):
        return self.task_group_runvar.get()

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        assert scope["type"] in ("http", "lifespan")

        if scope["type"] == "lifespan":
            await self.lifespan(scope, receive, send)
            return

        await self.http(scope, receive, send)
        return

    async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
        app = scope.get("app")
        try:
            async with anyio.create_task_group() as tg:
                app.task_group_runvar.set(tg)
                await send({"type": "lifespan.startup.complete"})
                await receive()
        except BaseException:
            exc_text = traceback.format_exc()
            await send({"type": "lifespan.shutdown.failed", "message": exc_text})
        else:
            await send({"type": "lifespan.shutdown.complete"})

    async def http(self, scope: Scope, receive: Receive, send: Send) -> None:
        self.task_group.start_soon(background_async_fn)
        await send({
            'type': 'http.response.start',
            'status': 200,
            'headers': [
                [b'content-type', b'text/plain'],
            ]
        })
        await send({
            'type': 'http.response.body',
            'body': b'Hello, world!',
        })


async def lifespan(started, done, app):

    async def receive():
        return await done.wait()

    async def send(event, *args):
        started.set()
        print(event.get("message"))

    await app({"type": "lifespan", "app": app}, receive, send)


async def serve(app):
    done = asyncio.Event()
    started = asyncio.Event()
    task = asyncio.create_task(lifespan(started, done, app))

    async def receive():
        return {"type": "http.disconnect"}

    async def send(*args):
        pass

    await started.wait()
    await app({"type": "http", "app": app}, receive, send)
    done.set()
    await task


def asgi_tpe_server(app):
    with concurrent.futures.ThreadPoolExecutor(5) as tpe:
        futs = [tpe.submit(asyncio.run, serve(app)) for _ in range(5)]

        for v in concurrent.futures.as_completed(futs):
            print(v.result())


def main():
    app = App()
    asgi_tpe_server(app)
    return 0


if __name__ == "__main__":
    sys.exit(main())

@graingert
Contributor Author

graingert commented Jun 30, 2021

anyio.lowlevel.RunVar is halfway between a threadlocal and a contextvar. It's driven by a WeakKeyDictionary keyed by asyncio.get_running_loop() or trio.lowlevel.current_token().
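To illustrate the idea (my own sketch, not anyio's actual implementation; LoopVar is a hypothetical name), an asyncio-only RunVar-like variable can be built from a WeakKeyDictionary keyed by the running loop:

```python
# Sketch of a per-event-loop variable: each loop sees only its own value,
# regardless of which thread the loop happens to run in.
import asyncio
import weakref


class LoopVar:
    def __init__(self, name: str):
        self._name = name
        # Weak keys so a dead loop doesn't keep its value alive
        self._values = weakref.WeakKeyDictionary()

    def set(self, value):
        self._values[asyncio.get_running_loop()] = value

    def get(self):
        try:
            return self._values[asyncio.get_running_loop()]
        except KeyError:
            raise LookupError(f"LoopVar {self._name!r} has no value in this loop")


var = LoopVar("task_group")


async def use(value):
    var.set(value)
    return var.get()


# Each asyncio.run() call creates a fresh loop, hence a fresh slot:
print(asyncio.run(use("loop-1")))  # loop-1
print(asyncio.run(use("loop-2")))  # loop-2
```

Unlike a threadlocal, this still works when two loops share a thread; unlike a contextvar, the value is shared by all tasks on the same loop.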

@adriangb
Contributor

Coming from #322 (comment)

I think one important guarantee is that the lifespan event and any requests that are processed are in the same event loop. If the server spins up multiple threads or multiple event loops, I think it should run the lifespan events once per event loop. #322 would help because it provides a clear way for the application to store state without caring about the execution model of the ASGI server.

@graingert please correct me if I'm wrong, but if we said that lifespans must be run once per thread and that the ASGI server can spin up multiple event loops per thread, wouldn't that result in having some async resources initialized in one event loop and then used in another? I think that would break things. So to be safe, the guarantee would need to be that the lifespan and requests are run in the same event loop.

@adriangb
Contributor

I think that confuses things unless you're thinking only in terms of a given set of fixed threads spun up at the start of the process, as opposed to launching them dynamically

@andrewgodwin are there to your knowledge any ASGI servers that run lifespans in different threads than requests, or even in different event loops?

@andrewgodwin
Member

I don't have much insight into the innards of the ASGI servers around at the moment - I don't remember any that do strange things with multiple event loops, but I can't rule it out.

@adriangb
Contributor

I checked and both Uvicorn and Hypercorn use a single event loop. Daphne doesn't support lifespans at all but it looks like it does use multiple threads and dispatches to them.

Maybe something like "ASGI servers supporting lifespans should run the lifespan event once per event loop" makes sense?

Passing event loops between threads is dubious at best, so this would also mean "lifespan events should be run for each thread" (the current change this PR is proposing).

@adriangb
Contributor

adriangb commented Jul 5, 2022

So to be safe the guarantee would need to be that the lifespan and requests are run in the same event loop.

From discussion today with @pgjones: if we take this just one step further and say "requests must be handled as child tasks/child contexts of the lifespan task/context", we'd also make sure context vars work correctly across lifespans/requests (they currently only work correctly when using Hypercorn with trio; they don't work in Uvicorn), which solves #322.
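As an illustration (my own sketch, not code from Hypercorn or Uvicorn): with plain asyncio, a contextvar set in one task is only visible to tasks created from within it, which is why the parent/child relationship between the lifespan task and request tasks matters:

```python
# A contextvar set in a "lifespan" task is seen by a request spawned as its
# child task, but not by a sibling task created from an unrelated context.
import asyncio
import contextvars

state = contextvars.ContextVar("state")


async def handle_request() -> str:
    # Falls back to "missing" if the lifespan never set the var in our context
    return state.get("missing")


async def lifespan_task() -> str:
    state.set("db-pool")
    # create_task copies the *current* context, so the child sees the value
    return await asyncio.create_task(handle_request())


async def main():
    # Request handled as a child of the lifespan task: var is visible
    child = await asyncio.create_task(lifespan_task())
    # Request handled as a sibling, outside the lifespan's context: var is not
    sibling = await asyncio.create_task(handle_request())
    return child, sibling


print(asyncio.run(main()))  # ('db-pool', 'missing')
```

A server that runs request tasks as children of the lifespan task gets the first behaviour for free; one that spawns them independently gets the second.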

@adriangb
Contributor

Does the recently merged #338 resolve this, @graingert?

@graingert
Contributor Author

Yeah, #338 looks great. I'd like to see an example in the docs though, e.g.:
Bad:

database = Database()

@contextlib.asynccontextmanager
async def lifespan_context(app):
    async with database:
        yield

app = App(lifespan_context=lifespan_context)
app.database = database

Good:

@contextlib.asynccontextmanager
async def lifespan_context(app):
    async with Database() as database:
        app.database = database
        yield

def create_app():
    return App(lifespan_context=lifespan_context)
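To illustrate why the second pattern is safe, here is a hypothetical harness (App and Database are stand-ins I wrote for this sketch, not a real framework): each event loop calls create_app() and runs its own lifespan, so each Database is created, used, and torn down on a single loop:

```python
# Each loop builds its own app and opens its own Database inside the
# lifespan, so no async resource ever crosses an event-loop boundary.
import asyncio
import contextlib


class Database:
    async def __aenter__(self):
        # Record the loop the resource was opened on, for demonstration
        self.loop = asyncio.get_running_loop()
        return self

    async def __aexit__(self, *exc):
        pass  # close connections here in a real database


class App:
    def __init__(self, lifespan_context):
        self.lifespan_context = lifespan_context


@contextlib.asynccontextmanager
async def lifespan_context(app):
    async with Database() as database:
        app.database = database
        yield


def create_app():
    return App(lifespan_context=lifespan_context)


async def serve_one():
    app = create_app()  # fresh app instance per event loop
    async with app.lifespan_context(app):
        # The resource lives on the loop that is handling requests
        return app.database.loop is asyncio.get_running_loop()


print(asyncio.run(serve_one()))  # True
```

With the "bad" pattern, the module-level Database would instead be entered by whichever loop ran the first lifespan and then shared with every other loop and thread.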

@adriangb
Contributor

adriangb commented Jul 26, 2022

I'd like to understand a bit more what's "bad" about the first version. Things I'm spotting:

  1. database = Database() outside of an event loop could be problematic (depends on what Database's constructor does, e.g. if it creates an async resource).
  2. setting database as a global variable means that if requests are processed in multiple threads that object crosses a thread boundary (and that may not be safe)

I think the second example certainly "solves" the first issue, because the object is created inside of an event loop. It maybe solves the second issue, as long as the server calls create_app() once per thread and runs the lifespan once per app instance. The wording we just added in #338 implies the server should run the lifespan once per event loop (and so at least once per thread that is processing requests), but there is nothing in the ASGI spec relating to create_app() or to having multiple copies of an app object at all, so as it stands it would be "valid" for the server to just run the lifespan on a single app instance (whether it got that from the module level or from some create_app() factory).

Did I get this right?
