Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/tornado/util.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

178 statements  

1"""Miscellaneous utility functions and classes. 

2 

3This module is used internally by Tornado. It is not necessarily expected 

4that the functions and classes defined here will be useful to other 

5applications, but they are documented here in case they are. 

6 

7The one public-facing part of this module is the `Configurable` class 

8and its `~Configurable.configure` method, which becomes a part of the 

9interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`, 

10and `.Resolver`. 

11""" 

12 

13import array 

14import asyncio 

15import atexit 

16from inspect import getfullargspec 

17import os 

18import re 

19import typing 

20import zlib 

21 

22from typing import ( 

23 Any, 

24 Optional, 

25 Dict, 

26 Mapping, 

27 List, 

28 Tuple, 

29 Match, 

30 Callable, 

31 Type, 

32 Sequence, 

33) 

34 

35if typing.TYPE_CHECKING: 

36 # Additional imports only used in type comments. 

37 # This lets us make these imports lazy. 

38 import datetime # noqa: F401 

39 from types import TracebackType # noqa: F401 

40 from typing import Union # noqa: F401 

41 import unittest # noqa: F401 

42 

43# Aliases for types that are spelled differently in different Python 

44# versions. bytes_type is deprecated and no longer used in Tornado 

45# itself but is left in case anyone outside Tornado is using it. 

46bytes_type = bytes 

47unicode_type = str 

48basestring_type = str 

49 

50try: 

51 from sys import is_finalizing 

52except ImportError: 

53 # Emulate it 

54 def _get_emulated_is_finalizing() -> Callable[[], bool]: 

55 L = [] # type: List[None] 

56 atexit.register(lambda: L.append(None)) 

57 

58 def is_finalizing() -> bool: 

59 # Not referencing any globals here 

60 return L != [] 

61 

62 return is_finalizing 

63 

64 is_finalizing = _get_emulated_is_finalizing() 

65 

66 

67# versionchanged:: 6.2 

68# no longer our own TimeoutError, use standard asyncio class 

69TimeoutError = asyncio.TimeoutError 

70 

71 

72class ObjectDict(Dict[str, Any]): 

73 """Makes a dictionary behave like an object, with attribute-style access.""" 

74 

75 def __getattr__(self, name: str) -> Any: 

76 try: 

77 return self[name] 

78 except KeyError: 

79 raise AttributeError(name) 

80 

81 def __setattr__(self, name: str, value: Any) -> None: 

82 self[name] = value 

83 

84 

85class GzipDecompressor(object): 

86 """Streaming gzip decompressor. 

87 

88 The interface is like that of `zlib.decompressobj` (without some of the 

89 optional arguments, but it understands gzip headers and checksums. 

90 """ 

91 

92 def __init__(self) -> None: 

93 # Magic parameter makes zlib module understand gzip header 

94 # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib 

95 # This works on cpython and pypy, but not jython. 

96 self.decompressobj = zlib.decompressobj(16 + zlib.MAX_WBITS) 

97 

98 def decompress(self, value: bytes, max_length: int = 0) -> bytes: 

99 """Decompress a chunk, returning newly-available data. 

100 

101 Some data may be buffered for later processing; `flush` must 

102 be called when there is no more input data to ensure that 

103 all data was processed. 

104 

105 If ``max_length`` is given, some input data may be left over 

106 in ``unconsumed_tail``; you must retrieve this value and pass 

107 it back to a future call to `decompress` if it is not empty. 

108 """ 

109 return self.decompressobj.decompress(value, max_length) 

110 

111 @property 

112 def unconsumed_tail(self) -> bytes: 

113 """Returns the unconsumed portion left over""" 

114 return self.decompressobj.unconsumed_tail 

115 

116 def flush(self) -> bytes: 

117 """Return any remaining buffered data not yet returned by decompress. 

118 

119 Also checks for errors such as truncated input. 

120 No other methods may be called on this object after `flush`. 

121 """ 

122 return self.decompressobj.flush() 

123 

124 

125def import_object(name: str) -> Any: 

