Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pip/_vendor/urllib3/util/request.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

99 statements  

1from __future__ import annotations 

2 

3import io 

4import sys 

5import typing 

6from base64 import b64encode 

7from enum import Enum 

8 

9from ..exceptions import UnrewindableBodyError 

10from .util import to_bytes 

11 

12if typing.TYPE_CHECKING: 

13 from typing import Final 

14 

# Sentinel: pass as a value within ``headers`` to skip
# emitting some HTTP headers that are added automatically.
# The only headers that are supported are ``Accept-Encoding``,
# ``Host``, and ``User-Agent``.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lower-case names of the headers listed above as skippable.
SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"])

# Default ``Accept-Encoding`` value; ",zstd" is appended below when a
# Zstandard module can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Probe for a Zstandard implementation. The imported module itself is not
# used here; only the success or failure of the import matters.
try:
    if sys.version_info >= (3, 14):
        from compression import zstd as _unused_module_zstd  # noqa: F401
    else:
        from backports import zstd as _unused_module_zstd  # noqa: F401
except ImportError:
    # No zstd module available: keep the default encodings unchanged.
    pass
else:
    ACCEPT_ENCODING += ",zstd"

33 

34 

class _TYPE_FAILEDTELL(Enum):
    """Single-member enum; its only member is used as the ``_FAILEDTELL`` sentinel."""

    token = 0

37 

38 

# Sentinel recorded as the body position when ``tell()`` raised OSError.
# It is distinct from ``None``, which means "no position recorded".
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: either an integer offset or the failed-tell sentinel.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]

# When sending a request with these methods we aren't expecting
# a body so don't need to set an explicit 'Content-Length: 0'
# The reason we do this in the negative instead of tracking methods
# which 'should' have a body is because unknown methods should be
# treated as if they were 'POST' which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"}

49 

50 

def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Build a dictionary of common request headers from keyword shortcuts.

    Only arguments with a truthy value contribute a header; all header
    names in the result are lower-case.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, or string.
        A string is used as provided.
        A list is joined with commas.
        Any other truthy value (e.g. ``True``) selects the module-level
        ``ACCEPT_ENCODING`` default: 'gzip,deflate', with ',zstd' appended
        when a Zstandard module could be imported.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for
        'authorization: basic ...' auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for
        'proxy-authorization: basic ...' auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    Example:

    .. code-block:: python

        from pip._vendor import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'user-agent': 'Batman/1.0', 'connection': 'keep-alive'}
        print(urllib3.util.make_headers(accept_encoding=["gzip", "deflate"]))
        # {'accept-encoding': 'gzip,deflate'}
    """

    def _basic_credentials(userpass: str) -> str:
        # Credentials are encoded as latin-1 before being base64-encoded.
        token = b64encode(userpass.encode("latin-1")).decode()
        return f"Basic {token}"

    result: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, list):
            encoding_value = ",".join(accept_encoding)
        elif isinstance(accept_encoding, str):
            encoding_value = accept_encoding
        else:
            # Any other truthy value (typically ``True``) means "use the default".
            encoding_value = ACCEPT_ENCODING
        result["accept-encoding"] = encoding_value

    if user_agent:
        result["user-agent"] = user_agent

    if keep_alive:
        result["connection"] = "keep-alive"

    if basic_auth:
        result["authorization"] = _basic_credentials(basic_auth)

    if proxy_basic_auth:
        result["proxy-authorization"] = _basic_credentials(proxy_basic_auth)

    if disable_cache:
        result["cache-control"] = "no-cache"

    return result

131 

132 

def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Rewind ``body`` to ``pos`` when a position is given; otherwise try to
    record the current position so it can be used for a later rewind.

    :param body:
        The request body. It only needs a ``tell`` attribute for a
        position to be recorded.

    :param pos:
        A previously recorded position, or ``None`` if none was recorded.

    :return:
        ``pos`` unchanged when it was provided. Otherwise the value of
        ``body.tell()``, ``_FAILEDTELL`` if ``tell()`` raised ``OSError``,
        or ``None`` when ``body`` has no ``tell`` attribute.
    """
    # A known position means we are being asked to rewind.
    if pos is not None:
        rewind_body(body, pos)
        return pos

    # Nothing to record for bodies that cannot report a position.
    if getattr(body, "tell", None) is None:
        return None

    try:
        return body.tell()
    except OSError:
        # This differentiates from None, allowing us to catch
        # a failed `tell()` later when trying to rewind the body.
        return _FAILEDTELL

