Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/middleware/lifespan.py: 54%
13 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import typing as t
3from starlette.routing import Router
4from starlette.types import ASGIApp, Receive, Scope, Send
# Type of a lifespan handler: a callable that takes a single argument and
# returns an async context manager.
# NOTE(review): the argument is typed as Any here; presumably it is the
# application instance -- confirm against the Starlette lifespan docs.
Lifespan = t.Callable[
    [t.Any],
    t.AsyncContextManager,
]
class LifespanMiddleware:
    """
    ASGI middleware that runs a Starlette lifespan handler
    (https://www.starlette.io/lifespan/) when one has been registered.

    Every scope that is not handled here is forwarded unchanged to the
    wrapped application.
    """

    def __init__(
        self, next_app: ASGIApp, *, lifespan: t.Optional[Lifespan], **kwargs
    ) -> None:
        """
        :param next_app: The ASGI application to delegate to.
        :param lifespan: Optional lifespan handler; when falsy, lifespan
            scopes are passed on to ``next_app`` instead.
        :param kwargs: Accepted but not used by this middleware.
        """
        self.next_app = next_app
        self._lifespan = lifespan
        # The Router is not used for routing; it only serves lifespan scopes
        # with the handler given above.
        self.router = Router(lifespan=lifespan)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Dispatch a single ASGI call.

        Lifespan scopes go to the internal router only when a lifespan
        handler is registered; otherwise they are left for downstream
        applications to handle, like every other scope type.
        """
        handled_here = scope["type"] == "lifespan" and self._lifespan
        target = self.router if handled_here else self.next_app
        await target(scope, receive, send)