126 """Imports an object by name. 

127 

128 ``import_object('x')`` is equivalent to ``import x``. 

129 ``import_object('x.y.z')`` is equivalent to ``from x.y import z``. 

130 

131 >>> import tornado.escape 

132 >>> import_object('tornado.escape') is tornado.escape 

133 True 

134 >>> import_object('tornado.escape.utf8') is tornado.escape.utf8 

135 True 

136 >>> import_object('tornado') is tornado 

137 True 

138 >>> import_object('tornado.missing_module') 

139 Traceback (most recent call last): 

140 ... 

141 ImportError: No module named missing_module 

142 """ 

143 if name.count(".") == 0: 

144 return __import__(name) 

145 

146 parts = name.split(".") 

147 obj = __import__(".".join(parts[:-1]), fromlist=[parts[-1]]) 

148 try: 

149 return getattr(obj, parts[-1]) 

150 except AttributeError: 

151 raise ImportError("No module named %s" % parts[-1]) 

152 

153 

154def exec_in( 

155 code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None 

156) -> None: 

157 if isinstance(code, str): 

158 # exec(string) inherits the caller's future imports; compile 

159 # the string first to prevent that. 

160 code = compile(code, "<string>", "exec", dont_inherit=True) 

161 exec(code, glob, loc) 

162 

163 

164def raise_exc_info( 

165 exc_info: Tuple[Optional[type], Optional[BaseException], Optional["TracebackType"]] 

166) -> typing.NoReturn: 

167 try: 

168 if exc_info[1] is not None: 

169 raise exc_info[1].with_traceback(exc_info[2]) 

170 else: 

171 raise TypeError("raise_exc_info called with no exception") 

172 finally: 

173 # Clear the traceback reference from our stack frame to 

174 # minimize circular references that slow down GC. 

175 exc_info = (None, None, None) 

176 

177 

178def errno_from_exception(e: BaseException) -> Optional[int]: 

179 """Provides the errno from an Exception object. 

180 

181 There are cases that the errno attribute was not set so we pull 

182 the errno out of the args but if someone instantiates an Exception 

183 without any args you will get a tuple error. So this function 

184 abstracts all that behavior to give you a safe way to get the 

185 errno. 

186 """ 

187 

188 if hasattr(e, "errno"): 

189 return e.errno # type: ignore 

190 elif e.args: 

191 return e.args[0] 

192 else: 

193 return None 

194 

195 

196_alphanum = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789") 

197 

198 

199def _re_unescape_replacement(match: Match[str]) -> str: 

200 group = match.group(1) 

201 if group[0] in _alphanum: 

202 raise ValueError("cannot unescape '\\\\%s'" % group[0]) 

203 return group 

204 

205 

206_re_unescape_pattern = re.compile(r"\\(.)", re.DOTALL) 

207 

208 

209def re_unescape(s: str) -> str: 

210 r"""Unescape a string escaped by `re.escape`. 

211 

212 May raise ``ValueError`` for regular expressions which could not 

213 have been produced by `re.escape` (for example, strings containing 

214 ``\d`` cannot be unescaped). 

215 

216 .. versionadded:: 4.4 

217 """ 

218 return _re_unescape_pattern.sub(_re_unescape_replacement, s) 

219 

220 

221class Configurable(object): 

222 """Base class for configurable interfaces. 

223 

224 A configurable interface is an (abstract) class whose constructor 

225 acts as a factory function for one of its implementation subclasses. 

226 The implementation subclass as well as optional keyword arguments to 

227 its initializer can be set globally at runtime with `configure`. 

228 

229 By using the constructor as the factory method, the interface 

230 looks like a normal class, `isinstance` works as usual, etc. This 

231 pattern is most useful when the choice of implementation is likely 

232 to be a global decision (e.g. when `~select.epoll` is available, 

233 always use it instead of `~select.select`), or when a 

234 previously-monolithic class has been split into specialized 

235 subclasses. 

236 

237 Configurable subclasses must define the class methods 

238 `configurable_base` and `configurable_default`, and use the instance 

239 method `initialize` instead of ``__init__``. 

240 

241 .. versionchanged:: 5.0 

242 

243 It is now possible for configuration to be specified at 

244 multiple levels of a class hierarchy. 

245 

246 """ 

247 

248 # Type annotations on this class are mostly done with comments 

249 # because they need to refer to Configurable, which isn't defined 

250 # until after the class definition block. These can use regular 

