Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/connexion/middleware/lifespan.py: 54%

13 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1import typing as t 

2 

3from starlette.routing import Router 

4from starlette.types import ASGIApp, Receive, Scope, Send 

5 

# Type alias for a lifespan handler: a callable that takes one argument (of any
# type) and returns an async context manager. Used below to annotate the
# ``lifespan`` argument of ``LifespanMiddleware``.
6Lifespan = t.Callable[[t.Any], t.AsyncContextManager] 

7 

8 

class LifespanMiddleware:
    """
    ASGI middleware that enables Starlette-style lifespan handlers
    (https://www.starlette.io/lifespan/).

    Lifespan scopes are served by an internal Starlette ``Router`` when a
    handler has been supplied; every other scope is forwarded unchanged to
    the wrapped application.
    """

    def __init__(
        self, next_app: ASGIApp, *, lifespan: t.Optional[Lifespan], **kwargs
    ) -> None:
        """
        Store the wrapped app and the optional lifespan handler.

        :param next_app: The ASGI application to delegate to.
        :param lifespan: Lifespan handler, or ``None`` to leave lifespan
            handling to the wrapped application.
        :param kwargs: Accepted but not used by this middleware.
        """
        self.next_app, self._lifespan = next_app, lifespan
        # The Router is built with nothing but the lifespan argument; it is
        # here for lifespan handling only, not for routing requests.
        self.router = Router(lifespan=lifespan)

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """
        Dispatch the ASGI call to the lifespan router or the wrapped app.

        The router is used only for ``"lifespan"`` scopes and only when a
        lifespan handler was registered; otherwise the call is passed on so
        it can be handled downstream.
        """
        if scope["type"] != "lifespan" or not self._lifespan:
            await self.next_app(scope, receive, send)
            return

        await self.router(scope, receive, send)