Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/werkzeug/_internal.py: 47%

176 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-09 06:08 +0000

1from __future__ import annotations 

2 

3import logging 

4import operator 

5import re 

6import sys 

7import typing as t 

8from datetime import datetime 

9from datetime import timezone 

10 

11if t.TYPE_CHECKING: 

12 from _typeshed.wsgi import WSGIEnvironment 

13 from .wrappers.request import Request 

14 

# Module-level cache for the "werkzeug" logger. It stays ``None`` until
# ``_log`` fetches and configures the logger on its first call.
_logger: logging.Logger | None = None

16 

17 

class _Missing:
    """Type of the ``_missing`` sentinel object defined right below."""

    def __reduce__(self) -> str:
        # Returning a string from ``__reduce__`` makes pickle store the
        # object as a reference to the module-level name ``_missing``.
        return "_missing"

    def __repr__(self) -> str:
        return "no value"


# The single shared sentinel instance.
_missing = _Missing()

27 

28 

@t.overload
def _make_encode_wrapper(reference: str) -> t.Callable[[str], str]:
    ...


@t.overload
def _make_encode_wrapper(reference: bytes) -> t.Callable[[str], bytes]:
    ...


def _make_encode_wrapper(reference: t.AnyStr) -> t.Callable[[str], t.AnyStr]:
    """Build a one-argument converter matching the type of ``reference``.

    :param reference: Sample value whose type selects the converter.
    :return: For a ``str`` reference, a function that returns its argument
        unchanged. For anything else, a callable that invokes
        ``.encode("latin1")`` on its argument.
    """
    if not isinstance(reference, str):
        return operator.methodcaller("encode", "latin1")

    return lambda value: value

47 

48 

def _check_str_tuple(value: tuple[t.AnyStr, ...]) -> None:
    """Verify that every item of ``value`` has the same string type.

    The first item decides the expected type: ``str`` if it is a ``str``,
    otherwise ``bytes``. An empty tuple is accepted as is.

    :param value: Tuple of items to check.
    :raises TypeError: If any item is not an instance of the expected type.
    """
    if not value:
        return

    expected = str if isinstance(value[0], str) else bytes

    for item in value:
        if not isinstance(item, expected):
            raise TypeError(f"Cannot mix str and bytes arguments (got {value!r})")

58 

59 

# Interpreter default string encoding, captured once at import time and used
# as the default ``charset`` below.
_default_encoding = sys.getdefaultencoding()


def _to_bytes(
    x: str | bytes, charset: str = _default_encoding, errors: str = "strict"
) -> bytes:
    """Coerce ``x`` to ``bytes``.

    :param x: Value to convert. ``None`` and ``bytes`` are returned as is,
        ``bytearray`` and ``memoryview`` are copied into ``bytes``, and
        ``str`` is encoded.
    :param charset: Encoding used when ``x`` is a ``str``.
    :param errors: Error handler used when ``x`` is a ``str``.
    :raises TypeError: If ``x`` is none of the types listed above.
    """
    if isinstance(x, str):
        return x.encode(charset, errors)

    if isinstance(x, (bytearray, memoryview)):
        return bytes(x)

    if x is None or isinstance(x, bytes):
        return x

    raise TypeError("Expected bytes")

76 

77 