251 # annotations when our minimum python version is 3.7. 

252 # 

253 # There may be a clever way to use generics here to get more 

254 # precise types (i.e. for a particular Configurable subclass T, 

255 # all the types are subclasses of T, not just Configurable). 

256 __impl_class = None # type: Optional[Type[Configurable]] 

257 __impl_kwargs = None # type: Dict[str, Any] 

258 

259 def __new__(cls, *args: Any, **kwargs: Any) -> Any: 

260 base = cls.configurable_base() 

261 init_kwargs = {} # type: Dict[str, Any] 

262 if cls is base: 

263 impl = cls.configured_class() 

264 if base.__impl_kwargs: 

265 init_kwargs.update(base.__impl_kwargs) 

266 else: 

267 impl = cls 

268 init_kwargs.update(kwargs) 

269 if impl.configurable_base() is not base: 

270 # The impl class is itself configurable, so recurse. 

271 return impl(*args, **init_kwargs) 

272 instance = super(Configurable, cls).__new__(impl) 

273 # initialize vs __init__ chosen for compatibility with AsyncHTTPClient 

274 # singleton magic. If we get rid of that we can switch to __init__ 

275 # here too. 

276 instance.initialize(*args, **init_kwargs) 

277 return instance 

278 

279 @classmethod 

280 def configurable_base(cls): 

281 # type: () -> Type[Configurable] 

282 """Returns the base class of a configurable hierarchy. 

283 

284 This will normally return the class in which it is defined. 

285 (which is *not* necessarily the same as the ``cls`` classmethod 

286 parameter). 

287 

288 """ 

289 raise NotImplementedError() 

290 

291 @classmethod 

292 def configurable_default(cls): 

293 # type: () -> Type[Configurable] 

294 """Returns the implementation class to be used if none is configured.""" 

295 raise NotImplementedError() 

296 

297 def _initialize(self) -> None: 

298 pass 

299 

300 initialize = _initialize # type: Callable[..., None] 

301 """Initialize a `Configurable` subclass instance. 

302 

303 Configurable classes should use `initialize` instead of ``__init__``. 

304 

305 .. versionchanged:: 4.2 

306 Now accepts positional arguments in addition to keyword arguments. 

307 """ 

308 

309 @classmethod 

310 def configure(cls, impl, **kwargs): 

311 # type: (Union[None, str, Type[Configurable]], Any) -> None 

312 """Sets the class to use when the base class is instantiated. 

313 

314 Keyword arguments will be saved and added to the arguments passed 

315 to the constructor. This can be used to set global defaults for 

316 some parameters. 

317 """ 

318 base = cls.configurable_base() 

319 if isinstance(impl, str): 

320 impl = typing.cast(Type[Configurable], import_object(impl)) 

321 if impl is not None and not issubclass(impl, cls): 

322 raise ValueError("Invalid subclass of %s" % cls) 

323 base.__impl_class = impl 

324 base.__impl_kwargs = kwargs 

325 

326 @classmethod 

327 def configured_class(cls): 

328 # type: () -> Type[Configurable] 

329 """Returns the currently configured class.""" 

330 base = cls.configurable_base() 

331 # Manually mangle the private name to see whether this base 

332 # has been configured (and not another base higher in the 

333 # hierarchy). 

334 if base.__dict__.get("_Configurable__impl_class") is None: 

335 base.__impl_class = cls.configurable_default() 

336 if base.__impl_class is not None: 

337 return base.__impl_class 

338 else: 

339 # Should be impossible, but mypy wants an explicit check. 

340 raise ValueError("configured class not found") 

341 

342 @classmethod 

343 def _save_configuration(cls): 

344 # type: () -> Tuple[Optional[Type[Configurable]], Dict[str, Any]] 

345 base = cls.configurable_base() 

346 return (base.__impl_class, base.__impl_kwargs) 

347 

348 @classmethod 

349 def _restore_configuration(cls, saved): 

350 # type: (Tuple[Optional[Type[Configurable]], Dict[str, Any]]) -> None 

351 base = cls.configurable_base() 

352 base.__impl_class = saved[0] 

353 base.__impl_kwargs = saved[1] 

354 

355 

356class ArgReplacer(object): 

