Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/util.py: 41%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

76 statements  

1from collections import namedtuple 

2from contextlib import contextmanager 

3from types import TracebackType 

4from typing import Any, Generator, List, IO, Optional, Tuple, Type, Union 

5import io 

6import logging 

7import os 

8import threading 

9import sys 

10 

11# NOTE: This is the canonical location for commonly-used vendored modules, 

12# which is the only spot that performs this try/except to allow repackaged 

13# Invoke to function (e.g. distro packages which unvendor the vendored bits and 

14# thus must import our 'vendored' stuff from the overall environment.) 

15# All other uses of Lexicon, etc should do 'from .util import lexicon' etc. 

16# Saves us from having to update the same logic in a dozen places. 

17# TODO: would this make more sense to put _into_ invoke.vendor? That way, the 

18# import lines which now read 'from .util import <third party stuff>' would be 

19# more obvious. Requires packagers to leave invoke/vendor/__init__.py alone tho 

20try: 

21 from .vendor.lexicon import Lexicon # noqa 

22 from .vendor import yaml # noqa 

23except ImportError: 

24 from lexicon import Lexicon # type: ignore[no-redef] # noqa 

25 import yaml # type: ignore[no-redef] # noqa 

26 

27 

28LOG_FORMAT = "%(name)s.%(module)s.%(funcName)s: %(message)s" 

29 

30 

31def enable_logging() -> None: 

32 logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) 

33 

34 

35# Allow from-the-start debugging (vs toggled during load of tasks module) via 

36# shell env var. 

37if os.environ.get("INVOKE_DEBUG"): 

38 enable_logging() 

39 

40# Add top level logger functions to global namespace. Meh. 

41log = logging.getLogger("invoke") 

42debug = log.debug 

43 

44 

45def task_name_sort_key(name: str) -> Tuple[List[str], str]: 

46 """ 

47 Return key tuple for use sorting dotted task names, via e.g. `sorted`. 

48 

49 .. versionadded:: 1.0 

50 """ 

51 parts = name.split(".") 

52 return ( 

53 # First group/sort by non-leaf path components. This keeps everything 

54 # grouped in its hierarchy, and incidentally puts top-level tasks 

55 # (whose non-leaf path set is the empty list) first, where we want them 

56 parts[:-1], 

57 # Then we sort lexicographically by the actual task name 

58 parts[-1], 

59 ) 

60 

61 

62# TODO: Make part of public API sometime 

63@contextmanager 

64def cd(where: str) -> Generator[None, None, None]: 

65 cwd = os.getcwd() 

66 os.chdir(where) 

67 try: 

68 yield 

69 finally: 

70 os.chdir(cwd) 

71 

72 

73def has_fileno(stream: IO) -> bool: 

74 """ 

75 Cleanly determine whether ``stream`` has a useful ``.fileno()``. 

76 

77 .. note:: 

78 This function helps determine if a given file-like object can be used 

79 with various terminal-oriented modules and functions such as `select`, 

80 `termios`, and `tty`. For most of those, a fileno is all that is 

81 required; they'll function even if ``stream.isatty()`` is ``False``. 

82 

83 :param stream: A file-like object. 

84 

85 :returns: 

86 ``True`` if ``stream.fileno()`` returns an integer, ``False`` otherwise 

87 (this includes when ``stream`` lacks a ``fileno`` method). 

88 

89 .. versionadded:: 1.0 

90 """ 

91 try: 

92 return isinstance(stream.fileno(), int) 

93 except (AttributeError, io.UnsupportedOperation): 

94 return False 

95 

96 

97def isatty(stream: IO) -> Union[bool, Any]: 

98 """ 

99 Cleanly determine whether ``stream`` is a TTY. 

100 

101 Specifically, first try calling ``stream.isatty()``, and if that fails 

102 (e.g. due to lacking the method entirely) fallback to `os.isatty`. 

103 

104 .. note:: 

105 Most of the time, we don't actually care about true TTY-ness, but 

106 merely whether the stream seems to have a fileno (per `has_fileno`). 

107 However, in some cases (notably the use of `pty.fork` to present a 

108 local pseudoterminal) we need to tell if a given stream has a valid 

109 fileno but *isn't* tied to an actual terminal. Thus, this function. 

110 

111 :param stream: A file-like object. 

112 

113 :returns: 

114 A boolean depending on the result of calling ``.isatty()`` and/or 

115 `os.isatty`. 

116 

117 .. versionadded:: 1.0 

118 """ 

119 # If there *is* an .isatty, ask it. 

120 if hasattr(stream, "isatty") and callable(stream.isatty): 

121 return stream.isatty() 

122 # If there wasn't, see if it has a fileno, and if so, ask os.isatty 

123 elif has_fileno(stream): 

124 return os.isatty(stream.fileno()) 

125 # If we got here, none of the above worked, so it's reasonable to assume 

126 # the darn thing isn't a real TTY. 

127 return False 

128 

129 

130def helpline(obj: object) -> Optional[str]: 

131 """ 

132 Yield an object's first docstring line, or None if there was no docstring. 

133 

134 .. versionadded:: 1.0 

135 """ 

136 docstring = obj.__doc__ 

137 if ( 

138 not docstring 

139 or not docstring.strip() 

140 or docstring == type(obj).__doc__ 

141 ): 