@t.overload
def _to_str(  # type: ignore
    x: None,
    charset: str | None = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> None:
    ...


@t.overload
def _to_str(
    x: t.Any,
    charset: str | None = ...,
    errors: str = ...,
    allow_none_charset: bool = ...,
) -> str:
    ...


def _to_str(
    x: t.Any | None,
    charset: str | None = _default_encoding,
    errors: str = "strict",
    allow_none_charset: bool = False,
) -> str | bytes | None:
    """Coerce ``x`` to ``str``.

    :param x: Value to convert. ``None`` and ``str`` are returned as is,
        ``bytes`` and ``bytearray`` are decoded, and any other object is
        passed through ``str()``.
    :param charset: Encoding used to decode binary input.
    :param errors: Error handler used to decode binary input.
    :param allow_none_charset: When true and ``charset`` is ``None``, binary
        input is returned undecoded. When false, a ``None`` charset is
        passed straight to ``decode``.
    """
    if isinstance(x, (bytes, bytearray)):
        if charset is None and allow_none_charset:
            return x

        return x.decode(charset, errors)  # type: ignore

    if x is None or isinstance(x, str):
        return x

    return str(x)

115 

116 

def _wsgi_decoding_dance(
    s: str, charset: str = "utf-8", errors: str = "replace"
) -> str:
    """Reinterpret a latin1-decoded string in another charset.

    The string is turned back into bytes with latin1 and those bytes are
    then decoded with ``charset``.

    :param s: String whose code points all fit in latin1.
    :param charset: Encoding to decode the recovered bytes with.
    :param errors: Error handler for that decode step.
    """
    raw = s.encode("latin1")
    return raw.decode(charset, errors)

121 

122 

def _wsgi_encoding_dance(s: str, charset: str = "utf-8", errors: str = "strict") -> str:
    """Encode ``s`` with ``charset`` and present the bytes as a latin1 string.

    This is the inverse of decoding a latin1 string with ``charset``: every
    byte of the encoded value becomes one code point of the result.

    :param s: String to convert.
    :param charset: Encoding used to turn ``s`` into bytes.
    :param errors: Error handler for the encode step.
    :raises UnicodeEncodeError: If ``s`` cannot be encoded with ``charset``
        under the given ``errors`` handler.
    """
    # ``errors`` belongs to the encode step: that is the only step that can
    # fail. Decoding arbitrary bytes as latin1 always succeeds, so passing
    # the handler there (as was done before) made the parameter a no-op.
    return s.encode(charset, errors).decode("latin1")

125 

126 

def _get_environ(obj: WSGIEnvironment | Request) -> WSGIEnvironment:
    """Return the WSGI environ dict for ``obj``.

    :param obj: Either the environ itself or an object exposing it as an
        ``environ`` attribute.
    :return: ``obj.environ`` if that attribute exists, otherwise ``obj``.
    :raises AssertionError: If the resolved value is not a ``dict``.
    """
    env = getattr(obj, "environ", obj)

    # Raised explicitly instead of using an ``assert`` statement so the check
    # still runs under ``python -O``. The exception type and message are the
    # same as before.
    if not isinstance(env, dict):
        raise AssertionError(
            f"{type(obj).__name__!r} is not a WSGI environment (has to be a dict)"
        )

    return env

133 

134 

def _has_level_handler(logger: logging.Logger) -> bool:
    """Report whether some handler reachable from ``logger`` would handle
    records at the logger's effective level.

    The search starts at ``logger`` and follows ``parent`` links. It stops
    at the first logger whose ``propagate`` flag is false.

    :param logger: Logger at which the search starts.
    :return: ``True`` if a visited logger has a handler whose level is at or
        below the effective level, ``False`` otherwise.
    """
    threshold = logger.getEffectiveLevel()
    node = logger

    while node:
        for handler in node.handlers:
            if handler.level <= threshold:
                return True

        if not node.propagate:
            return False

        node = node.parent  # type: ignore

    return False

152 

153 

class _ColorStreamHandler(logging.StreamHandler):
    """Stream handler that wraps ``sys.stderr`` with Colorama's
    ``AnsiToWin32`` when the ``colorama`` package can be imported.
    """

    def __init__(self) -> None:
        # ``None`` lets ``StreamHandler`` choose its own default stream.
        stream = None

        try:
            import colorama
        except ImportError:
            pass
        else:
            stream = colorama.AnsiToWin32(sys.stderr)

        super().__init__(stream)

166 

167 

def _log(type: str, message: str, *args: t.Any, **kwargs: t.Any) -> None:
    """Send a message to the 'werkzeug' logger.

    On the first call the logger is fetched and cached in ``_logger``. If
    its level is unset it is set to :data:`logging.INFO`, and if no handler
    covers its effective level a :class:`_ColorStreamHandler` is attached.

    :param type: Name of the logger method to call, such as ``"info"``.
    :param message: Message to log; trailing whitespace is stripped.
    :param args: Extra positional arguments passed to the logger method.
    :param kwargs: Extra keyword arguments passed to the logger method.
    """
    global _logger

    logger = _logger

    if logger is None:
        # Store the logger in the module cache first, then configure it.
        _logger = logger = logging.getLogger("werkzeug")

        if logger.level == logging.NOTSET:
            logger.setLevel(logging.INFO)

        if not _has_level_handler(logger):
            logger.addHandler(_ColorStreamHandler())

    emit = getattr(logger, type)
    emit(message.rstrip(), *args, **kwargs)

188 

189 

@t.overload
def _dt_as_utc(dt: None) -> None:
    ...


@t.overload
def _dt_as_utc(dt: datetime) -> datetime:
    ...


def _dt_as_utc(dt: datetime | None) -> datetime | None:
    """Return ``dt`` as a timezone-aware UTC datetime.

    :param dt: Datetime to normalize, or ``None``.
    :return: ``None`` for ``None``. A naive datetime gets ``timezone.utc``
        attached without changing its fields. An aware datetime whose
        ``tzinfo`` differs from ``timezone.utc`` is converted with
        ``astimezone``. Anything else is returned unchanged.
    """
    if dt is None:
        return None

    tz = dt.tzinfo

    if tz is None:
        return dt.replace(tzinfo=timezone.utc)

    if tz != timezone.utc:
        return dt.astimezone(timezone.utc)

    return dt

210 

211 

_TAccessorValue = t.TypeVar("_TAccessorValue")


class _DictAccessorProperty(t.Generic[_TAccessorValue]):
    """Baseclass for `environ_property` and `header_property`.

    A descriptor that reads, writes and deletes one key of a mapping
    obtained from the owning instance through :meth:`lookup`, which
    subclasses must implement.
    """

    # Class-level default; ``__init__`` overrides it per instance only when
    # a ``read_only`` value is passed.
    read_only = False

    def __init__(
        self,
        name: str,
        default: _TAccessorValue | None = None,
        load_func: t.Callable[[str], _TAccessorValue] | None = None,
        dump_func: t.Callable[[_TAccessorValue], str] | None = None,
        read_only: bool | None = None,
        doc: str | None = None,
    ) -> None:
        """
        :param name: Key used in the mapping returned by :meth:`lookup`.
        :param default: Value returned when the key is absent or when
            ``load_func`` raises ``ValueError`` or ``TypeError``.
        :param load_func: Optional converter applied to the stored value
            on read.
        :param dump_func: Optional converter applied to the value on write.
        :param read_only: If given, overrides the class-level flag.
        :param doc: Stored as the descriptor's ``__doc__``.
        """
        self.__doc__ = doc
        self.name = name
        self.default = default
        self.load_func = load_func
        self.dump_func = dump_func

        if read_only is not None:
            self.read_only = read_only

    def lookup(self, instance: t.Any) -> t.MutableMapping[str, t.Any]:
        """Return the mapping for ``instance``; must be overridden."""
        raise NotImplementedError

    @t.overload
    def __get__(
        self, instance: None, owner: type
    ) -> _DictAccessorProperty[_TAccessorValue]:
        ...

    @t.overload
    def __get__(self, instance: t.Any, owner: type) -> _TAccessorValue:
        ...

    def __get__(
        self, instance: t.Any | None, owner: type
    ) -> _TAccessorValue | _DictAccessorProperty[_TAccessorValue]:
        # Accessed on the class itself: return the descriptor.
        if instance is None:
            return self

        mapping = self.lookup(instance)
        key = self.name

        if key not in mapping:
            return self.default  # type: ignore

        raw = mapping[key]
        loader = self.load_func

        if loader is None:
            return raw  # type: ignore

        try:
            return loader(raw)
        except (ValueError, TypeError):
            # A stored value that cannot be converted falls back to the
            # default.
            return self.default  # type: ignore

    def __set__(self, instance: t.Any, value: _TAccessorValue) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        dumper = self.dump_func
        stored = value if dumper is None else dumper(value)
        self.lookup(instance)[self.name] = stored

    def __delete__(self, instance: t.Any) -> None:
        if self.read_only:
            raise AttributeError("read only property")

        # A missing key is ignored.
        self.lookup(instance).pop(self.name, None)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} {self.name}>"

