1from __future__ import annotations
2
3import io
4import typing
5from base64 import b64encode
6from enum import Enum
7
8from ..exceptions import UnrewindableBodyError
9from .util import to_bytes
10
11if typing.TYPE_CHECKING:
12 from typing import Final
13
# Sentinel header value: pass it as a value within ``headers`` to suppress
# an HTTP header that would otherwise be added automatically.
# Only ``Accept-Encoding``, ``Host``, and ``User-Agent`` can be skipped
# this way.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
SKIPPABLE_HEADERS = frozenset(("accept-encoding", "host", "user-agent"))
20
# Baseline ``Accept-Encoding`` value. Optional codecs are appended below,
# but only when the package that implements them can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Brotli: ``brotlicffi`` is tried first, with ``brotli`` as the fallback.
# The imported module is only a probe, hence the "unused" alias.
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = f"{ACCEPT_ENCODING},br"

# Zstandard: advertised only if the ``zstandard`` package is importable.
try:
    import zstandard as _unused_module_zstd  # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = f"{ACCEPT_ENCODING},zstd"
37
38
class _TYPE_FAILEDTELL(Enum):
    """Single-member enum backing the ``_FAILEDTELL`` sentinel."""

    token = 0


# Sentinel recorded in place of a body position when ``tell()`` raised.
# Being distinct from ``None`` lets the failure be detected later, when an
# attempt is made to rewind the body.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: either a real offset or the failed-tell sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
46
# Request methods for which no body is expected, so an explicit
# 'Content-Length: 0' does not need to be set when sending them.
# The set lists the methods that do *not* expect a body, rather than the
# ones that 'should' have one, because unknown methods should be treated
# as if they were 'POST', which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {
    "CONNECT",
    "DELETE",
    "GET",
    "HEAD",
    "OPTIONS",
    "TRACE",
}
53
54
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Build a dictionary of commonly used request headers.

    Every argument is optional; a header is only emitted when its argument
    is truthy. All header names in the result are lowercase.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, or string.
        ``True`` translates to 'gzip,deflate'. If the dependencies for
        Brotli (either the ``brotli`` or ``brotlicffi`` package) and/or Zstandard
        (the ``zstandard`` package) algorithms are installed, then their encodings are
        included in the string ('br' and 'zstd', respectively).
        List will get joined by comma.
        String will be used as provided.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :returns:
        A new ``dict`` mapping lowercase header names to their values.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
    """

    def _basic_credentials(userpass: str) -> str:
        # Credentials are encoded as latin-1 before being base64-encoded.
        token = b64encode(userpass.encode("latin-1")).decode()
        return f"Basic {token}"

    result: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, list):
            encoding_value = ",".join(accept_encoding)
        elif isinstance(accept_encoding, str):
            encoding_value = accept_encoding
        else:
            # Any other truthy value (e.g. ``True``) selects the default.
            encoding_value = ACCEPT_ENCODING
        result["accept-encoding"] = encoding_value

    if user_agent:
        result["user-agent"] = user_agent

    if keep_alive:
        result["connection"] = "keep-alive"

    if basic_auth:
        result["authorization"] = _basic_credentials(basic_auth)

    if proxy_basic_auth:
        result["proxy-authorization"] = _basic_credentials(proxy_basic_auth)

    if disable_cache:
        result["cache-control"] = "no-cache"

    return result
134
135
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Rewind ``body`` to ``pos`` when a position is given; otherwise try to
    record the body's current position so it can be rewound later.

    :param body:
        The request body. Only objects exposing ``tell`` can have their
        position recorded.

    :param pos:
        A previously recorded position, or ``None`` if none was recorded yet.

    :returns:
        ``pos`` unchanged when it was provided. Otherwise the value returned
        by ``body.tell()``, the ``_FAILEDTELL`` sentinel if ``tell()`` raised
        ``OSError``, or ``None`` if ``body`` has no ``tell`` attribute.
    """
    # A known position means the body is being replayed: seek back to it.
    if pos is not None:
        rewind_body(body, pos)
        return pos

    # Without ``tell`` there is nothing to record.
    if getattr(body, "tell", None) is None:
        return None

    try:
        return body.tell()
    except OSError:
        # This differentiates from None, allowing us to catch
        # a failed `tell()` later when trying to rewind the body.
        return _FAILEDTELL
154
155
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file. May also be the ``_FAILEDTELL``
        sentinel, meaning the position could not be recorded earlier.

    :raises UnrewindableBodyError:
        If ``body.seek()`` raises ``OSError``, or if ``body_pos`` is the
        ``_FAILEDTELL`` sentinel.

    :raises ValueError:
        If ``body_pos`` is neither an integer nor ``_FAILEDTELL``, or if
        ``body_pos`` is an integer but ``body`` has no ``seek`` attribute.
    """
    body_seek = getattr(body, "seek", None)

    if isinstance(body_pos, int):
        if body_seek is None:
            # The position is valid; it is the body that cannot be rewound.
            # Report that, rather than blaming ``body_pos``.
            raise ValueError(
                "body must be a file-like object that supports seek(), "
                f"instead it was {type(body)}."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )
184
185
class ChunksAndContentLength(typing.NamedTuple):
    """Pair of body chunks and the matching 'Content-Length' value."""

    # Iterable of chunks to send, or ``None`` when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Body length in bytes, or ``None`` when no length is given.
    content_length: int | None
189
190
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Convert a request body into an iterable of chunks to pass to
    socket.sendall(), together with an optional 'Content-Length' value.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    :param body:
        ``None``, a ``str``/``bytes`` value, an object with a ``read``
        attribute, an object supporting the buffer API, or an iterable.

    :param method:
        HTTP request method; only consulted when ``body`` is ``None``.

    :param blocksize:
        Size passed to each ``body.read()`` call for file-like bodies.

    :raises TypeError:
        If ``body`` matches none of the supported kinds.
    """
    # No body: recommend a 'Content-Length' based on whether this request
    # method is expected to carry a body or not.
    if body is None:
        expects_body = method.upper() not in _METHODS_NOT_EXPECTING_BODY
        return ChunksAndContentLength(
            chunks=None, content_length=0 if expects_body else None
        )

    # Strings and bytes are sent as one chunk of bytes of known length.
    if isinstance(body, (str, bytes)):
        payload = to_bytes(body)
        return ChunksAndContentLength(chunks=(payload,), content_length=len(payload))

    # File-like object, TODO: use seek() and tell() for length?
    if hasattr(body, "read"):

        def read_in_blocks() -> typing.Iterable[bytes]:
            # Text streams yield ``str``, which is encoded before sending.
            needs_encoding = isinstance(body, io.TextIOBase)
            block = body.read(blocksize)
            while block:
                yield block.encode("utf-8") if needs_encoding else block
                block = body.read(blocksize)

        return ChunksAndContentLength(chunks=read_in_blocks(), content_length=None)

    # Anything else is identified via duck-typing.
    try:
        # Does the body implement the buffer API?
        view = memoryview(body)
    except TypeError:
        try:
            # Failing that, is the body an iterable?
            iterator = iter(body)
        except TypeError:
            raise TypeError(
                f"'body' must be a bytes-like object, file-like "
                f"object, or iterable. Instead was {body!r}"
            ) from None
        return ChunksAndContentLength(chunks=iterator, content_length=None)

    # Since it implements the buffer API can be passed directly to socket.sendall()
    return ChunksAndContentLength(chunks=(body,), content_length=view.nbytes)