Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/util/request.py: 45%

103 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:45 +0000

1from __future__ import annotations 

2 

3import io 

4import typing 

5from base64 import b64encode 

6from enum import Enum 

7 

8from ..exceptions import UnrewindableBodyError 

9from .util import to_bytes 

10 

11if typing.TYPE_CHECKING: 

12 from typing import Final 

13 

14# Pass as a value within ``headers`` to skip 

15# emitting some HTTP headers that are added automatically. 

16# The only headers that are supported are ``Accept-Encoding``, 

17# ``Host``, and ``User-Agent``. 

18SKIP_HEADER = "@@@SKIP_HEADER@@@" 

19SKIPPABLE_HEADERS = frozenset(["accept-encoding", "host", "user-agent"]) 

20 

21ACCEPT_ENCODING = "gzip,deflate" 

22try: 

23 try: 

24 import brotlicffi as _unused_module_brotli # type: ignore[import] # noqa: F401 

25 except ImportError: 

26 import brotli as _unused_module_brotli # type: ignore[import] # noqa: F401 

27except ImportError: 

28 pass 

29else: 

30 ACCEPT_ENCODING += ",br" 

31try: 

32 import zstandard as _unused_module_zstd # type: ignore[import] # noqa: F401 

33except ImportError: 

34 pass 

35else: 

36 ACCEPT_ENCODING += ",zstd" 

37 

38 

39class _TYPE_FAILEDTELL(Enum): 

40 token = 0 

41 

42 

43_FAILEDTELL: Final[_TYPE_FAILEDTELL] = _TYPE_FAILEDTELL.token 

44 

45_TYPE_BODY_POSITION = typing.Union[int, _TYPE_FAILEDTELL] 

46 

47# When sending a request with these methods we aren't expecting 

48# a body so don't need to set an explicit 'Content-Length: 0' 

49# The reason we do this in the negative instead of tracking methods 

50# which 'should' have a body is because unknown methods should be 

51# treated as if they were 'POST' which *does* expect a body. 

52_METHODS_NOT_EXPECTING_BODY = {"GET", "HEAD", "DELETE", "TRACE", "OPTIONS", "CONNECT"} 

53 

54 

55def make_headers( 

56 keep_alive: bool | None = None, 

57 accept_encoding: bool | list[str] | str | None = None, 

58 user_agent: str | None = None, 

59 basic_auth: str | None = None, 

60 proxy_basic_auth: str | None = None, 

61 disable_cache: bool | None = None, 

62) -> dict[str, str]: 

63 """ 

64 Shortcuts for generating request headers. 

65 

66 :param keep_alive: 

67 If ``True``, adds 'connection: keep-alive' header. 

68 

69 :param accept_encoding: 

70 Can be a boolean, list, or string. 

71 ``True`` translates to 'gzip,deflate'. If either the ``brotli`` or 

72 ``brotlicffi`` package is installed 'gzip,deflate,br' is used instead. 

73 List will get joined by comma. 

74 String will be used as provided. 

75 

76 :param user_agent: 

77 String representing the user-agent you want, such as 

78 "python-urllib3/0.6" 

79 

80 :param basic_auth: 

81 Colon-separated username:password string for 'authorization: basic ...' 

82 auth header. 

83 

84 :param proxy_basic_auth: 

85 Colon-separated username:password string for 'proxy-authorization: basic ...' 

86 auth header. 

87 

88 :param disable_cache: 

89 If ``True``, adds 'cache-control: no-cache' header. 

90 

91 Example: 

92 

93 .. code-block:: python 

94 

95 import urllib3 

96 

97 print(urllib3.util.make_headers(keep_alive=True, user_agent="Batman/1.0")) 

98 # {'connection': 'keep-alive', 'user-agent': 'Batman/1.0'} 

99 print(urllib3.util.make_headers(accept_encoding=True)) 

100 # {'accept-encoding': 'gzip,deflate'} 

101 """ 

102 headers: dict[str, str] = {} 

103 if accept_encoding: 

104 if isinstance(accept_encoding, str): 

105 pass 

106 elif isinstance(accept_encoding, list): 

107 accept_encoding = ",".join(accept_encoding) 

108 else: 

109 accept_encoding = ACCEPT_ENCODING 

110 headers["accept-encoding"] = accept_encoding 

111 

112 if user_agent: 

113 headers["user-agent"] = user_agent 

114 

115 if keep_alive: 

116 headers["connection"] = "keep-alive" 

117 

118 if basic_auth: 

119 headers[ 

120 "authorization" 

121 ] = f"Basic {b64encode(basic_auth.encode('latin-1')).decode()}" 

