Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/util.py: 36%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

169 statements  

1"""Miscellaneous utility functions and classes. 

2 

3This module is used internally by Tornado. It is not necessarily expected 

4that the functions and classes defined here will be useful to other 

5applications, but they are documented here in case they are. 

6 

7The one public-facing part of this module is the `Configurable` class 

8and its `~Configurable.configure` method, which becomes a part of the 

9interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`, 

10and `.Resolver`. 

11""" 

12 

13import array 

14import asyncio 

15from inspect import getfullargspec 

16import os 

17import re 

18import typing 

19import zlib 

20 

21from typing import ( 

22 Any, 

23 Optional, 

24 Dict, 

25 Mapping, 

26 List, 

27 Tuple, 

28 Match, 

29 Callable, 

30 Type, 

31 Sequence, 

32) 

33 

34if typing.TYPE_CHECKING: 

35 # Additional imports only used in type comments. 

36 # This lets us make these imports lazy. 

37 import datetime # noqa: F401 

38 from types import TracebackType # noqa: F401 

39 from typing import Union # noqa: F401 

40 import unittest # noqa: F401 

41 

# Aliases for types that are spelled differently in different Python
# versions.  bytes_type is deprecated and no longer used in Tornado
# itself but is left in case anyone outside Tornado is using it.
# All three names below are plain aliases of builtin types.
bytes_type = bytes
unicode_type = str
basestring_type = str


# versionchanged:: 6.2
# no longer our own TimeoutError, use standard asyncio class
# (this module-level name is simply bound to asyncio.TimeoutError).
TimeoutError = asyncio.TimeoutError

53 

54 

class ObjectDict(Dict[str, Any]):
    """Dictionary whose items can also be read and written as attributes.

    ``d.key`` behaves like ``d["key"]`` for both lookup and assignment.
    """

    def __getattr__(self, name: str) -> Any:
        # Only invoked when regular attribute lookup fails, so ordinary
        # dict methods are unaffected.  A missing key is reported as
        # AttributeError, which is what the attribute protocol expects.
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        else:
            return value

    def __setattr__(self, name: str, value: Any) -> None:
        # Attribute assignment is stored as a dictionary item.
        self[name] = value

66 

67 

class GzipDecompressor:
    """Incremental decompressor for gzip-framed data.

    The interface mirrors that of `zlib.decompressobj` (without some of
    the optional arguments), but it understands gzip headers and
    checksums.
    """

    def __init__(self) -> None:
        # Magic parameter makes zlib module understand gzip header
        # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
        # This works on cpython and pypy, but not jython.
        gzip_wbits = zlib.MAX_WBITS + 16
        self.decompressobj = zlib.decompressobj(gzip_wbits)

    def decompress(self, value: bytes, max_length: int = 0) -> bytes:
        """Feed one chunk of compressed input and return the new output.

        Part of the data may stay buffered internally; call `flush`
        once the input is exhausted to make sure everything has been
        processed.

        When ``max_length`` is given, some input may remain in
        ``unconsumed_tail``; if that value is not empty it must be
        passed back to a later call to `decompress`.
        """
        chunk = self.decompressobj.decompress(value, max_length)
        return chunk

    @property
    def unconsumed_tail(self) -> bytes:
        """Input bytes left over by the last `decompress` call."""
        return self.decompressobj.unconsumed_tail

    def flush(self) -> bytes:
        """Return whatever buffered data `decompress` has not yet returned.

        Also checks for errors such as truncated input.
        No other methods may be called on this object after `flush`.
        """
        remainder = self.decompressobj.flush()
        return remainder

106 

107 

def import_object(name: str) -> Any:
    """Imports an object by name.

    ``import_object('x')`` is equivalent to ``import x``.
    ``import_object('x.y.z')`` is equivalent to ``from x.y import z``.

    >>> import tornado.escape
    >>> import_object('tornado.escape') is tornado.escape
    True
    >>> import_object('tornado.escape.utf8') is tornado.escape.utf8
    True
    >>> import_object('tornado') is tornado
    True
    >>> import_object('tornado.missing_module')
    Traceback (most recent call last):
        ...
    ImportError: No module named missing_module
    """
    parent_path, dot, leaf = name.rpartition(".")
    if not dot:
        # No dot at all: a plain top-level import.
        return __import__(name)

    # Passing the leaf name in ``fromlist`` makes ``__import__`` return
    # the module named by ``parent_path`` itself instead of the
    # top-level package.
    parent = __import__(parent_path, fromlist=[leaf])
    try:
        return getattr(parent, leaf)
    except AttributeError:
        raise ImportError("No module named %s" % leaf)

