1from __future__ import annotations
2
3import io
4import sys
5import typing
6from base64 import b64encode
7from enum import Enum
8
9from ..exceptions import UnrewindableBodyError
10from .util import to_bytes
11
12if typing.TYPE_CHECKING:
13 from typing import Final
14
15# Pass as a value within ``headers`` to skip
16# emitting some HTTP headers that are added automatically.
17# The only headers that are supported are ``Accept-Encoding``,
18# ``Host``, and ``User-Agent``.
19SKIP_HEADER = "@@@SKIP_HEADER@@@"
20SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])
21
22ACCEPT_ENCODING = "gzip,deflate"
23
24try:
25 if sys.version_info >= (3, 14):
26 from compression import zstd as _unused_module_zstd # noqa: F401
27 else:
28 from backports import zstd as _unused_module_zstd # noqa: F401
29except ImportError:
30 pass
31else:
32 ACCEPT_ENCODING += ",zstd"
33
34
35class _TYPE_FAILEDTELL(Enum):
36 token = 0
37
38
39_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token
40
41_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]
42
43# When sending a request with these methods we aren't expecting
44# a body so don't need to set an explicit 'Content-Length: 0'
45# The reason we do this in the negative instead of tracking methods
46# which 'should' have a body is because unknown methods should be
47# treated as if they were 'POST' which *does* expect a body.
48_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}
49
50
51def make_headers(
52 keep_alive: bool | None = None,
53 accept_encoding: bool | list[str] | str | None = None,
54 user_agent: str | None = None,
55 basic_auth: str | None = None,
56 proxy_basic_auth: str | None = None,
57 disable_cache: bool | None = None,
58) -> dict[str, str]:
59 """
60 Shortcuts for generating request headers.
61
62 :param keep_alive:
63 If ``True``, adds 'connection: keep-alive' header.
64
65 :param accept_encoding:
66 Can be a boolean, list, or string.
67 ``True`` translates to 'gzip,deflate'. If the dependencies for
68 Brotli (either the ``brotli`` or ``brotlicffi`` package) and/or
69 Zstandard (the ``backports.zstd`` package for Python before 3.14)
70 algorithms are installed, then their encodings are
71 included in the string ('br' and 'zstd', respectively).
72 List will get joined by comma.
73 String will be used as provided.
74
75 :param user_agent:
76 String representing the user-agent you want, such as
77 "python-urllib3/0.6"
78
79 :param basic_auth:
80 Colon-separated username:password string for 'authorization: basic ...'
81 auth header.
82
83 :param proxy_basic_auth:
84 Colon-separated username:password string for 'proxy-authorization: basic ...'
85 auth header.
86
87 :param disable_cache:
88 If ``True``, adds 'cache-control: no-cache' header.
89
90 Example:
91
92 .. code-block:: python
93
94 from pip._vendor import urllib3
95
96 print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
97 # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
98 print(urllib3.util.make_headers(accept_encoding=True))
99 # {'accept-encoding': 'gzip,deflate'}
100 """
101 headers: dict[str, str] = {}
102 if accept_encoding:
103 if isinstance(accept_encoding, str):
104 pass
105 elif isinstance(accept_encoding, list):
106 accept_encoding = ",".join(accept_encoding)
107 else:
108 accept_encoding = ACCEPT_ENCODING
109 headers["accept-encoding"] = accept_encoding
110
111 if user_agent:
112 headers["user-agent"] = user_agent
113
114 if keep_alive:
115 headers["connection"] = "keep-alive"
116
117 if basic_auth:
118 headers["authorization"] = (
119 f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}"
120 )
121
122 if proxy_basic_auth:
123 headers["proxy-authorization"] = (
124 f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}"
125 )
126
127 if disable_cache:
128 headers["cache-control"] = "no-cache"
129
130 return headers
131
132
133def set_file_position(
134 body: typing.Any, pos: _TYPE_BODY_POSITION | None
135) -> _TYPE_BODY_POSITION | None:
136 """
137 If a position is provided, move file to that point.
138 Otherwise, we'll attempt to record a position for future use.
139 """
140 if pos is not None:
141 rewind_body(body, pos)
142 elif getattr(body, "tell", None) is not None:
143 try:
144 pos = body.tell()
145 except OSError:
146 # This differentiates from None, allowing us to catch
147 # a failed `tell()` later when trying to rewind the body.
148 pos = _FAILEDTELL
149
150 return pos
151
152
153def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
154 """
155 Attempt to rewind body to a certain position.
156 Primarily used for request redirects and retries.
157
158 :param body:
159 File-like object that supports seek.
160
161 :param int pos:
162 Position to seek to in file.
163 """
164 body_seek = getattr(body, "seek", None)
165 if body_seek is not None and isinstance(body_pos, int):
166 try:
167 body_seek(body_pos)
168 except OSError as e:
169 raise UnrewindableBodyError(
170 "An error occurred when rewinding request body for redirect/retry."
171 ) from e
172 elif body_pos is _FAILEDTELL:
173 raise UnrewindableBodyError(
174 "Unable to record file position for rewinding "
175 "request body during a redirect/retry."
176 )
177 else:
178 raise ValueError(
179 f"body_pos must be of type integer, instead it was {type(body_pos)}."
180 )
181
182
183class ChunksAndContentLength(typing.NamedTuple):
184 chunks: typing.Iterable[bytes] | None
185 content_length: int | None
186
187
188def body_to_chunks(
189 body: typing.Any | None, method: str, blocksize: int
190) -> ChunksAndContentLength:
191 """Takes the HTTP request method, body, and blocksize and
192 transforms them into an iterable of chunks to pass to
193 socket.sendall() and an optional 'Content-Length' header.
194
195 A 'Content-Length' of 'None' indicates the length of the body
196 can't be determined so should use 'Transfer-Encoding: chunked'
197 for framing instead.
198 """
199
200 chunks: typing.Iterable[bytes] | None
201 content_length: int | None
202
203 # No body, we need to make a recommendation on 'Content-Length'
204 # based on whether that request method is expected to have
205 # a body or not.
206 if body is None:
207 chunks = None
208 if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
209 content_length = 0
210 else:
211 content_length = None
212
213 # Bytes or strings become bytes
214 elif isinstance(body, (str, bytes)):
215 chunks = (to_bytes(body),)
216 content_length = len(chunks[0])
217
218 # File-like object, TODO: use seek() and tell() for length?
219 elif hasattr(body, "read"):
220
221 def chunk_readable() -> typing.Iterable[bytes]:
222 encode = isinstance(body, io.TextIOBase)
223 while True:
224 datablock = body.read(blocksize)
225 if not datablock:
226 break
227 if encode:
228 datablock = datablock.encode("utf-8")
229 yield datablock
230
231 chunks = chunk_readable()
232 content_length = None
233
234 # Otherwise we need to start checking via duck-typing.
235 else:
236 try:
237 # Check if the body implements the buffer API.
238 mv = memoryview(body)
239 except TypeError:
240 try:
241 # Check if the body is an iterable
242 chunks = iter(body)
243 content_length = None
244 except TypeError:
245 raise TypeError(
246 f"'body' must be a bytes-like object, file-like "
247 f"object, or iterable. Instead was {body!r}"
248 ) from None
249 else:
250 # Since it implements the buffer API can be passed directly to socket.sendall()
251 chunks = (body,)
252 content_length = mv.nbytes
253
254 return ChunksAndContentLength(chunks=chunks, content_length=content_length)