122 

123 if proxy_basic_auth: 

124 headers[ 

125 "proxy-authorization" 

126 ] = f"Basic {b64encode(proxy_basic_auth.encode('latin-1')).decode()}" 

127 

128 if disable_cache: 

129 headers["cache-control"] = "no-cache" 

130 

131 return headers 

132 

133 

134def set_file_position( 

135 body: typing.Any, pos: _TYPE_BODY_POSITION | None 

136) -> _TYPE_BODY_POSITION | None: 

137 """ 

138 If a position is provided, move file to that point. 

139 Otherwise, we'll attempt to record a position for future use. 

140 """ 

141 if pos is not None: 

142 rewind_body(body, pos) 

143 elif getattr(body, "tell", None) is not None: 

144 try: 

145 pos = body.tell() 

146 except OSError: 

147 # This differentiates from None, allowing us to catch 

148 # a failed `tell()` later when trying to rewind the body. 

149 pos = _FAILEDTELL 

150 

151 return pos 

152 

153 

154def rewind_body(body: typing.IO[typing.AnyStr], body_pos: _TYPE_BODY_POSITION) -> None: 

155 """ 

156 Attempt to rewind body to a certain position. 

157 Primarily used for request redirects and retries. 

158 

159 :param body: 

160 File-like object that supports seek. 

161 

162 :param int pos: 

163 Position to seek to in file. 

164 """ 

165 body_seek = getattr(body, "seek", None) 

166 if body_seek is not None and isinstance(body_pos, int): 

167 try: 

168 body_seek(body_pos) 

169 except OSError as e: 

170 raise UnrewindableBodyError( 

171 "An error occurred when rewinding request body for redirect/retry." 

172 ) from e 

173 elif body_pos is _FAILEDTELL: 

174 raise UnrewindableBodyError( 

175 "Unable to record file position for rewinding " 

176 "request body during a redirect/retry." 

177 ) 

178 else: 

179 raise ValueError( 

180 f"body_pos must be of type integer, instead it was {type(body_pos)}." 

181 ) 

182 

183 

184class ChunksAndContentLength(typing.NamedTuple): 

185 chunks: typing.Iterable[bytes] | None 

186 content_length: int | None 

187 

188 

189def body_to_chunks( 

190 body: typing.Any | None, method: str, blocksize: int 

191) -> ChunksAndContentLength: 

192 """Takes the HTTP request method, body, and blocksize and 

193 transforms them into an iterable of chunks to pass to 

194 socket.sendall() and an optional 'Content-Length' header. 

195 

196 A 'Content-Length' of 'None' indicates the length of the body 

197 can't be determined so should use 'Transfer-Encoding: chunked' 

198 for framing instead. 

199 """ 

200 

201 chunks: typing.Iterable[bytes] | None 

202 content_length: int | None 

203 

204 # No body, we need to make a recommendation on 'Content-Length' 

205 # based on whether that request method is expected to have 

206 # a body or not. 

207 if body is None: 

208 chunks = None 

209 if method.upper() not in _METHODS_NOT_EXPECTING_BODY: 

210 content_length = 0 

211 else: 

212 content_length = None 

213 

214 # Bytes or strings become bytes 

215 elif isinstance(body, (str, bytes)): 

216 chunks = (to_bytes(body),) 

217 content_length = len(chunks[0]) 

218 

219 # File-like object, TODO: use seek() and tell() for length? 

220 elif hasattr(body, "read"): 

221 

222 def chunk_readable() -> typing.Iterable[bytes]: 

223 nonlocal body, blocksize 

224 encode = isinstance(body, io.TextIOBase) 

225 while True: 

226 datablock = body.read(blocksize) 

227 if not datablock: 

228 break 

229 if encode: 

230 datablock = datablock.encode("iso-8859-1") 

231 yield datablock 

232 

233 chunks = chunk_readable() 

234 content_length = None 

235 

236 # Otherwise we need to start checking via duck-typing. 

237 else: 

238 try: 

239 # Check if the body implements the buffer API. 

240 mv = memoryview(body) 

241 except TypeError: 

242 try: 

243 # Check if the body is an iterable 

244 chunks = iter(body) 

245 content_length = None 

246 except TypeError: 

247 raise TypeError( 

248 f"'body' must be a bytes-like object, file-like " 

249 f"object, or iterable. Instead was {body!r}" 

250 ) from None 

251 else: 

252 # Since it implements the buffer API can be passed directly to socket.sendall() 

253 chunks = (body,) 

254 content_length = mv.nbytes 

255 

256 return ChunksAndContentLength(chunks=chunks, content_length=content_length)