Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/tornado/util.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Miscellaneous utility functions and classes.
3This module is used internally by Tornado. It is not necessarily expected
4that the functions and classes defined here will be useful to other
5applications, but they are documented here in case they are.
7The one public-facing part of this module is the `Configurable` class
8and its `~Configurable.configure` method, which becomes a part of the
9interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`,
10and `.Resolver`.
11"""
13import array
14import asyncio
15from inspect import getfullargspec
16import os
17import re
18import typing
19import zlib
21from typing import (
22 Any,
23 Optional,
24 Dict,
25 Mapping,
26 List,
27 Tuple,
28 Match,
29 Callable,
30 Type,
31 Sequence,
32)
34if typing.TYPE_CHECKING:
35 # Additional imports only used in type comments.
36 # This lets us make these imports lazy.
37 import datetime # noqa: F401
38 from types import TracebackType # noqa: F401
39 from typing import Union # noqa: F401
40 import unittest # noqa: F401
# Legacy aliases for types whose spelling differed between Python
# versions. bytes_type is deprecated and unused inside Tornado itself,
# but it is kept in case code outside Tornado still refers to it.
bytes_type = bytes
unicode_type = basestring_type = str


# versionchanged:: 6.2
# Tornado no longer defines its own TimeoutError; the name is bound to
# the standard asyncio class instead.
TimeoutError = asyncio.TimeoutError
class ObjectDict(Dict[str, Any]):
    """Dictionary whose entries can also be read and written as attributes."""

    def __getattr__(self, name: str) -> Any:
        # Translate a missing key into the AttributeError that attribute
        # access is expected to raise.
        try:
            value = self[name]
        except KeyError:
            raise AttributeError(name)
        else:
            return value

    def __setattr__(self, name: str, value: Any) -> None:
        # Attribute assignment is stored as a dictionary item.
        self[name] = value
class GzipDecompressor:
    """Streaming gzip decompressor.

    Offers an interface like that of `zlib.decompressobj` (minus some of
    the optional arguments), but it understands gzip headers and
    checksums.
    """

    def __init__(self) -> None:
        # Adding 16 to the window size is the magic parameter that makes
        # the zlib module understand the gzip header.
        # http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
        # This works on cpython and pypy, but not jython.
        gzip_wbits = 16 + zlib.MAX_WBITS
        self.decompressobj = zlib.decompressobj(gzip_wbits)

    def decompress(self, value: bytes, max_length: int = 0) -> bytes:
        """Decompress one chunk and return the data that became available.

        Part of the data may stay buffered for later processing; call
        `flush` once there is no more input so that everything is
        processed.

        When ``max_length`` is given, some input may be left over in
        ``unconsumed_tail``; if it is not empty you must retrieve it and
        pass it to a later call to `decompress`.
        """
        inflate = self.decompressobj.decompress
        return inflate(value, max_length)

    @property
    def unconsumed_tail(self) -> bytes:
        """Returns the unconsumed portion left over"""
        leftover = self.decompressobj.unconsumed_tail
        return leftover

    def flush(self) -> bytes:
        """Return whatever buffered data `decompress` has not yet returned.

        Also checks for errors such as truncated input.
        No other methods may be called on this object after `flush`.
        """
        remaining = self.decompressobj.flush()
        return remaining
def import_object(name: str) -> Any:
    """Imports an object by its dotted name.

    ``import_object('x')`` is equivalent to ``import x``.
    ``import_object('x.y.z')`` is equivalent to ``from x.y import z``.

    >>> import tornado.escape
    >>> import_object('tornado.escape') is tornado.escape
    True
    >>> import_object('tornado.escape.utf8') is tornado.escape.utf8
    True
    >>> import_object('tornado') is tornado
    True
    >>> import_object('tornado.missing_module')
    Traceback (most recent call last):
        ...
    ImportError: No module named missing_module
    """
    if "." not in name:
        return __import__(name)

    # Everything before the last dot is the module to import; the final
    # component is looked up as an attribute of that module.
    module_name, _, attr_name = name.rpartition(".")
    module = __import__(module_name, fromlist=[attr_name])
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError("No module named %s" % attr_name)
def exec_in(
    code: Any, glob: Dict[str, Any], loc: Optional[Optional[Mapping[str, Any]]] = None
) -> None:
    """Execute ``code`` with ``glob`` as globals and ``loc`` as locals.

    ``code`` may be a source string or anything else accepted by ``exec``.
    """
    # exec(string) inherits the caller's future imports; source strings
    # are compiled with dont_inherit=True first to prevent that.
    compiled = (
        compile(code, "<string>", "exec", dont_inherit=True)
        if isinstance(code, str)
        else code
    )
    exec(compiled, glob, loc)
def raise_exc_info(
    exc_info: Tuple[Optional[type], Optional[BaseException], Optional["TracebackType"]]
) -> typing.NoReturn:
    """Re-raise the exception described by an ``exc_info`` triple.

    The exception instance (``exc_info[1]``) is raised with the traceback
    in ``exc_info[2]`` attached. Raises ``TypeError`` if the triple
    carries no exception instance.
    """
    try:
        if exc_info[1] is None:
            raise TypeError("raise_exc_info called with no exception")
        raise exc_info[1].with_traceback(exc_info[2])
    finally:
        # Clear the traceback reference from our stack frame to
        # minimize circular references that slow down GC.
        exc_info = (None, None, None)
def errno_from_exception(e: BaseException) -> Optional[int]:
    """Safely extract an errno from an exception object.

    If the exception has an ``errno`` attribute, that value is returned.
    Otherwise the first element of ``args`` is used, and ``None`` is
    returned when the exception was created without any args (indexing
    ``args`` directly would fail in that case).
    """
    if hasattr(e, "errno"):
        return e.errno  # type: ignore
    return e.args[0] if e.args else None
# Characters that must not appear after a backslash in a string being
# unescaped (see _re_unescape_replacement).
_alphanum = frozenset("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")


def _re_unescape_replacement(match: Match[str]) -> str:
    # Substitution callback: return the character that followed the
    # backslash, rejecting escaped alphanumerics.
    escaped = match.group(1)
    if escaped[0] in _alphanum:
        raise ValueError("cannot unescape '\\\\%s'" % escaped[0])
    return escaped


# A backslash followed by any single character (DOTALL so that a
# newline counts as a character too).
_re_unescape_pattern = re.compile(r"\\(.)", re.DOTALL)


def re_unescape(s: str) -> str:
    r"""Reverse the escaping applied by `re.escape`.

    May raise ``ValueError`` for regular expressions which could not
    have been produced by `re.escape` (for example, strings containing
    ``\d`` cannot be unescaped).

    .. versionadded:: 4.4
    """
    unescaped = _re_unescape_pattern.sub(_re_unescape_replacement, s)
    return unescaped
class Configurable:
    """Base class for configurable interfaces.

    A configurable interface is an (abstract) class whose constructor
    works as a factory function for one of its implementation
    subclasses. Both the implementation subclass and optional keyword
    arguments for its initializer can be set globally at runtime with
    `configure`.

    Because the constructor itself is the factory method, the interface
    looks like a normal class, `isinstance` works as usual, etc. This
    pattern is most useful when the choice of implementation is likely
    to be a global decision (e.g. when `~select.epoll` is available,
    always use it instead of `~select.select`), or when a
    previously-monolithic class has been split into specialized
    subclasses.

    Configurable subclasses must define the class methods
    `configurable_base` and `configurable_default`, and use the instance
    method `initialize` instead of ``__init__``.

    .. versionchanged:: 5.0

       It is now possible for configuration to be specified at
       multiple levels of a class hierarchy.

    """

    # Type annotations on this class are mostly done with comments
    # because they need to refer to Configurable, which isn't defined
    # until after the class definition block. These can use regular
    # annotations when our minimum python version is 3.7.
    #
    # There may be a clever way to use generics here to get more
    # precise types (i.e. for a particular Configurable subclass T,
    # all the types are subclasses of T, not just Configurable).
    __impl_class = None  # type: Optional[Type[Configurable]]
    __impl_kwargs = None  # type: Dict[str, Any]

    def __new__(cls, *args: Any, **kwargs: Any) -> Any:
        base = cls.configurable_base()
        if cls is base:
            # Instantiating the base itself: pick the configured
            # implementation and start from the configured kwargs.
            impl = cls.configured_class()
            init_kwargs = dict(base.__impl_kwargs or {})  # type: Dict[str, Any]
        else:
            impl = cls
            init_kwargs = {}
        # Keyword arguments given by the caller take precedence over
        # the configured ones.
        init_kwargs.update(kwargs)
        if impl.configurable_base() is base:
            instance = super().__new__(impl)
            # initialize vs __init__ chosen for compatibility with
            # AsyncHTTPClient singleton magic. If we get rid of that we
            # can switch to __init__ here too.
            instance.initialize(*args, **init_kwargs)
            return instance
        # The impl class is itself configurable, so recurse.
        return impl(*args, **init_kwargs)

    @classmethod
    def configurable_base(cls):
        # type: () -> Type[Configurable]
        """Returns the base class of a configurable hierarchy.

        This will normally return the class in which it is defined
        (which is *not* necessarily the same as the ``cls`` classmethod
        parameter).

        """
        raise NotImplementedError()

    @classmethod
    def configurable_default(cls):
        # type: () -> Type[Configurable]
        """Returns the implementation class to use when none is configured."""
        raise NotImplementedError()

    def _initialize(self) -> None:
        pass

    initialize = _initialize  # type: Callable[..., None]
    """Initialize a `Configurable` subclass instance.

    Configurable classes should use `initialize` instead of ``__init__``.

    .. versionchanged:: 4.2
       Now accepts positional arguments in addition to keyword arguments.
    """

    @classmethod
    def configure(cls, impl, **kwargs):
        # type: (Union[None, str, Type[Configurable]], Any) -> None
        """Sets the class to use when the base class is instantiated.

        ``impl`` may be a class, the dotted name of a class (which is
        imported), or ``None``. Keyword arguments are saved and added to
        the arguments passed to the constructor, which can be used to
        set global defaults for some parameters.

        Raises ``ValueError`` if ``impl`` is not a subclass of ``cls``.
        """
        base = cls.configurable_base()
        if isinstance(impl, str):
            impl = typing.cast(Type[Configurable], import_object(impl))
        if not (impl is None or issubclass(impl, cls)):
            raise ValueError("Invalid subclass of %s" % cls)
        base.__impl_class = impl
        base.__impl_kwargs = kwargs

    @classmethod
    def configured_class(cls):
        # type: () -> Type[Configurable]
        """Returns the currently configured class."""
        base = cls.configurable_base()
        # Manually mangle the private name to see whether this base
        # has been configured (and not another base higher in the
        # hierarchy).
        if base.__dict__.get("_Configurable__impl_class") is None:
            base.__impl_class = cls.configurable_default()
        impl = base.__impl_class
        if impl is None:
            # Should be impossible, but mypy wants an explicit check.
            raise ValueError("configured class not found")
        return impl

    @classmethod
    def _save_configuration(cls):
        # type: () -> Tuple[Optional[Type[Configurable]], Dict[str, Any]]
        # Snapshot the (implementation class, kwargs) pair of the base.
        base = cls.configurable_base()
        return (base.__impl_class, base.__impl_kwargs)

    @classmethod
    def _restore_configuration(cls, saved):
        # type: (Tuple[Optional[Type[Configurable]], Dict[str, Any]]) -> None
        # Put back a pair previously returned by _save_configuration.
        base = cls.configurable_base()
        base.__impl_class, base.__impl_kwargs = saved[0], saved[1]
class ArgReplacer:
    """Replaces one value in an ``args, kwargs`` pair.

    The function signature is inspected so that the argument can be
    found by name whether it is passed by position or by keyword. For
    use in decorators and similar wrappers.
    """

    def __init__(self, func: Callable, name: str) -> None:
        self.name = name
        try:
            position = self._getargnames(func).index(name)  # type: Optional[int]
        except ValueError:
            # Not a positional parameter
            position = None
        self.arg_pos = position

    def _getargnames(self, func: Callable) -> List[str]:
        try:
            return getfullargspec(func).args
        except TypeError:
            if not hasattr(func, "func_code"):
                raise
            # Cython-generated code has all the attributes needed
            # by inspect.getfullargspec, but the inspect module only
            # works with ordinary functions. Inline the portion of
            # getfullargspec that we need here. Note that for static
            # functions the @cython.binding(True) decorator must
            # be used (for methods it works out of the box).
            code = func.func_code  # type: ignore
            return code.co_varnames[: code.co_argcount]

    def get_old_value(
        self, args: Sequence[Any], kwargs: Dict[str, Any], default: Any = None
    ) -> Any:
        """Returns the current value of the named argument, leaving it in place.

        Returns ``default`` if the argument is not present.
        """
        position = self.arg_pos
        if position is None or len(args) <= position:
            return kwargs.get(self.name, default)
        return args[position]

    def replace(
        self, new_value: Any, args: Sequence[Any], kwargs: Dict[str, Any]
    ) -> Tuple[Any, Sequence[Any], Dict[str, Any]]:
        """Replace the named argument in ``args, kwargs`` with ``new_value``.

        Returns ``(old_value, args, kwargs)``. The returned ``args`` and
        ``kwargs`` objects may not be the same as the input objects, or
        the input objects may be mutated.

        If the named argument was not found, ``new_value`` will be added
        to ``kwargs`` and None will be returned as ``old_value``.
        """
        position = self.arg_pos
        if position is None or len(args) <= position:
            # The arg to replace is either omitted or passed by keyword.
            old_value = kwargs.get(self.name)
            kwargs[self.name] = new_value
            return old_value, args, kwargs
        # The arg to replace is passed positionally.
        old_value = args[position]
        updated_args = list(args)  # *args is normally a tuple
        updated_args[position] = new_value
        return old_value, updated_args, kwargs
def timedelta_to_seconds(td):
    # type: (datetime.timedelta) -> float
    """Return the result of ``td.total_seconds()``."""
    seconds = td.total_seconds()
    return seconds
def _websocket_mask_python(mask: bytes, data: bytes) -> bytes:
    """Websocket masking function.

    `mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length.
    Returns a `bytes` object of the same length as `data` with the mask applied
    as specified in section 5.3 of RFC 6455.

    This pure-python implementation may be replaced by an optimized version when available.
    """
    mask_arr = array.array("B", mask)
    data_arr = array.array("B", data)
    # XOR every data byte with the mask byte at the same position,
    # cycling through the four mask bytes.
    return bytes(byte ^ mask_arr[idx % 4] for idx, byte in enumerate(data_arr))
# Pick the websocket masking implementation: the pure-python version
# when the extension is explicitly disabled or cannot be imported,
# otherwise the version from tornado.speedups.
if os.getenv("TORNADO_EXTENSION") == "0" or os.getenv("TORNADO_NO_EXTENSION"):
    # These environment variables exist to make it easier to do performance
    # comparisons; they are not guaranteed to remain supported in the future.
    _websocket_mask = _websocket_mask_python
else:
    try:
        from tornado.speedups import websocket_mask as _websocket_mask
    except ImportError:
        # TORNADO_EXTENSION=1 makes a missing extension an error
        # instead of silently falling back.
        if os.getenv("TORNADO_EXTENSION") == "1":
            raise
        _websocket_mask = _websocket_mask_python
def doctests():
    # type: () -> unittest.TestSuite
    """Return the suite built by ``doctest.DocTestSuite()``."""
    # Imported lazily so doctest is only loaded when the suite is requested.
    from doctest import DocTestSuite

    # Called with no arguments directly from this function, exactly as
    # before, so the module DocTestSuite resolves is unchanged.
    return DocTestSuite()