Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_transports/base.py: 71%
21 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import typing
2from types import TracebackType
4from .._models import Request, Response
# Self-types for the context-manager methods: binding to the base classes lets
# `__enter__` / `__aenter__` be annotated as returning the caller's own
# (sub)class rather than the base class.
T = typing.TypeVar("T", bound="BaseTransport")
A = typing.TypeVar("A", bound="AsyncBaseTransport")
class BaseTransport:
    """
    Base class for synchronous transports.

    Instances can be used as context managers: entering returns the
    transport itself and exiting calls `close()`. Subclasses must
    implement `handle_request`; `close` is a no-op unless overridden.
    """

    def __enter__(self: T) -> T:
        """Return the transport itself for use in a `with` block."""
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        """Close the transport when leaving the `with` block."""
        # Returns None, so an exception raised inside the block propagates.
        self.close()

    def handle_request(self, request: Request) -> Response:
        """
        Send a single HTTP request and return a response.

        Developers shouldn't typically ever need to call into this API directly,
        since the Client class provides all the higher level user-facing API
        niceties.

        In order to properly release any network resources, the response
        stream should *either* be consumed immediately, with a call to
        `response.stream.read()`, or else the `handle_request` call should
        be followed with a try/finally block to ensure the stream is
        always closed.

        Example usage:

            with httpx.HTTPTransport() as transport:
                req = httpx.Request(
                    method=b"GET",
                    url=(b"https", b"www.example.com", 443, b"/"),
                    headers=[(b"Host", b"www.example.com")],
                )
                resp = transport.handle_request(req)
                body = resp.stream.read()
                print(resp.status_code, resp.headers, body)


        Takes a `Request` instance as the only argument.

        Returns a `Response` instance.

        Raises `NotImplementedError` unless overridden by a subclass.
        """
        raise NotImplementedError(
            "The 'handle_request' method must be implemented."
        )  # pragma: no cover

    def close(self) -> None:
        """Release any resources held by the transport (no-op by default)."""
        pass
class AsyncBaseTransport:
    """
    Base class for asynchronous transports.

    Instances can be used as async context managers: entering returns the
    transport itself and exiting awaits `aclose()`. Subclasses must
    implement `handle_async_request`; `aclose` is a no-op unless overridden.
    """

    async def __aenter__(self: A) -> A:
        """Return the transport itself for use in an `async with` block."""
        return self

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        """Await `aclose()` when leaving the `async with` block."""
        # Returns None, so an exception raised inside the block propagates.
        await self.aclose()

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Handle a single request asynchronously.

        Takes a `Request` instance as the only argument.

        Returns a `Response` instance.

        Raises `NotImplementedError` unless overridden by a subclass.
        """
        raise NotImplementedError(
            "The 'handle_async_request' method must be implemented."
        )  # pragma: no cover

    async def aclose(self) -> None:
        """Release any resources held by the transport (no-op by default)."""
        pass