151 

152 

def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file, as previously recorded with ``tell()``,
        or the ``_FAILEDTELL`` sentinel if recording the position failed.

    :raises UnrewindableBodyError:
        If ``body.seek()`` raises ``OSError``, or if ``body_pos`` is the
        ``_FAILEDTELL`` sentinel.

    :raises ValueError:
        If ``body_pos`` is neither an integer nor ``_FAILEDTELL``, or if
        ``body_pos`` is an integer but ``body`` has no ``seek`` attribute.
    """
    if isinstance(body_pos, int):
        body_seek = getattr(body, "seek", None)
        if body_seek is None:
            # The position is a valid integer, so the problem is the body:
            # report that instead of blaming the type of ``body_pos``.
            raise ValueError(
                "body must be a file-like object that supports seek() to be "
                f"rewound, instead it was {type(body)}."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
        return

    if body_pos is _FAILEDTELL:
        # tell() failed when the position was recorded, so there is
        # nothing to seek back to.
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )

    raise ValueError(
        f"body_pos must be of type integer, instead it was {type(body_pos)}."
    )

181 

182 

class ChunksAndContentLength(typing.NamedTuple):
    """Pair returned by ``body_to_chunks``: the body chunks and a length hint."""

    # Iterable of body chunks, or None when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Value for a 'Content-Length' header, or None when none is recommended
    # or the length can't be determined.
    content_length: int | None

186 

187 

def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Takes the HTTP request method, body, and blocksize and
    transforms them into an iterable of chunks to pass to
    socket.sendall() and an optional 'Content-Length' header.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    :param body:
        ``None``, ``str``/``bytes``, an object with a ``read`` attribute,
        an object implementing the buffer API, or an iterable.

    :param method:
        HTTP request method; only consulted when ``body`` is ``None``.

    :param blocksize:
        Size passed to ``body.read()`` for each chunk of a file-like body.

    :return:
        A ``ChunksAndContentLength`` tuple of ``(chunks, content_length)``.

    :raises TypeError:
        If ``body`` is none of the supported kinds.
    """

    chunks: typing.Iterable[bytes] | None
    content_length: int | None

    # No body, we need to make a recommendation on 'Content-Length'
    # based on whether that request method is expected to have
    # a body or not.
    if body is None:
        chunks = None
        if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
            content_length = 0
        else:
            content_length = None

    # Bytes or strings become bytes
    elif isinstance(body, (str, bytes)):
        chunks = (to_bytes(body),)
        content_length = len(chunks[0])

    # File-like object, TODO: use seek() and tell() for length?
    elif hasattr(body, "read"):

        def chunk_readable() -> typing.Iterable[bytes]:
            while True:
                datablock = body.read(blocksize)
                if not datablock:
                    break
                # Decide per chunk rather than by the type of ``body``, so
                # that text readers not derived from io.TextIOBase still
                # yield bytes, as the return annotation promises.
                if isinstance(datablock, str):
                    datablock = datablock.encode("utf-8")
                yield datablock

        chunks = chunk_readable()
        content_length = None

    # Otherwise we need to start checking via duck-typing.
    else:
        try:
            # Check if the body implements the buffer API.
            mv = memoryview(body)
        except TypeError:
            try:
                # Check if the body is an iterable
                chunks = iter(body)
                content_length = None
            except TypeError:
                raise TypeError(
                    f"'body' must be a bytes-like object, file-like "
                    f"object, or iterable. Instead was {body!r}"
                ) from None
        else:
            # Since it implements the buffer API can be passed directly to socket.sendall()
            chunks = (body,)
            content_length = mv.nbytes

    return ChunksAndContentLength(chunks=chunks, content_length=content_length)