135 

136 

137def exec_in( 

138 code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None 

139) -> None: 

140 if isinstance(code, str): 

141 # exec(string) inherits the caller's future imports; compile 

142 # the string first to prevent that. 

143 code = compile(code, "<string>", "exec", dont_inherit=True) 

144 exec(code, glob, loc) 

145 

146 

def raise_exc_info(
    exc_info: Tuple[Optional[type], Optional[BaseException], Optional["TracebackType"]]
) -> typing.NoReturn:
    """Re-raise the exception held in a ``(type, value, traceback)`` triple.

    The exception instance is raised with the given traceback attached.
    Raises ``TypeError`` if the triple carries no exception instance.
    """
    try:
        if exc_info[1] is None:
            raise TypeError("raise_exc_info called with no exception")
        raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # Clear the traceback reference from our stack frame to
        # minimize circular references that slow down GC.
        exc_info = (None, None, None)

159 

160 

def errno_from_exception(e: BaseException) -> Optional[int]:
    """Return the errno carried by an exception, if any.

    The ``errno`` attribute is used when the exception has one.
    Otherwise the first element of ``args`` is returned, and ``None``
    when ``args`` is empty, so that an exception created without
    arguments does not cause an indexing error here.
    """
    if hasattr(e, "errno"):
        return e.errno  # type: ignore
    return e.args[0] if e.args else None

177 

178 

# ASCII letters and digits: a backslash followed by one of these
# is rejected by ``_re_unescape_replacement``.
_alphanum = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")


def _re_unescape_replacement(match: Match[str]) -> str:
    # Substitution callback: return the character that followed the
    # backslash, rejecting alphanumerics.
    escaped = match.group(1)
    if escaped[0] not in _alphanum:
        return escaped
    raise ValueError("cannot unescape '\\\\%s'" % escaped[0])


# A backslash followed by any single character (newline included).
_re_unescape_pattern = re.compile(r"\\(.)", re.DOTALL)


def re_unescape(s: str) -> str:
    r"""Unescape a string escaped by `re.escape`.

    May raise ``ValueError`` for regular expressions which could not
    have been produced by `re.escape` (for example, strings containing
    ``\d`` cannot be unescaped).

    .. versionadded:: 4.4
    """
    unescaped = _re_unescape_pattern.sub(_re_unescape_replacement, s)
    return unescaped

202 

203 