288 

289 

def _decode_idna(domain: str) -> str:
    """Decode the IDNA (punycode) labels of ``domain`` where possible.

    :param domain: Domain name. If it contains non-ASCII characters it is
        returned unchanged.
    :return: The domain with each label decoded by the ``idna`` codec.
        Labels the codec rejects are kept in their ASCII form.
    """
    try:
        data = domain.encode("ascii")
    except UnicodeEncodeError:
        # If the domain is not ASCII, it's decoded already.
        return domain

    # ``UnicodeError`` is caught rather than ``UnicodeDecodeError``: depending
    # on the Python version, the ``idna`` and ``punycode`` codecs raise the
    # plain base class for invalid labels, which would otherwise escape and
    # skip the per-label fallback below.
    try:
        # Try decoding in one shot.
        return data.decode("idna")
    except UnicodeError:
        pass

    # Decode each part separately, leaving invalid parts as punycode.
    parts = []

    for part in data.split(b"."):
        try:
            parts.append(part.decode("idna"))
        except UnicodeError:
            parts.append(part.decode("ascii"))

    return ".".join(parts)

313 

314 

# Optional leading minus followed by ASCII digits only.
_plain_int_re = re.compile(r"-?\d+", re.ASCII)
# Same, with a mandatory fractional part and digits on both sides of the dot.
_plain_float_re = re.compile(r"-?\d+\.\d+", re.ASCII)


def _plain_int(value: str) -> int:
    """Convert ``value`` to ``int``, accepting only ASCII digits with an
    optional leading ``-``.

    ``int`` alone would also accept ``+``, ``_`` and non-ASCII digits, which
    are not allowed in HTTP header values.

    :raises ValueError: If ``value`` does not have that exact form.
    """
    if _plain_int_re.fullmatch(value):
        return int(value)

    raise ValueError


def _plain_float(value: str) -> float:
    """Convert ``value`` to ``float``, accepting only ASCII digits with an
    optional leading ``-`` and digits on both sides of a single ``.``.

    ``float`` alone would also accept ``+``, ``_``, non-ASCII digits and
    ``.123``, which are not allowed in HTTP header values.

    :raises ValueError: If ``value`` does not have that exact form.
    """
    if _plain_float_re.fullmatch(value):
        return float(value)

    raise ValueError