142 return None 

143 return docstring.lstrip().splitlines()[0] 

144 

145 

146class ExceptionHandlingThread(threading.Thread): 

147 """ 

148 Thread handler making it easier for parent to handle thread exceptions. 

149 

150 Based in part on Fabric 1's ThreadHandler. See also Fabric GH issue #204. 

151 

152 When used directly, can be used in place of a regular ``threading.Thread``. 

153 If subclassed, the subclass must do one of: 

154 

155 - supply ``target`` to ``__init__`` 

156 - define ``_run()`` instead of ``run()`` 

157 

158 This is because this thread's entire point is to wrap behavior around the 

159 thread's execution; subclasses could not redefine ``run()`` without 

160 breaking that functionality. 

161 

162 .. versionadded:: 1.0 

163 """ 

164 

165 def __init__(self, **kwargs: Any) -> None: 

166 """ 

167 Create a new exception-handling thread instance. 

168 

169 Takes all regular `threading.Thread` keyword arguments, via 

170 ``**kwargs`` for easier display of thread identity when raising 

171 captured exceptions. 

172 """ 

173 super().__init__(**kwargs) 

174 # No record of why, but Fabric used daemon threads ever since the 

175 # switch from select.select, so let's keep doing that. 

176 self.daemon = True 

177 # Track exceptions raised in run() 

178 self.kwargs = kwargs 

179 # TODO: legacy cruft that needs to be removed 

180 self.exc_info: Optional[ 

181 Union[ 

182 Tuple[Type[BaseException], BaseException, TracebackType], 

183 Tuple[None, None, None], 

184 ] 

185 ] = None 

186 

187 def run(self) -> None: 

188 try: 

189 # Allow subclasses implemented using the "override run()'s body" 

190 # approach to work, by using _run() instead of run(). If that 

191 # doesn't appear to be the case, then assume we're being used 

192 # directly and just use super() ourselves. 

193 # XXX https://github.com/python/mypy/issues/1424 

194 if hasattr(self, "_run") and callable(self._run): # type: ignore 

195 # TODO: this could be: 

196 # - io worker with no 'result' (always local) 

197 # - tunnel worker, also with no 'result' (also always local) 

198 # - threaded concurrent run(), sudo(), put(), etc, with a 

199 # result (not necessarily local; might want to be a subproc or 

200 # whatever eventually) 

201 # TODO: so how best to conditionally add a "capture result 

202 # value of some kind"? 

203 # - update so all use cases use subclassing, add functionality 

204 # alongside self.exception() that is for the result of _run() 

205 # - split out class that does not care about result of _run() 

206 # and let it continue acting like a normal thread (meh) 

207 # - assume the run/sudo/etc case will use a queue inside its 

208 # worker body, orthogonal to how exception handling works 

209 self._run() # type: ignore 

210 else: 

211 super().run() 

212 except BaseException: 

213 # Store for actual reraising later 

214 self.exc_info = sys.exc_info() 

215 # And log now, in case we never get to later (e.g. if executing 

216 # program is hung waiting for us to do something) 

217 msg = "Encountered exception {!r} in thread for {!r}" 

218 # Name is either target function's dunder-name, or just "_run" if 

219 # we were run subclass-wise. 

220 name = "_run" 

221 if "target" in self.kwargs: 

222 name = self.kwargs["target"].__name__ 

223 debug(msg.format(self.exc_info[1], name)) # noqa 

224 

225 def exception(self) -> Optional["ExceptionWrapper"]: 

226 """ 

227 If an exception occurred, return an `.ExceptionWrapper` around it. 

228 

229 :returns: 

230 An `.ExceptionWrapper` managing the result of `sys.exc_info`, if an 

231 exception was raised during thread execution. If no exception 

232 occurred, returns ``None`` instead. 

233 

234 .. versionadded:: 1.0 

235 """ 

236 if self.exc_info is None: 

237 return None 

238 return ExceptionWrapper(self.kwargs, *self.exc_info) 

239 

240 @property 

241 def is_dead(self) -> bool: 

242 """ 

243 Returns ``True`` if not alive and has a stored exception. 

244 

245 Used to detect threads that have excepted & shut down. 

246 

247 .. versionadded:: 1.0 

248 """ 

249 # NOTE: it seems highly unlikely that a thread could still be 

250 # is_alive() but also have encountered an exception. But hey. Why not 

251 # be thorough? 

252 return (not self.is_alive()) and self.exc_info is not None 

253 

254 def __repr__(self) -> str: 

255 # TODO: beef this up more 

256 return str(self.kwargs["target"].__name__) 

257 

258 

259# NOTE: ExceptionWrapper defined here, not in exceptions.py, to avoid circular 

260# dependency issues (e.g. Failure subclasses need to use some bits from this 

261# module...) 

262#: A namedtuple wrapping a thread-borne exception & that thread's arguments. 

263#: Mostly used as an intermediate between `.ExceptionHandlingThread` (which 

264#: preserves initial exceptions) and `.ThreadException` (which holds 1..N such 

265#: exceptions, as typically multiple threads are involved.) 

266ExceptionWrapper = namedtuple( 

267 "ExceptionWrapper", "kwargs type value traceback" 

268)