class Configurable:
    """Base class for configurable interfaces.

    A configurable interface is an (abstract) class whose constructor
    works as a factory for one of its implementation subclasses.  Both
    the implementation subclass and optional keyword arguments for its
    initializer can be set globally at runtime with `configure`.

    Because the constructor itself is the factory, the interface looks
    like a normal class, `isinstance` works as usual, etc.  This
    pattern is most useful when the choice of implementation is likely
    to be a global decision (e.g. when `~select.epoll` is available,
    always use it instead of `~select.select`), or when a
    previously-monolithic class has been split into specialized
    subclasses.

    Configurable subclasses must define the class methods
    `configurable_base` and `configurable_default`, and use the instance
    method `initialize` instead of ``__init__``.

    .. versionchanged:: 5.0

       It is now possible for configuration to be specified at
       multiple levels of a class hierarchy.

    """

    # Type annotations on this class are mostly done with comments
    # because they need to refer to Configurable, which isn't defined
    # until after the class definition block. These can use regular
    # annotations when our minimum python version is 3.7.
    #
    # There may be a clever way to use generics here to get more
    # precise types (i.e. for a particular Configurable subclass T,
    # all the types are subclasses of T, not just Configurable).
    __impl_class = None  # type: Optional[Type[Configurable]]
    __impl_kwargs = None  # type: Dict[str, Any]

    def __new__(cls, *args: Any, **kwargs: Any) -> Any:
        base = cls.configurable_base()
        impl = cls
        init_kwargs = {}  # type: Dict[str, Any]
        if cls is base:
            # Instantiating the base itself: substitute the configured
            # implementation and start from the globally saved kwargs.
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        # Arguments given at the call site override configured ones.
        init_kwargs.update(kwargs)
        if impl.configurable_base() is base:
            instance = super().__new__(impl)
            # initialize vs __init__ chosen for compatibility with AsyncHTTPClient
            # singleton magic.  If we get rid of that we can switch to __init__
            # here too.
            instance.initialize(*args, **init_kwargs)
            return instance
        # The impl class is itself configurable, so recurse.
        return impl(*args, **init_kwargs)

    @classmethod
    def configurable_base(cls):
        # type: () -> Type[Configurable]
        """Returns the base class of a configurable hierarchy.

        This will normally return the class in which it is defined.
        (which is *not* necessarily the same as the ``cls`` classmethod
        parameter).

        """
        raise NotImplementedError()

    @classmethod
    def configurable_default(cls):
        # type: () -> Type[Configurable]
        """Returns the implementation class to be used if none is configured."""
        raise NotImplementedError()

    def _initialize(self) -> None:
        pass

    initialize = _initialize  # type: Callable[..., None]
    """Initialize a `Configurable` subclass instance.

    Configurable classes should use `initialize` instead of ``__init__``.

    .. versionchanged:: 4.2
       Now accepts positional arguments in addition to keyword arguments.
    """

    @classmethod
    def configure(cls, impl, **kwargs):
        # type: (Union[None, str, Type[Configurable]], Any) -> None
        """Sets the class to use when the base class is instantiated.

        Keyword arguments will be saved and added to the arguments passed
        to the constructor.  This can be used to set global defaults for
        some parameters.
        """
        base = cls.configurable_base()
        if isinstance(impl, str):
            # A string is resolved to the object it names.
            impl = typing.cast(Type[Configurable], import_object(impl))
        is_valid = impl is None or issubclass(impl, cls)
        if not is_valid:
            raise ValueError("Invalid subclass of %s" % cls)
        base.__impl_class = impl
        base.__impl_kwargs = kwargs

    @classmethod
    def configured_class(cls):
        # type: () -> Type[Configurable]
        """Returns the currently configured class."""
        base = cls.configurable_base()
        # Manually mangle the private name to see whether this base
        # has been configured (and not another base higher in the
        # hierarchy).
        if base.__dict__.get("_Configurable__impl_class") is None:
            base.__impl_class = cls.configurable_default()
        if base.__impl_class is None:
            # Should be impossible, but mypy wants an explicit check.
            raise ValueError("configured class not found")
        return base.__impl_class

    @classmethod
    def _save_configuration(cls):
        # type: () -> Tuple[Optional[Type[Configurable]], Dict[str, Any]]
        # Snapshot the current (implementation class, kwargs) pair of the base.
        base = cls.configurable_base()
        snapshot = (base.__impl_class, base.__impl_kwargs)
        return snapshot

    @classmethod
    def _restore_configuration(cls, saved):
        # type: (Tuple[Optional[Type[Configurable]], Dict[str, Any]]) -> None
        # Put back a pair previously returned by _save_configuration.
        base = cls.configurable_base()
        base.__impl_class = saved[0]
        base.__impl_kwargs = saved[1]

337 

338 

