Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/util.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from collections import namedtuple
2from contextlib import contextmanager
3from types import TracebackType
4from typing import Any, Generator, List, IO, Optional, Tuple, Type, Union
5import io
6import logging
7import os
8import threading
9import sys
11# NOTE: This is the canonical location for commonly-used vendored modules,
12# which is the only spot that performs this try/except to allow repackaged
13# Invoke to function (e.g. distro packages which unvendor the vendored bits and
14# thus must import our 'vendored' stuff from the overall environment.)
15# All other uses of Lexicon, etc should do 'from .util import lexicon' etc.
16# Saves us from having to update the same logic in a dozen places.
17# TODO: would this make more sense to put _into_ invoke.vendor? That way, the
18# import lines which now read 'from .util import <third party stuff>' would be
19# more obvious. Requires packagers to leave invoke/vendor/__init__.py alone tho
20try:
21 from .vendor.lexicon import Lexicon # noqa
22 from .vendor import yaml # noqa
23except ImportError:
24 from lexicon import Lexicon # type: ignore[no-redef] # noqa
25 import yaml # type: ignore[no-redef] # noqa
#: Format string used for debug log records: logger name, module and function
#: name, followed by the message.
LOG_FORMAT = "%(name)s.%(module)s.%(funcName)s: %(message)s"


def enable_logging() -> None:
    """
    Configure root logging at ``DEBUG`` level using `LOG_FORMAT`.

    This simply delegates to `logging.basicConfig`.
    """
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
# Allow from-the-start debugging (vs toggled during load of tasks module) via
# shell env var. Any non-empty value of INVOKE_DEBUG enables it.
if os.environ.get("INVOKE_DEBUG"):
    enable_logging()

# Add top level logger functions to global namespace. Meh.
log = logging.getLogger("invoke")
debug = log.debug
def task_name_sort_key(name: str) -> Tuple[List[str], str]:
    """
    Return key tuple for use sorting dotted task names, via e.g. `sorted`.

    :param name: A task name whose path components are separated by ``.``.

    :returns:
        A two-tuple of ``(non-leaf path components, leaf name)``; e.g.
        ``"a.b.c"`` becomes ``(["a", "b"], "c")`` and ``"c"`` becomes
        ``([], "c")``.

    .. versionadded:: 1.0
    """
    parts = name.split(".")
    return (
        # First group/sort by non-leaf path components. This keeps everything
        # grouped in its hierarchy, and incidentally puts top-level tasks
        # (whose non-leaf path set is the empty list) first, where we want them
        parts[:-1],
        # Then we sort lexicographically by the actual task name
        parts[-1],
    )
# TODO: Make part of public API sometime
@contextmanager
def cd(where: str) -> Generator[None, None, None]:
    """
    Context manager that temporarily changes the working directory.

    The process's current directory is switched to ``where`` for the duration
    of the ``with`` block and switched back to the previous directory
    afterwards, even if the block raises.

    :param where: Directory to change into.
    """
    cwd = os.getcwd()
    os.chdir(where)
    try:
        yield
    finally:
        # Always restore the directory we started in.
        os.chdir(cwd)
def has_fileno(stream: IO) -> bool:
    """
    Cleanly determine whether ``stream`` has a useful ``.fileno()``.

    .. note::
        This function helps determine if a given file-like object can be used
        with various terminal-oriented modules and functions such as `select`,
        `termios`, and `tty`. For most of those, a fileno is all that is
        required; they'll function even if ``stream.isatty()`` is ``False``.

    :param stream: A file-like object.

    :returns:
        ``True`` if ``stream.fileno()`` returns an integer, ``False`` otherwise
        (this includes when ``stream`` lacks a ``fileno`` method, when the
        stream does not support file descriptors, and when ``fileno()``
        raises `ValueError`, as closed file objects do).

    .. versionadded:: 1.0
    """
    try:
        return isinstance(stream.fileno(), int)
    # ValueError is listed explicitly (even though io.UnsupportedOperation
    # subclasses it) because closed file objects raise a plain ValueError from
    # fileno(); such a stream has no useful descriptor either.
    except (AttributeError, io.UnsupportedOperation, ValueError):
        return False
def isatty(stream: IO) -> Union[bool, Any]:
    """
    Cleanly determine whether ``stream`` is a TTY.

    Specifically, first try calling ``stream.isatty()``, and if that fails
    (e.g. due to lacking the method entirely) fallback to `os.isatty`.

    .. note::
        Most of the time, we don't actually care about true TTY-ness, but
        merely whether the stream seems to have a fileno (per `has_fileno`).
        However, in some cases (notably the use of `pty.fork` to present a
        local pseudoterminal) we need to tell if a given stream has a valid
        fileno but *isn't* tied to an actual terminal. Thus, this function.

    :param stream: A file-like object.

    :returns:
        A boolean depending on the result of calling ``.isatty()`` and/or
        `os.isatty`.

    .. versionadded:: 1.0
    """
    # If there *is* an .isatty, ask it.
    if hasattr(stream, "isatty") and callable(stream.isatty):
        return stream.isatty()
    # If there wasn't, see if it has a fileno, and if so, ask os.isatty
    elif has_fileno(stream):
        return os.isatty(stream.fileno())
    # If we got here, none of the above worked, so it's reasonable to assume
    # the darn thing isn't a real TTY.
    return False
