Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/mock.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

19 statements  

1from __future__ import annotations 

2 

3import typing 

4 

5from .._models import Request, Response 

6from .base import AsyncBaseTransport, BaseTransport 

7 

# A synchronous handler: called with a `Request`, returns a `Response`.
SyncHandler = typing.Callable[[Request], Response]

# An asynchronous handler: called with a `Request`, returns a coroutine that
# produces a `Response` once awaited.
AsyncHandler = typing.Callable[[Request], typing.Coroutine[None, None, Response]]

10 

11 

class MockTransport(AsyncBaseTransport, BaseTransport):
    """Transport that answers each request by calling a user-supplied handler.

    The handler is called with the `Request` and is expected to produce a
    `Response`. It may be a plain function or an `async` function; an
    `async` handler can only be used through `handle_async_request`.
    """

    def __init__(self, handler: SyncHandler | AsyncHandler) -> None:
        """Store `handler`, the callable invoked for every request."""
        self.handler = handler

    def handle_request(
        self,
        request: Request,
    ) -> Response:
        """Read the request, call the handler and return its `Response`.

        Raises:
            TypeError: If the handler did not return a `Response`, which is
                what happens when an `async` handler is used synchronously.
        """
        request.read()
        response = self.handler(request)
        if not isinstance(response, Response):  # pragma: no cover
            # An async handler returns a coroutine here. Nothing on this sync
            # path will ever await it, so close it explicitly; otherwise
            # Python also emits a "coroutine ... was never awaited"
            # RuntimeWarning alongside the TypeError below.
            if isinstance(response, typing.Coroutine):
                response.close()
            raise TypeError("Cannot use an async handler in a sync Client")
        return response

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """Read the request, call the handler and return its `Response`.

        The handler's result is awaited when it is not already a `Response`,
        so both plain and `async` handlers are accepted.
        """
        await request.aread()
        response = self.handler(request)

        # Allow handler to *optionally* be an `async` function.
        # If it is, then the `response` variable needs to be awaited to
        # actually return the result.

        if not isinstance(response, Response):
            response = await response

        return response