1from __future__ import annotations
2
3import contextlib
4import typing
5
6from .._models import (
7 URL,
8 Extensions,
9 HeaderTypes,
10 Origin,
11 Request,
12 Response,
13 enforce_bytes,
14 enforce_headers,
15 enforce_url,
16 include_request_headers,
17)
18
19
class RequestInterface:
    """
    Base interface exposing the `request()` and `stream()` helpers.

    Both helpers build a `Request` from their arguments and hand it to
    `handle_request()`, which this base class leaves unimplemented.
    """

    def request(
        self,
        method: bytes | str,
        url: URL | bytes | str,
        *,
        headers: HeaderTypes = None,
        content: bytes | typing.Iterator[bytes] | None = None,
        extensions: Extensions | None = None,
    ) -> Response:
        """
        Send a request, read the response, close it, and return it.

        The response is closed even when reading it raises.
        """
        # Run the arguments through the strict type-checking helpers.
        checked_method = enforce_bytes(method, name="method")
        checked_url = enforce_url(url, name="url")
        checked_headers = enforce_headers(headers, name="headers")

        # Add the Host header, and optionally Content-Length or
        # Transfer-Encoding.
        full_headers = include_request_headers(
            checked_headers, url=checked_url, content=content
        )

        outgoing = Request(
            method=checked_method,
            url=checked_url,
            headers=full_headers,
            content=content,
            extensions=extensions,
        )
        response = self.handle_request(outgoing)
        # `closing` guarantees `response.close()` runs whether or not
        # `read()` succeeds.
        with contextlib.closing(response):
            response.read()
        return response

    @contextlib.contextmanager
    def stream(
        self,
        method: bytes | str,
        url: URL | bytes | str,
        *,
        headers: HeaderTypes = None,
        content: bytes | typing.Iterator[bytes] | None = None,
        extensions: Extensions | None = None,
    ) -> typing.Iterator[Response]:
        """
        Context manager that sends a request and yields the response.

        Unlike `request()`, this method does not call `read()` on the
        response. The response is closed when the context exits, including
        when the body of the `with` block raises.
        """
        # Run the arguments through the strict type-checking helpers.
        checked_method = enforce_bytes(method, name="method")
        checked_url = enforce_url(url, name="url")
        checked_headers = enforce_headers(headers, name="headers")

        # Add the Host header, and optionally Content-Length or
        # Transfer-Encoding.
        full_headers = include_request_headers(
            checked_headers, url=checked_url, content=content
        )

        outgoing = Request(
            method=checked_method,
            url=checked_url,
            headers=full_headers,
            content=content,
            extensions=extensions,
        )
        response = self.handle_request(outgoing)
        # Close the response once the caller leaves the `with` block.
        with contextlib.closing(response):
            yield response

    def handle_request(self, request: Request) -> Response:
        """
        Handle a fully-built `Request` and return its `Response`.

        Not implemented on this base class; always raises
        `NotImplementedError` here.
        """
        raise NotImplementedError()  # pragma: nocover
85
86
class ConnectionInterface(RequestInterface):
    """
    Interface for a single connection, extending `RequestInterface`.

    Every method on this class raises `NotImplementedError`; concrete
    implementations are expected to be supplied by subclasses.
    """

    def close(self) -> None:
        """
        Close the connection.

        NOTE(review): semantics inferred from the name only; this base
        implementation just raises `NotImplementedError`.
        """
        raise NotImplementedError()  # pragma: nocover

    def info(self) -> str:
        """
        Return a string describing the connection.

        NOTE(review): the exact content of the string is not defined here;
        confirm against the concrete implementations.
        """
        raise NotImplementedError()  # pragma: nocover

    def can_handle_request(self, origin: Origin) -> bool:
        """
        Return whether this connection can handle a request for `origin`.

        NOTE(review): the matching rule is not defined here; confirm against
        the concrete implementations.
        """
        raise NotImplementedError()  # pragma: nocover

    def is_available(self) -> bool:
        """
        Return `True` if the connection can currently accept an outgoing
        request.

        An HTTP/1.1 connection is only available while it is idle.

        An HTTP/2 connection is available as long as the stream ID space has
        not been exhausted and the connection is not in an error state.

        While the connection is still being established it may not yet be
        known whether it will end up as HTTP/1.1 or HTTP/2. Such a connection
        should be treated as available, but it might ultimately raise
        `NewConnectionRequired` exceptions if multiple requests are attempted
        over a connection that ends up being established as HTTP/1.1.
        """
        raise NotImplementedError()  # pragma: nocover

    def has_expired(self) -> bool:
        """
        Return `True` if the connection is in a state where it should be
        closed.

        That is the case either when the connection is idle and its
        keep-alive expiry time has passed, or when the server has sent an EOF.
        """
        raise NotImplementedError()  # pragma: nocover

    def is_idle(self) -> bool:
        """
        Return `True` if the connection is currently idle.
        """
        raise NotImplementedError()  # pragma: nocover

    def is_closed(self) -> bool:
        """
        Return `True` if the connection has been closed.

        Consulted when a response is closed, to decide whether the connection
        may be returned to the connection pool.
        """
        raise NotImplementedError()  # pragma: nocover