class ArgReplacer:
    """Replaces one value in an ``args, kwargs`` pair.

    Inspects the function signature to find an argument by name
    whether it is passed by position or keyword.  For use in decorators
    and similar wrappers.
    """

    def __init__(self, func: Callable, name: str) -> None:
        """Record where ``name`` sits among the parameters of ``func``.

        ``arg_pos`` is the index of ``name`` in the argument names
        reported for ``func``, or ``None`` when it is not among them.
        """
        self.name = name
        try:
            self.arg_pos = self._getargnames(func).index(name)  # type: Optional[int]
        except ValueError:
            # Not a positional parameter
            self.arg_pos = None

    def _getargnames(self, func: Callable) -> List[str]:
        """Return the argument names of ``func`` as a list.

        Re-raises the ``TypeError`` from `inspect.getfullargspec` when
        ``func`` cannot be inspected and has no ``func_code`` attribute.
        """
        try:
            return getfullargspec(func).args
        except TypeError:
            if hasattr(func, "func_code"):
                # Cython-generated code has all the attributes needed
                # by inspect.getfullargspec, but the inspect module only
                # works with ordinary functions. Inline the portion of
                # getfullargspec that we need here. Note that for static
                # functions the @cython.binding(True) decorator must
                # be used (for methods it works out of the box).
                code = func.func_code  # type: ignore
                # co_varnames is a tuple; convert the slice so that both
                # branches return a list, as the annotation promises.
                return list(code.co_varnames[: code.co_argcount])
            raise

    def get_old_value(
        self, args: Sequence[Any], kwargs: Dict[str, Any], default: Any = None
    ) -> Any:
        """Returns the old value of the named argument without replacing it.

        Returns ``default`` if the argument is not present.
        """
        if self.arg_pos is not None and len(args) > self.arg_pos:
            return args[self.arg_pos]
        else:
            return kwargs.get(self.name, default)

    def replace(
        self, new_value: Any, args: Sequence[Any], kwargs: Dict[str, Any]
    ) -> Tuple[Any, Sequence[Any], Dict[str, Any]]:
        """Replace the named argument in ``args, kwargs`` with ``new_value``.

        Returns ``(old_value, args, kwargs)``.  The returned ``args`` and
        ``kwargs`` objects may not be the same as the input objects, or
        the input objects may be mutated.

        If the named argument was not found, ``new_value`` will be added
        to ``kwargs`` and None will be returned as ``old_value``.
        """
        if self.arg_pos is not None and len(args) > self.arg_pos:
            # The arg to replace is passed positionally
            old_value = args[self.arg_pos]
            args = list(args)  # *args is normally a tuple
            args[self.arg_pos] = new_value
        else:
            # The arg to replace is either omitted or passed by keyword.
            old_value = kwargs.get(self.name)
            kwargs[self.name] = new_value
        return old_value, args, kwargs

404 

405 

def timedelta_to_seconds(td):
    # type: (datetime.timedelta) -> float
    """Return the duration of ``td`` in seconds.

    Thin wrapper around ``td.total_seconds()`` (introduced in Python 2.7).
    """
    seconds = td.total_seconds()
    return seconds

410 

411 

412def _websocket_mask_python(mask: bytes, data: bytes) -> bytes: 

413 """Websocket masking function. 

414 

415 `mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length. 

416 Returns a `bytes` object of the same length as `data` with the mask applied 

417 as specified in section 5.3 of RFC 6455. 

418 

419 This pure-python implementation may be replaced by an optimized version when available. 

420 """ 

421 mask_arr = array.array("B", mask) 

422 unmasked_arr = array.array("B", data) 

423 for i in range(len(data)): 

424 unmasked_arr[i] = unmasked_arr[i] ^ mask_arr[i % 4] 

425 return unmasked_arr.tobytes() 

426 

427 

# Choose the implementation bound to ``_websocket_mask``: the compiled
# speedup when it is permitted and importable, otherwise the pure-python
# function.
if not os.environ.get("TORNADO_NO_EXTENSION") and os.environ.get("TORNADO_EXTENSION") != "0":
    try:
        from tornado.speedups import websocket_mask as _websocket_mask
    except ImportError:
        # TORNADO_EXTENSION=1 makes a missing extension a hard error.
        if os.environ.get("TORNADO_EXTENSION") == "1":
            raise
        _websocket_mask = _websocket_mask_python
else:
    # These environment variables exist to make it easier to do performance
    # comparisons; they are not guaranteed to remain supported in the future.
    _websocket_mask = _websocket_mask_python

439 

440 

def doctests():
    # type: () -> unittest.TestSuite
    """Return the suite built by ``doctest.DocTestSuite()`` called without arguments."""
    import doctest

    suite = doctest.DocTestSuite()
    return suite