1from __future__ import annotations
2
3import io
4import typing
5from base64 import b64encode
6from enum import Enum
7
8from ..exceptions import UnrewindableBodyError
9from .util import to_bytes
10
11if typing.TYPE_CHECKING:
12 from typing import Final
13
# Sentinel header value: pass it as a value within ``headers`` to skip
# emitting an HTTP header that would otherwise be added automatically.
# The only headers that are supported are ``Accept-Encoding``,
# ``Host``, and ``User-Agent``.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lower-cased names of the headers listed above.
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])
20
# Default value for the ``Accept-Encoding`` header. It always starts with
# gzip and deflate; optional codecs are appended below only when a module
# providing them can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Brotli: prefer ``brotlicffi`` and fall back to ``brotli``. The imported
# module is not used here, the import only probes for availability.
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING += ",br"

# Zstandard: try ``compression.zstd`` first, then the ``zstandard``
# package. ",zstd" is appended at most once because the fallback import
# only runs when the first one raised ImportError.
try:
    from compression import (  # type: ignore[import-not-found] # noqa: F401
        zstd as _unused_module_zstd,
    )

    ACCEPT_ENCODING += ",zstd"
except ImportError:
    try:
        import zstandard as _unused_module_zstd  # noqa: F401

        ACCEPT_ENCODING += ",zstd"
    except ImportError:
        pass
45
46
47class _TYPE_FAILEDTELL(Enum):
48 token = 0
49
50
# Sentinel recorded in place of a body position when ``tell()`` failed.
# It is distinct from ``None``, which means no position was recorded.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: either an integer offset or the sentinel above.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
54
55# When sending a request with these methods we aren't expecting
56# a body so don't need to set an explicit 'Content-Length: 0'
57# The reason we do this in the negative instead of tracking methods
58# which 'should' have a body is because unknown methods should be
59# treated as if they were 'POST' which *does* expect a body.
60_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}
61
62
def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Shortcuts for generating request headers.

    Every argument is optional; a header is only emitted when its
    argument is truthy. All header names in the result are lower-case.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, or string.
        ``True`` (or any other truthy value that is neither a string nor
        a list) translates to the module default, which starts with
        'gzip,deflate'. If the dependencies for Brotli (either the
        ``brotli`` or ``brotlicffi`` package) and/or Zstandard (the
        ``compression.zstd`` module or the ``zstandard`` package) are
        importable, then their encodings are included in the string
        ('br' and 'zstd', respectively).
        List will get joined by comma.
        String will be used as provided.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for 'authorization: basic ...'
        auth header. The string is encoded as latin-1 before being
        base64-encoded.

    :param proxy_basic_auth:
        Colon-separated username:password string for 'proxy-authorization: basic ...'
        auth header. Encoded the same way as ``basic_auth``.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    :returns:
        A new dictionary containing the requested headers.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
        # (plus ',br' and/or ',zstd' when those codecs are available)
    """
    headers: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, list):
            accept_encoding = ",".join(accept_encoding)
        elif not isinstance(accept_encoding, str):
            # ``True`` (or any other truthy non-str, non-list value)
            # selects the module-level default.
            accept_encoding = ACCEPT_ENCODING
        headers["accept-encoding"] = accept_encoding

    if user_agent:
        headers["user-agent"] = user_agent

    if keep_alive:
        headers["connection"] = "keep-alive"

    if basic_auth:
        headers["authorization"] = (
            f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}"
        )

    if proxy_basic_auth:
        headers["proxy-authorization"] = (
            f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}"
        )

    if disable_cache:
        headers["cache-control"] = "no-cache"

    return headers
142
143
def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    If a position is provided, move file to that point.
    Otherwise, we'll attempt to record a position for future use.

    :param body:
        Request body; only treated as a file when it has a ``tell``
        attribute (for recording) or is accepted by ``rewind_body``
        (for rewinding).

    :param pos:
        Previously recorded position, or ``None`` if none was recorded yet.

    :returns:
        ``pos`` unchanged when it was provided; otherwise the value of
        ``body.tell()``, the ``_FAILEDTELL`` sentinel if ``tell()`` raised
        ``OSError``, or ``None`` if ``body`` has no ``tell`` attribute.
    """
    if pos is not None:
        rewind_body(body, pos)
    elif getattr(body, "tell", None) is not None:
        try:
            pos = body.tell()
        except OSError:
            # This differentiates from None, allowing us to catch
            # a failed `tell()` later when trying to rewind the body.
            pos = _FAILEDTELL

    return pos
162
163
def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file, or the ``_FAILEDTELL`` sentinel if
        the position could not be recorded earlier.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError``, or if ``body_pos`` is the
        ``_FAILEDTELL`` sentinel.

    :raises ValueError:
        In every other case where the seek cannot be attempted: ``body_pos``
        is neither an integer nor the sentinel, or ``body`` has no ``seek``
        attribute.
    """
    body_seek = getattr(body, "seek", None)
    if body_seek is not None and isinstance(body_pos, int):
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )
192
193
class ChunksAndContentLength(typing.NamedTuple):
    """Pair of a request body, split into chunks, and its length."""

    # Iterable of body chunks, or ``None`` when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Body length as an integer, or ``None`` when no length is given.
    content_length: int | None
197
198
def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Takes the HTTP request method, body, and blocksize and
    transforms them into an iterable of chunks to pass to
    socket.sendall() and an optional 'Content-Length' header.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    :param body:
        ``None``, ``str``/``bytes``, an object with a ``read`` attribute,
        an object supporting the buffer API, or any other iterable.

    :param method:
        HTTP request method; compared case-insensitively and only
        consulted when ``body`` is ``None``.

    :param blocksize:
        Size passed to each ``body.read()`` call for file-like bodies.

    :raises TypeError:
        If ``body`` matches none of the supported kinds.
    """

    chunks: typing.Iterable[bytes] | None
    content_length: int | None

    # No body, we need to make a recommendation on 'Content-Length'
    # based on whether that request method is expected to have
    # a body or not.
    if body is None:
        chunks = None
        if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
            content_length = 0
        else:
            content_length = None

    # Bytes or strings become bytes
    elif isinstance(body, (str, bytes)):
        chunks = (to_bytes(body),)
        content_length = len(chunks[0])

    # File-like object, TODO: use seek() and tell() for length?
    elif hasattr(body, "read"):

        def chunk_readable() -> typing.Iterable[bytes]:
            # ``body`` and ``blocksize`` are only read here, so the
            # closure needs no ``nonlocal`` declaration.
            # Text streams yield ``str`` blocks that must be encoded.
            encode = isinstance(body, io.TextIOBase)
            while True:
                datablock = body.read(blocksize)
                if not datablock:
                    break
                if encode:
                    datablock = datablock.encode("utf-8")
                yield datablock

        chunks = chunk_readable()
        content_length = None

    # Otherwise we need to start checking via duck-typing.
    else:
        try:
            # Check if the body implements the buffer API.
            mv = memoryview(body)
        except TypeError:
            try:
                # Check if the body is an iterable
                chunks = iter(body)
                content_length = None
            except TypeError:
                raise TypeError(
                    f"'body' must be a bytes-like object, file-like "
                    f"object, or iterable. Instead was {body!r}"
                ) from None
        else:
            # Since it implements the buffer API can be passed directly to socket.sendall()
            chunks = (body,)
            content_length = mv.nbytes

    return ChunksAndContentLength(chunks=chunks, content_length=content_length)