Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/urllib3/util/request.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

104 statements  

1from __future__ import annotations 

2 

3import io 

4import typing 

5from base64 import b64encode 

6from enum import Enum 

7 

8from ..exceptions import UnrewindableBodyError 

9from .util import to_bytes 

10 

11if typing.TYPE_CHECKING: 

12 from typing import Final 

13 

# Sentinel value: put it in ``headers`` as the value of a header that is
# normally added automatically and that header will not be emitted.
# Only ``Accept-Encoding``, ``Host``, and ``User-Agent`` can be skipped
# this way.
SKIP_HEADER = "@@@SKIP_HEADER@@@"
# Lower-cased names of the headers that honour ``SKIP_HEADER``.
SKIPPABLE_HEADERS = frozenset({"accept-encoding", "host", "user-agent"})

20 

# Default ``Accept-Encoding`` value. It always starts with gzip and deflate;
# optional codecs are appended below when their package can be imported.
ACCEPT_ENCODING = "gzip,deflate"

# Brotli: ``brotlicffi`` is tried first, ``brotli`` is the fallback. The
# module object itself is never used, only the fact that the import worked.
try:
    try:
        import brotlicffi as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
    except ImportError:
        import brotli as _unused_module_brotli  # type: ignore[import-not-found] # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = ACCEPT_ENCODING + ",br"

# Zstandard: advertised only when the ``zstandard`` package is importable.
try:
    import zstandard as _unused_module_zstd  # noqa: F401
except ImportError:
    pass
else:
    ACCEPT_ENCODING = ACCEPT_ENCODING + ",zstd"

37 

38 

class _TYPE_FAILEDTELL(Enum):
    # Single-member enum so the "tell() failed" marker has a type of its
    # own that can appear in annotations next to ``int``.
    token = 0


# Marker stored in place of a body position when ``tell()`` raised.
# It is distinct from ``None``, which means no position was recorded.
_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token

# A recorded body position: a real offset or the failed-tell marker.
_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL]

46 

# Request methods for which no body is expected, so an explicit
# 'Content-Length: 0' does not need to be set when none is given.
# The set is deliberately phrased in the negative: a method that is not
# listed here (including any unknown method) is treated like 'POST',
# which *does* expect a body.
_METHODS_NOT_EXPECTING_BODY = {
    "GET",
    "HEAD",
    "DELETE",
    "TRACE",
    "OPTIONS",
    "CONNECT",
}

53 

54 

def make_headers(
    keep_alive: bool | None = None,
    accept_encoding: bool | list[str] | str | None = None,
    user_agent: str | None = None,
    basic_auth: str | None = None,
    proxy_basic_auth: str | None = None,
    disable_cache: bool | None = None,
) -> dict[str, str]:
    """
    Build a dictionary of commonly used request headers.

    Every argument is optional. An argument only contributes a header when
    its value is truthy, so calling this with no arguments returns ``{}``.

    :param keep_alive:
        If ``True``, adds 'connection: keep-alive' header.

    :param accept_encoding:
        Can be a boolean, list, or string.
        A string is used exactly as given.
        A list has its items joined with commas.
        ``True`` selects the module default, which is 'gzip,deflate' plus
        'br' when either the ``brotli`` or ``brotlicffi`` package is
        installed and 'zstd' when the ``zstandard`` package is installed.

    :param user_agent:
        String representing the user-agent you want, such as
        "python-urllib3/0.6"

    :param basic_auth:
        Colon-separated username:password string for
        'authorization: basic ...' auth header.

    :param proxy_basic_auth:
        Colon-separated username:password string for
        'proxy-authorization: basic ...' auth header.

    :param disable_cache:
        If ``True``, adds 'cache-control: no-cache' header.

    Example:

    .. code-block:: python

        import urllib3

        print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0"))
        # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'}
        print(urllib3.util.make_headers(accept_encoding=True))
        # {'accept-encoding': 'gzip,deflate'}
    """

    def _basic_credentials(userpass: str) -> str:
        # latin-1 bytes -> base64 -> text, prefixed with the auth scheme.
        encoded = b64encode(userpass.encode("latin-1"))
        return f"Basic {encoded.decode()}"

    built: dict[str, str] = {}

    if accept_encoding:
        if isinstance(accept_encoding, list):
            encodings = ",".join(accept_encoding)
        elif isinstance(accept_encoding, str):
            encodings = accept_encoding
        else:
            # Any other truthy value (normally ``True``) means "the default".
            encodings = ACCEPT_ENCODING
        built["accept-encoding"] = encodings

    if user_agent:
        built["user-agent"] = user_agent

    if keep_alive:
        built["connection"] = "keep-alive"

    if basic_auth:
        built["authorization"] = _basic_credentials(basic_auth)

    if proxy_basic_auth:
        built["proxy-authorization"] = _basic_credentials(proxy_basic_auth)

    if disable_cache:
        built["cache-control"] = "no-cache"

    return built

134 

135 