357 """Replaces one value in an ``args, kwargs`` pair. 

358 

359 Inspects the function signature to find an argument by name 

360 whether it is passed by position or keyword. For use in decorators 

361 and similar wrappers. 

362 """ 

363 

364 def __init__(self, func: Callable, name: str) -> None: 

365 self.name = name 

366 try: 

367 self.arg_pos = self._getargnames(func).index(name) # type: Optional[int] 

368 except ValueError: 

369 # Not a positional parameter 

370 self.arg_pos = None 

371 

372 def _getargnames(self, func: Callable) -> List[str]: 

373 try: 

374 return getfullargspec(func).args 

375 except TypeError: 

376 if hasattr(func, "func_code"): 

377 # Cython-generated code has all the attributes needed 

378 # by inspect.getfullargspec, but the inspect module only 

379 # works with ordinary functions. Inline the portion of 

380 # getfullargspec that we need here. Note that for static 

381 # functions the @cython.binding(True) decorator must 

382 # be used (for methods it works out of the box). 

383 code = func.func_code # type: ignore 

384 return code.co_varnames[: code.co_argcount] 

385 raise 

386 

387 def get_old_value( 

388 self, args: Sequence[Any], kwargs: Dict[str, Any], default: Any = None 

389 ) -> Any: 

390 """Returns the old value of the named argument without replacing it. 

391 

392 Returns ``default`` if the argument is not present. 

393 """ 

394 if self.arg_pos is not None and len(args) > self.arg_pos: 

395 return args[self.arg_pos] 

396 else: 

397 return kwargs.get(self.name, default) 

398 

399 def replace( 

400 self, new_value: Any, args: Sequence[Any], kwargs: Dict[str, Any] 

401 ) -> Tuple[Any, Sequence[Any], Dict[str, Any]]: 

402 """Replace the named argument in ``args, kwargs`` with ``new_value``. 

403 

404 Returns ``(old_value, args, kwargs)``. The returned ``args`` and 

405 ``kwargs`` objects may not be the same as the input objects, or 

406 the input objects may be mutated. 

407 

408 If the named argument was not found, ``new_value`` will be added 

409 to ``kwargs`` and None will be returned as ``old_value``. 

410 """ 

411 if self.arg_pos is not None and len(args) > self.arg_pos: 

412 # The arg to replace is passed positionally 

413 old_value = args[self.arg_pos] 

414 args = list(args) # *args is normally a tuple 

415 args[self.arg_pos] = new_value 

416 else: 

417 # The arg to replace is either omitted or passed by keyword. 

418 old_value = kwargs.get(self.name) 

419 kwargs[self.name] = new_value 

420 return old_value, args, kwargs 

421 

422 

423def timedelta_to_seconds(td): 

424 # type: (datetime.timedelta) -> float 

425 """Equivalent to ``td.total_seconds()`` (introduced in Python 2.7).""" 

426 return td.total_seconds() 

427 

428 

429def _websocket_mask_python(mask: bytes, data: bytes) -> bytes: 

430 """Websocket masking function. 

431 

432 `mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length. 

433 Returns a `bytes` object of the same length as `data` with the mask applied 

434 as specified in section 5.3 of RFC 6455. 

435 

436 This pure-python implementation may be replaced by an optimized version when available. 

437 """ 

438 mask_arr = array.array("B", mask) 

439 unmasked_arr = array.array("B", data) 

440 for i in range(len(data)): 

441 unmasked_arr[i] = unmasked_arr[i] ^ mask_arr[i % 4] 

442 return unmasked_arr.tobytes() 

443 

444 

445if os.environ.get("TORNADO_NO_EXTENSION") or os.environ.get("TORNADO_EXTENSION") == "0": 

446 # These environment variables exist to make it easier to do performance 

447 # comparisons; they are not guaranteed to remain supported in the future. 

448 _websocket_mask = _websocket_mask_python 

449else: 

450 try: 

451 from tornado.speedups import websocket_mask as _websocket_mask 

452 except ImportError: 

453 if os.environ.get("TORNADO_EXTENSION") == "1": 

454 raise 

455 _websocket_mask = _websocket_mask_python 

456 

457 

458def doctests(): 

459 # type: () -> unittest.TestSuite 

460 import doctest 

461 

462 return doctest.DocTestSuite()