Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/base.py: 71%

21 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

1import typing 

2from types import TracebackType 

3 

4from .._models import Request, Response 

5 

6T = typing.TypeVar("T", bound="BaseTransport") 

7A = typing.TypeVar("A", bound="AsyncBaseTransport") 

8 

9 

10class BaseTransport: 

11 def __enter__(self: T) -> T: 

12 return self 

13 

14 def __exit__( 

15 self, 

16 exc_type: typing.Optional[typing.Type[BaseException]] = None, 

17 exc_value: typing.Optional[BaseException] = None, 

18 traceback: typing.Optional[TracebackType] = None, 

19 ) -> None: 

20 self.close() 

21 

22 def handle_request(self, request: Request) -> Response: 

23 """ 

24 Send a single HTTP request and return a response. 

25 

26 Developers shouldn't typically ever need to call into this API directly, 

27 since the Client class provides all the higher level user-facing API 

28 niceties. 

29 

30 In order to properly release any network resources, the response 

31 stream should *either* be consumed immediately, with a call to 

32 `response.stream.read()`, or else the `handle_request` call should 

33 be followed with a try/finally block to ensuring the stream is 

34 always closed. 

35 

36 Example usage: 

37 

38 with httpx.HTTPTransport() as transport: 

39 req = httpx.Request( 

40 method=b"GET", 

41 url=(b"https", b"www.example.com", 443, b"/"), 

42 headers=[(b"Host", b"www.example.com")], 

43 ) 

44 resp = transport.handle_request(req) 

45 body = resp.stream.read() 

46 print(resp.status_code, resp.headers, body) 

47 

48 

49 Takes a `Request` instance as the only argument. 

50 

51 Returns a `Response` instance. 

52 """ 

53 raise NotImplementedError( 

54 "The 'handle_request' method must be implemented." 

55 ) # pragma: no cover 

56 

57 def close(self) -> None: 

58 pass 

59 

60 

61class AsyncBaseTransport: 

62 async def __aenter__(self: A) -> A: 

63 return self 

64 

65 async def __aexit__( 

66 self, 

67 exc_type: typing.Optional[typing.Type[BaseException]] = None, 

68 exc_value: typing.Optional[BaseException] = None, 

69 traceback: typing.Optional[TracebackType] = None, 

70 ) -> None: 

71 await self.aclose() 

72 

73 async def handle_async_request( 

74 self, 

75 request: Request, 

76 ) -> Response: 

77 raise NotImplementedError( 

78 "The 'handle_async_request' method must be implemented." 

79 ) # pragma: no cover 

80 

81 async def aclose(self) -> None: 

82 pass