def set_file_position(
    body: typing.Any, pos: _TYPE_BODY_POSITION | None
) -> _TYPE_BODY_POSITION | None:
    """
    Restore a previously recorded body position, or record one now.

    With a non-``None`` ``pos`` the body is rewound to it via
    ``rewind_body`` and the same ``pos`` is returned. With ``pos`` of
    ``None`` the current position is read from ``body.tell()`` when the
    body has a ``tell`` attribute; otherwise ``None`` is returned.

    :param body:
        The request body; it may or may not be file-like.

    :param pos:
        A position recorded by an earlier call, or ``None``.

    :return:
        The position to remember for a later rewind, ``_FAILEDTELL`` if
        ``tell()`` raised ``OSError``, or ``None`` if the body has no
        ``tell``.
    """
    if pos is not None:
        rewind_body(body, pos)
        return pos

    if getattr(body, "tell", None) is None:
        # Nothing to record for bodies that cannot report a position.
        return None

    try:
        return body.tell()
    except OSError:
        # Keep this distinguishable from ``None`` so the failed ``tell()``
        # is noticed later, when a rewind of the body is attempted.
        return _FAILEDTELL

154 

155 

def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None:
    """
    Attempt to rewind body to a certain position.
    Primarily used for request redirects and retries.

    :param body:
        File-like object that supports seek.

    :param body_pos:
        Position to seek to in file, or ``_FAILEDTELL`` when the position
        could not be recorded.

    :raises UnrewindableBodyError:
        If seeking raises ``OSError``, or if ``body_pos`` is
        ``_FAILEDTELL``.

    :raises ValueError:
        If ``body_pos`` is neither an integer nor ``_FAILEDTELL``, or if
        ``body`` has no ``seek`` attribute.
    """
    if isinstance(body_pos, int):
        body_seek = getattr(body, "seek", None)
        if body_seek is None:
            # The position is fine; it is the body that cannot be rewound.
            # Say so, rather than reporting an integer position as
            # "not an integer".
            raise ValueError(
                f"body must support seek() to be rewound, instead it was {type(body)}."
            )
        try:
            body_seek(body_pos)
        except OSError as e:
            raise UnrewindableBodyError(
                "An error occurred when rewinding request body for redirect/retry."
            ) from e
    elif body_pos is _FAILEDTELL:
        # tell() failed when the position was recorded, so there is no
        # position to go back to.
        raise UnrewindableBodyError(
            "Unable to record file position for rewinding "
            "request body during a redirect/retry."
        )
    else:
        raise ValueError(
            f"body_pos must be of type integer, instead it was {type(body_pos)}."
        )

184 

185 

class ChunksAndContentLength(typing.NamedTuple):
    """Pair returned by ``body_to_chunks``: the body pieces and their size."""

    # Iterable of body pieces to send, or ``None`` when there is no body.
    chunks: typing.Iterable[bytes] | None
    # Total body length in bytes, or ``None`` when it was not determined.
    content_length: int | None

189 

190 

def body_to_chunks(
    body: typing.Any | None, method: str, blocksize: int
) -> ChunksAndContentLength:
    """Turn a request body into an iterable of chunks to pass to
    socket.sendall(), together with an optional 'Content-Length' value.

    A 'Content-Length' of 'None' indicates the length of the body
    can't be determined so should use 'Transfer-Encoding: chunked'
    for framing instead.

    Bodies are classified in this order:

    * ``None``: no chunks; the length is ``0`` unless the method is one
      that doesn't expect a body, in which case it is ``None``.
    * ``str`` / ``bytes``: a single chunk of bytes with a known length.
    * object with a ``read`` attribute: read lazily in ``blocksize``
      pieces, length unknown.
    * object implementing the buffer API: passed through as one chunk,
      length taken from its ``memoryview``.
    * any other iterable: its iterator, length unknown.

    :raises TypeError: if ``body`` fits none of the categories above.
    """

    def _read_in_blocks() -> typing.Iterable[bytes]:
        # Text streams hand back ``str``; those blocks are encoded as UTF-8.
        is_text = isinstance(body, io.TextIOBase)
        while block := body.read(blocksize):
            yield block.encode("utf-8") if is_text else block

    # No body, we need to make a recommendation on 'Content-Length'
    # based on whether that request method is expected to have
    # a body or not.
    if body is None:
        body_expected = method.upper() not in _METHODS_NOT_EXPECTING_BODY
        return ChunksAndContentLength(
            chunks=None, content_length=0 if body_expected else None
        )

    # Bytes or strings become bytes
    if isinstance(body, (str, bytes)):
        payload = to_bytes(body)
        return ChunksAndContentLength(chunks=(payload,), content_length=len(payload))

    # File-like object, TODO: use seek() and tell() for length?
    if hasattr(body, "read"):
        return ChunksAndContentLength(chunks=_read_in_blocks(), content_length=None)

    # Otherwise we need to start checking via duck-typing.
    try:
        # Check if the body implements the buffer API.
        view = memoryview(body)
    except TypeError:
        try:
            # Check if the body is an iterable
            body_iterator = iter(body)
        except TypeError:
            raise TypeError(
                f"'body' must be a bytes-like object, file-like "
                f"object, or iterable. Instead was {body!r}"
            ) from None
        return ChunksAndContentLength(chunks=body_iterator, content_length=None)

    # Since it implements the buffer API can be passed directly to socket.sendall()
    return ChunksAndContentLength(chunks=(body,), content_length=view.nbytes)