def helpline(obj: object) -> Optional[str]:
    """
    Return an object's first docstring line, or None if there was no docstring.

    :param obj: Any object; its ``__doc__`` attribute is inspected.

    :returns:
        The first line of the docstring (leading whitespace and blank lines
        skipped), or ``None`` when the docstring is missing, blank, or
        identical to the docstring of ``obj``'s type.

    .. versionadded:: 1.0
    """
    docstring = obj.__doc__
    if (
        not docstring
        or not docstring.strip()
        # A docstring equal to the type's own is treated as "no docstring":
        # instances see their class's __doc__ through attribute lookup.
        or docstring == type(obj).__doc__
    ):
        return None
    return docstring.lstrip().splitlines()[0]
class ExceptionHandlingThread(threading.Thread):
    """
    Thread handler making it easier for parent to handle thread exceptions.

    Based in part on Fabric 1's ThreadHandler. See also Fabric GH issue #204.

    When used directly, can be used in place of a regular ``threading.Thread``.
    If subclassed, the subclass must do one of:

    - supply ``target`` to ``__init__``
    - define ``_run()`` instead of ``run()``

    This is because this thread's entire point is to wrap behavior around the
    thread's execution; subclasses could not redefine ``run()`` without
    breaking that functionality.

    .. versionadded:: 1.0
    """

    def __init__(self, **kwargs: Any) -> None:
        """
        Create a new exception-handling thread instance.

        Takes all regular `threading.Thread` keyword arguments, via
        ``**kwargs`` for easier display of thread identity when raising
        captured exceptions.
        """
        super().__init__(**kwargs)
        # No record of why, but Fabric used daemon threads ever since the
        # switch from select.select, so let's keep doing that.
        self.daemon = True
        # Keep the constructor kwargs around; they identify this thread in
        # log messages, in repr(), and in the wrapper built by exception().
        self.kwargs = kwargs
        # Track exceptions raised in run()
        # TODO: legacy cruft that needs to be removed
        self.exc_info: Optional[
            Union[
                Tuple[Type[BaseException], BaseException, TracebackType],
                Tuple[None, None, None],
            ]
        ] = None

    def _target_name(self) -> str:
        """
        Return a short display name for whatever this thread executes.

        This is the ``__name__`` of the ``target`` given to ``__init__`` (or
        its ``repr()`` if it has no ``__name__``), or ``"_run"`` when no
        target was supplied, i.e. when used subclass-wise.
        """
        target = self.kwargs.get("target")
        if target is None:
            return "_run"
        return str(getattr(target, "__name__", repr(target)))

    def run(self) -> None:
        """
        Execute ``_run()`` if defined, else the regular thread target.

        Any exception (including `BaseException` subclasses) is captured into
        ``exc_info`` and debug-logged instead of propagating.
        """
        try:
            # Allow subclasses implemented using the "override run()'s body"
            # approach to work, by using _run() instead of run(). If that
            # doesn't appear to be the case, then assume we're being used
            # directly and just use super() ourselves.
            # XXX https://github.com/python/mypy/issues/1424
            if hasattr(self, "_run") and callable(self._run):  # type: ignore
                # TODO: this could be:
                # - io worker with no 'result' (always local)
                # - tunnel worker, also with no 'result' (also always local)
                # - threaded concurrent run(), sudo(), put(), etc, with a
                # result (not necessarily local; might want to be a subproc or
                # whatever eventually)
                # TODO: so how best to conditionally add a "capture result
                # value of some kind"?
                # - update so all use cases use subclassing, add functionality
                # alongside self.exception() that is for the result of _run()
                # - split out class that does not care about result of _run()
                # and let it continue acting like a normal thread (meh)
                # - assume the run/sudo/etc case will use a queue inside its
                # worker body, orthogonal to how exception handling works
                self._run()  # type: ignore
            else:
                super().run()
        except BaseException:
            # Store for actual reraising later
            self.exc_info = sys.exc_info()
            # And log now, in case we never get to later (e.g. if executing
            # program is hung waiting for us to do something)
            msg = "Encountered exception {!r} in thread for {!r}"
            # Name is either target function's dunder-name, or just "_run" if
            # we were run subclass-wise.
            name = self._target_name()
            debug(msg.format(self.exc_info[1], name))  # noqa

    def exception(self) -> Optional["ExceptionWrapper"]:
        """
        If an exception occurred, return an `.ExceptionWrapper` around it.

        :returns:
            An `.ExceptionWrapper` managing the result of `sys.exc_info`, if an
            exception was raised during thread execution. If no exception
            occurred, returns ``None`` instead.

        .. versionadded:: 1.0
        """
        if self.exc_info is None:
            return None
        return ExceptionWrapper(self.kwargs, *self.exc_info)

    @property
    def is_dead(self) -> bool:
        """
        Returns ``True`` if not alive and has a stored exception.

        Used to detect threads that have excepted & shut down.

        .. versionadded:: 1.0
        """
        # NOTE: it seems highly unlikely that a thread could still be
        # is_alive() but also have encountered an exception. But hey. Why not
        # be thorough?
        return (not self.is_alive()) and self.exc_info is not None

    def __repr__(self) -> str:
        # TODO: beef this up more
        # Goes through _target_name() so that subclass-style threads (no
        # 'target' kwarg) get a usable repr instead of raising KeyError.
        return self._target_name()
# NOTE: ExceptionWrapper defined here, not in exceptions.py, to avoid circular
# dependency issues (e.g. Failure subclasses need to use some bits from this
# module...)
#: A namedtuple wrapping a thread-borne exception & that thread's arguments.
#: Mostly used as an intermediate between `.ExceptionHandlingThread` (which
#: preserves initial exceptions) and `.ThreadException` (which holds 1..N such
#: exceptions, as typically multiple threads are involved.)
#:
#: Fields, in order: ``kwargs``, ``type``, ``value``, ``traceback``.
ExceptionWrapper = namedtuple(
    "ExceptionWrapper", "kwargs type value traceback"
)