Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/runners.py: 23%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

438 statements  

1import errno 

2import locale 

3import os 

4import struct 

5import sys 

6import threading 

7import time 

8import signal 

9from subprocess import Popen, PIPE 

10from types import TracebackType 

11from typing import ( 

12 TYPE_CHECKING, 

13 Any, 

14 Callable, 

15 Dict, 

16 Generator, 

17 IO, 

18 List, 

19 Optional, 

20 Tuple, 

21 Type, 

22) 

23 

24# Import some platform-specific things at top level so they can be mocked for 

25# tests. 

26try: 

27 import pty 

28except ImportError: 

29 pty = None # type: ignore[assignment] 

30try: 

31 import fcntl 

32except ImportError: 

33 fcntl = None # type: ignore[assignment] 

34try: 

35 import termios 

36except ImportError: 

37 termios = None # type: ignore[assignment] 

38 

39from .exceptions import ( 

40 UnexpectedExit, 

41 Failure, 

42 ThreadException, 

43 WatcherError, 

44 SubprocessPipeError, 

45 CommandTimedOut, 

46) 

47from .terminals import ( 

48 WINDOWS, 

49 pty_size, 

50 character_buffered, 

51 ready_for_reading, 

52 bytes_to_read, 

53) 

54from .util import has_fileno, isatty, ExceptionHandlingThread 

55 

56if TYPE_CHECKING: 

57 from .context import Context 

58 from .watchers import StreamWatcher 

59 

60 

61class Runner: 

62 """ 

63 Partially-abstract core command-running API. 

64 

65 This class is not usable by itself and must be subclassed, implementing a 

66 number of methods such as `start`, `wait` and `returncode`. For a subclass 

67 implementation example, see the source code for `.Local`. 

68 

69 .. versionadded:: 1.0 

70 """ 

71 

72 opts: Dict[str, Any] 

73 using_pty: bool 

74 read_chunk_size = 1000 

75 input_sleep = 0.01 

76 

77 def __init__(self, context: "Context") -> None: 

78 """ 

79 Create a new runner with a handle on some `.Context`. 

80 

81 :param context: 

82 a `.Context` instance, used to transmit default options and provide 

83 access to other contextualized information (e.g. a remote-oriented 

84 `.Runner` might want a `.Context` subclass holding info about 

85 hostnames and ports.) 

86 

87 .. note:: 

88 The `.Context` given to `.Runner` instances **must** contain 

89 default config values for the `.Runner` class in question. At a 

90 minimum, this means values for each of the default 

91 `.Runner.run` keyword arguments such as ``echo`` and ``warn``. 

92 

93 :raises exceptions.ValueError: 

94 if not all expected default values are found in ``context``. 

95 """ 

96 #: The `.Context` given to the same-named argument of `__init__`. 

97 self.context = context 

98 #: A `threading.Event` signaling program completion. 

99 #: 

100 #: Typically set after `wait` returns. Some IO mechanisms rely on this 

101 #: to know when to exit an infinite read loop. 

102 self.program_finished = threading.Event() 

103 # I wish Sphinx would organize all class/instance attrs in the same 

104 # place. If I don't do this here, it goes 'class vars -> __init__ 

105 # docstring -> instance vars' :( TODO: consider just merging class and 

106 # __init__ docstrings, though that's annoying too. 

107 #: How many bytes (at maximum) to read per iteration of stream reads. 

108 self.read_chunk_size = self.__class__.read_chunk_size 

109 # Ditto re: declaring this in 2 places for doc reasons. 

110 #: How many seconds to sleep on each iteration of the stdin read loop 

111 #: and other otherwise-fast loops. 

112 self.input_sleep = self.__class__.input_sleep 

113 #: Whether pty fallback warning has been emitted. 

114 self.warned_about_pty_fallback = False 

115 #: A list of `.StreamWatcher` instances for use by `respond`. Is filled 

116 #: in at runtime by `run`. 

117 self.watchers: List["StreamWatcher"] = [] 

118 # Optional timeout timer placeholder 

119 self._timer: Optional[threading.Timer] = None 

120 # Async flags (initialized for 'finally' referencing in case something 

121 # goes REAL bad during options parsing) 

122 self._asynchronous = False 

123 self._disowned = False 

124 

def run(self, command: str, **kwargs: Any) -> Optional["Result"]:
    """
    Run ``command`` and hand back a `Result` describing how it went.

    The default mode is synchronous: the call returns only after the
    subprocess has finished, and interactive keyboard communication with
    the subprocess is possible while it runs.

    With ``asynchronous=True`` the call returns early and the caller
    manages the subprocess lifecycle through the returned object. With
    ``disown=True`` the subprocess is detached from Invoke's control
    altogether, letting it outlive the Python process. Both modes are
    covered in the parameter list below.

    .. note::
        Every keyword argument defaults to the value stored in the
        configuration of this instance's `~.Runner.context` attribute,
        under its ``run`` subtree (so ``run.echo`` supplies the default
        for ``echo``, and so on). The base defaults are given with each
        parameter below.

    :param str command: The shell command to execute.

    :param bool asynchronous:
        When ``True`` (default ``False``), switches on asynchronous
        behavior:

        - The controlling terminal is disconnected: subprocess output is
          not shown and keyboard input is not forwarded - much like
          ``hide=True`` plus ``in_stream=False`` (explicitly supplied
          ``(out|err|in)_stream`` file-like objects are still honored as
          usual).
        - `.run` returns as soon as the subprocess has been started, and
          returns a `Promise` rather than a `Result`.
        - The main use of a `Promise` is its `~Promise.join` method,
          which blocks until the subprocess exits (as in threading APIs)
          and then either returns the final `~Result` or raises, exactly
          as a synchronous ``run`` would have.

            - As with threading and similar APIs, callers using
              ``asynchronous=True`` should be sure to ``join`` their
              `Promise` objects, to avoid trouble at interpreter
              shutdown.
            - A convenient way to get that cleanup is to use the
              `Promise` as a context manager; it ``join``\s
              automatically when the block exits.

        .. versionadded:: 1.4

    :param bool disown:
        When ``True`` (default ``False``), returns immediately as
        ``asynchronous=True`` does, but performs no background work for
        the subprocess at all (it is ignored completely). This lets
        subprocesses that use shell backgrounding or similar techniques
        (e.g. a trailing ``&``, ``nohup``) survive beyond the lifetime of
        the Python process running Invoke.

        .. note::
            If you cannot tell whether you need this or
            ``asynchronous``, you almost certainly want
            ``asynchronous``!

        In detail, ``disown=True`` behaves as follows:

        - ``None`` is returned rather than a `Result` or subclass.
        - No I/O worker threads are started: the subprocess'
          stdout/stderr are inaccessible, your stdin is not forwarded,
          ``(out|err|in)_stream`` are ignored, and features such as
          ``watchers`` do nothing.
        - No exit code is checked, so a subprocess that fails to exit
          cleanly produces no error.
        - ``pty=True`` may misbehave (subprocesses may not run at all;
          this looks like a possible bug in Python's ``pty.fork``)
          unless the command line uses tools such as ``nohup`` or the
          shell builtin ``disown``.

        .. versionadded:: 1.4

    :param bool dry:
        Whether to dry-run rather than really invoke the command. See
        :option:`--dry` (which enables this globally) for the details of
        that behavior.

        .. versionadded:: 1.3

    :param bool echo:
        Whether `.run` prints the command string to local stdout before
        executing it. Default: ``False``.

        .. note::
            ``hide=True`` overrides ``echo=True`` when both are given.

    :param echo_format:
        A string which, passed through Python's built-in ``.format``
        method, determines how the command is printed when ``run.echo``
        is true.

        Only ``{command}`` is currently supported as a parameter.

        The default prints the full command string in ANSI-escaped bold.

    :param bool echo_stdin:
        Whether data read from ``in_stream`` is written back to
        ``out_stream``.

        Put differently: in ordinary interactive use, this decides
        whether Invoke mirrors what you type back to your terminal.

        When left as ``None`` (the default), echoing is enabled only if
        both of the following hold:

            * No pty is used for the subcommand (i.e. ``pty=False``),
              since ptys already echo stdin to stdout by themselves;
            * Invoke's own controlling terminal (as per ``in_stream``)
              looks like a real terminal device or TTY - specifically,
              `~invoke.util.isatty` returns ``True`` for ``in_stream``.

              .. note::
                  This tends to be ``False`` when another program's
                  output is piped into an Invoke session, or when Invoke
                  runs inside another program (e.g. Invoke run from
                  itself).

        If either condition fails, nothing is echoed.

        Any value other than ``None`` bypasses this auto-detection and
        forces echoing on or off.

    :param str encoding:
        Overrides auto-detection of the encoding the subprocess uses for
        its stdout/stderr streams (the auto-detected value being the
        return value of `default_encoding`).

    :param err_stream:
        As ``out_stream``, but for standard error, and defaulting to
        ``sys.stderr``.

    :param dict env:
        Subprocesses normally receive a copy of Invoke's own environment
        (i.e. ``os.environ``). A dict given here is used to update that
        child environment.

        For example, ``run('command', env={'PYTHONPATH':
        '/some/virtual/env/maybe'})`` changes the ``PYTHONPATH`` env var
        and leaves the rest of the child's environment identical to the
        parent's.

        .. seealso:: ``replace_env`` for changing 'update' to 'replace'.

    :param bool fallback:
        Governs automatic fallback when a pty cannot be offered even
        though ``pty=True``. Whether it has any effect depends on the
        particular `Runner` subclass in use. Default: ``True``.

    :param hide:
        Lets the caller turn off ``run``'s default behavior of copying
        the subprocess' stdout and stderr to the controlling terminal.
        ``hide='out'`` (or ``'stdout'``) hides only stdout,
        ``hide='err'`` (or ``'stderr'``) hides only stderr, and
        ``hide='both'`` (or ``True``) hides both.

        The default, ``None``, prints everything; ``False`` likewise
        disables hiding.

        .. note::
            Stdout and stderr are always captured and stored on the
            ``Result`` object, whatever the value of ``hide``.

        .. note::
            ``hide=True`` also overrides ``echo=True`` when both are
            given (whether as kwargs or via config/CLI).

    :param in_stream:
        A file-like stream object to be used as the subprocess' standard
        input. If ``None`` (the default), ``sys.stdin`` is used.

        If ``False``, stdin mirroring is disabled entirely (other
        functionality that writes to the subprocess' stdin, such as
        autoresponding, keeps working). Disabling stdin mirroring helps
        when ``sys.stdin`` is a misbehaving non-stream object, as under
        test harnesses or headless command runners.

    :param out_stream:
        A file-like stream object that the subprocess' standard output
        is written to. If ``None`` (the default), ``sys.stdout`` is
        used.

    :param bool pty:
        By default ``run`` attaches directly to the invoked process and
        reads its stdout/stderr streams. Some programs buffer (or even
        behave) differently in that situation than they do on a real
        terminal or pseudoterminal (pty). Specify ``pty=True`` to use a
        pty instead.

        .. warning::
            A pty inherently has a single output stream, so telling
            stdout apart from stderr is **not possible** when
            ``pty=True``. All output then appears on ``out_stream`` and
            is captured into the ``stdout`` result attribute, while
            ``err_stream`` and ``stderr`` are always empty.

    :param bool replace_env:
        When ``True``, the subprocess receives the dictionary given to
        ``env`` as its entire shell environment, rather than an updated
        copy of ``os.environ`` (the default behavior). Default:
        ``False``.

    :param str shell:
        The shell binary to use. Default: ``/bin/bash`` (on Unix;
        ``COMSPEC`` or ``cmd.exe`` on Windows.)

    :param timeout:
        Makes the runner send an interrupt to the subprocess and raise
        `.CommandTimedOut` if the command runs for longer than
        ``timeout`` seconds. Defaults to ``None``, i.e. no timeout.

        .. versionadded:: 1.3

    :param bool warn:
        Whether to warn and carry on, rather than raise
        `.UnexpectedExit`, when the command exits with a nonzero status.
        Default: ``False``.

        .. note::
            This has no effect on exceptions, which are still raised -
            typically bundled in `.ThreadException` objects if they came
            from the IO worker threads.

            Likewise, `.WatcherError` exceptions raised by
            `.StreamWatcher` instances ignore this setting and are
            usually bundled inside `.Failure` objects (to preserve the
            execution context).

            The same goes for `.CommandTimedOut` - in short, anything
            that stops a command from actually reaching "exited with an
            exit code" ignores this flag.

    :param watchers:
        A list of `.StreamWatcher` instances used to scan the program's
        ``stdout`` or ``stderr``; they may write into its ``stdin``
        (typically ``bytes`` objects) in response to patterns or other
        heuristics.

        See :doc:`/concepts/watchers` for details on this functionality.

        Default: ``[]``.

    :returns:
        `Result`, or a subclass thereof, for an ordinary synchronous run;
        a `Promise` when ``asynchronous=True``; ``None`` when
        ``disown=True``.

    :raises:
        `.UnexpectedExit`, if the command exited nonzero and
        ``warn`` was ``False``.

    :raises:
        `.Failure`, if the command didn't even exit cleanly, e.g. if a
        `.StreamWatcher` raised `.WatcherError`.

    :raises:
        `.ThreadException` (if the background I/O threads encountered
        exceptions other than `.WatcherError`).

    .. versionadded:: 1.0
    """
    try:
        outcome = self._run_body(command, **kwargs)
    finally:
        # Only plain synchronous runs get `stop` called here; asynchronous
        # and disowned runs skip it (whether or not _run_body raised).
        if not self._asynchronous and not self._disowned:
            self.stop()
    return outcome

399 

def echo(self, command: str) -> None:
    """
    Print ``command``, rendered through the ``echo_format`` option.

    :param str command: The command string substituted for ``{command}``.

    :returns: ``None``.
    """
    template = self.opts["echo_format"]
    rendered = template.format(command=command)
    print(rendered)

402 

403 def _setup(self, command: str, kwargs: Any) -> None: 

404 """ 

405 Prepare data on ``self`` so we're ready to start running. 

406 """ 

407 # Normalize kwargs w/ config; sets self.opts, self.streams 

408 self._unify_kwargs_with_config(kwargs) 

409 # Environment setup 

410 self.env = self.generate_env( 

411 self.opts["env"], self.opts["replace_env"] 

412 ) 

413 # Arrive at final encoding if neither config nor kwargs had one 

414 self.encoding = self.opts["encoding"] or self.default_encoding() 

415 # Echo running command (wants to be early to be included in dry-run) 

416 if self.opts["echo"]: 

417 self.echo(command) 

418 # Prepare common result args. 

419 # TODO: I hate this. Needs a deeper separate think about tweaking 

420 # Runner.generate_result in a way that isn't literally just this same 

421 # two-step process, and which also works w/ downstream. 

422 self.result_kwargs = dict( 

423 command=command, 

424 shell=self.opts["shell"], 

425 env=self.env, 

426 pty=self.using_pty, 

427 hide=self.opts["hide"], 

428 encoding=self.encoding, 

429 ) 

430 

431 def _run_body(self, command: str, **kwargs: Any) -> Optional["Result"]: 

432 # Prepare all the bits n bobs. 

433 self._setup(command, kwargs) 

434 # If dry-run, stop here. 

435 if self.opts["dry"]: 

436 return self.generate_result( 

437 **dict(self.result_kwargs, stdout="", stderr="", exited=0) 

438 ) 

439 # Start executing the actual command (runs in background) 

440 self.start(command, self.opts["shell"], self.env) 

441 # If disowned, we just stop here - no threads, no timer, no error 

442 # checking, nada. 

443 if self._disowned: 

444 return None 

445 # Stand up & kick off IO, timer threads 

446 self.start_timer(self.opts["timeout"]) 

447 self.threads, self.stdout, self.stderr = self.create_io_threads() 

448 for thread in self.threads.values(): 

449 thread.start() 

450 # Wrap up or promise that we will, depending 

451 return self.make_promise() if self._asynchronous else self._finish() 

452 

def make_promise(self) -> "Promise":
    """
    Build the `Promise` through which the rest of this run's lifecycle can
    be controlled asynchronously.

    :returns: A `Promise` constructed from this runner.

    .. versionadded:: 1.4
    """
    promise = Promise(self)
    return promise

460 

461 def _finish(self) -> "Result": 

462 # Wait for subprocess to run, forwarding signals as we get them. 

463 try: 

464 while True: 

465 try: 

466 self.wait() 

467 break # done waiting! 

468 # Don't locally stop on ^C, only forward it: 

469 # - if remote end really stops, we'll naturally stop after 

470 # - if remote end does not stop (eg REPL, editor) we don't want 

471 # to stop prematurely 

472 except KeyboardInterrupt as e: 

473 self.send_interrupt(e) 

474 # TODO: honor other signals sent to our own process and 

475 # transmit them to the subprocess before handling 'normally'. 

476 # Make sure we tie off our worker threads, even if something exploded. 

477 # Any exceptions that raised during self.wait() above will appear after 

478 # this block. 

479 finally: 

480 # Inform stdin-mirroring worker to stop its eternal looping 

481 self.program_finished.set() 

482 # Join threads, storing inner exceptions, & set a timeout if 

483 # necessary. (Segregate WatcherErrors as they are "anticipated 

484 # errors" that want to show up at the end during creation of 

485 # Failure objects.) 

486 watcher_errors = [] 

487 thread_exceptions = [] 

488 for target, thread in self.threads.items(): 

489 thread.join(self._thread_join_timeout(target)) 

490 exception = thread.exception() 

491 if exception is not None: 

492 real = exception.value 

493 if isinstance(real, WatcherError): 

494 watcher_errors.append(real) 

495 else: 

496 thread_exceptions.append(exception) 

497 # If any exceptions appeared inside the threads, raise them now as an 

498 # aggregate exception object. 

499 # NOTE: this is kept outside the 'finally' so that main-thread 

500 # exceptions are raised before worker-thread exceptions; they're more 

501 # likely to be Big Serious Problems. 

502 if thread_exceptions: 

503 raise ThreadException(thread_exceptions) 

504 # Collate stdout/err, calculate exited, and get final result obj 

505 result = self._collate_result(watcher_errors) 

506 # Any presence of WatcherError from the threads indicates a watcher was 

507 # upset and aborted execution; make a generic Failure out of it and 

508 # raise that. 

509 if watcher_errors: 

510 # TODO: ambiguity exists if we somehow get WatcherError in *both* 

511 # threads...as unlikely as that would normally be. 

512 raise Failure(result, reason=watcher_errors[0]) 

513 # If a timeout was requested and the subprocess did time out, shout. 

514 timeout = self.opts["timeout"] 

515 if timeout is not None and self.timed_out: 

516 raise CommandTimedOut(result, timeout=timeout) 

517 if not (result or self.opts["warn"]): 

518 raise UnexpectedExit(result) 

519 return result 

520 

521 def _unify_kwargs_with_config(self, kwargs: Any) -> None: 

522 """ 

523 Unify `run` kwargs with config options to arrive at local options. 

524 

525 Sets: 

526 

527 - ``self.opts`` - opts dict 

528 - ``self.streams`` - map of stream names to stream target values 

529 """ 

530 opts = {} 

531 for key, value in self.context.config.run.items(): 

532 runtime = kwargs.pop(key, None) 

533 opts[key] = value if runtime is None else runtime 

534 # Pull in command execution timeout, which stores config elsewhere, 

535 # but only use it if it's actually set (backwards compat) 

536 config_timeout = self.context.config.timeouts.command 

537 opts["timeout"] = kwargs.pop("timeout", config_timeout) 

538 # Handle invalid kwarg keys (anything left in kwargs). 

539 # Act like a normal function would, i.e. TypeError 

540 if kwargs: 

541 err = "run() got an unexpected keyword argument '{}'" 

542 raise TypeError(err.format(list(kwargs.keys())[0])) 

543 # Update disowned, async flags 

544 self._asynchronous = opts["asynchronous"] 

545 self._disowned = opts["disown"] 

546 if self._asynchronous and self._disowned: 

547 err = "Cannot give both 'asynchronous' and 'disown' at the same time!" # noqa 

548 raise ValueError(err) 

549 # If hide was True, turn off echoing 

550 if opts["hide"] is True: 

551 opts["echo"] = False 

552 # Conversely, ensure echoing is always on when dry-running 

553 if opts["dry"] is True: 

554 opts["echo"] = True 

555 # Always hide if async 

556 if self._asynchronous: 

557 opts["hide"] = True 

558 # Then normalize 'hide' from one of the various valid input values, 

559 # into a stream-names tuple. Also account for the streams. 

560 out_stream, err_stream = opts["out_stream"], opts["err_stream"] 

561 opts["hide"] = normalize_hide(opts["hide"], out_stream, err_stream) 

562 # Derive stream objects 

563 if out_stream is None: 

564 out_stream = sys.stdout 

565 if err_stream is None: 

566 err_stream = sys.stderr 

567 in_stream = opts["in_stream"] 

568 if in_stream is None: 

569 # If in_stream hasn't been overridden, and we're async, we don't 

570 # want to read from sys.stdin (otherwise the default) - so set 

571 # False instead. 

572 in_stream = False if self._asynchronous else sys.stdin 

573 # Determine pty or no 

574 self.using_pty = self.should_use_pty(opts["pty"], opts["fallback"]) 

575 if opts["watchers"]: 

576 self.watchers = opts["watchers"] 

577 # Set data 

578 self.opts = opts 

579 self.streams = {"out": out_stream, "err": err_stream, "in": in_stream} 

580 

def _collate_result(self, watcher_errors: List[WatcherError]) -> "Result":
    """
    Assemble the final `Result` from captured output and the exit code.

    Reaching this point means execution got far enough that detailed
    information should be returned or raised, hence a `Result` is built.

    :param watcher_errors:
        `.WatcherError` objects collected from the IO threads; when this is
        non-empty the exit code is not queried and ``exited`` is ``None``.

    :returns: Whatever `generate_result` returns.
    """
    stdout, stderr = "".join(self.stdout), "".join(self.stderr)
    if WINDOWS:
        # "Universal newlines": fold every standard newline form into \n.
        # Strictly speaking this is not Windows-specific (\r on its own is
        # an old Mac convention), but Windows is the only platform where it
        # is likely to matter nowadays, so only it gets the translation.
        stdout, stderr = (
            text.replace("\r\n", "\n").replace("\r", "\n")
            for text in (stdout, stderr)
        )
    # Skip the exit code when there are WatcherErrors to handle.
    # NOTE: in that situation returncode() may block on the process (which
    # may itself be waiting for user input), and most WatcherError cases
    # have no useful exit code anyway, so little is lost.
    if watcher_errors:
        exited = None
    else:
        exited = self.returncode()
    # TODO: as noted elsewhere, this is unloved; consider reshaping
    # generate_result()'s API in the next major revision.
    result_kwargs = dict(self.result_kwargs)
    result_kwargs.update(stdout=stdout, stderr=stderr, exited=exited)
    return self.generate_result(**result_kwargs)

608 

609 def _thread_join_timeout(self, target: Callable) -> Optional[int]: 

610 # Add a timeout to out/err thread joins when it looks like they're not 

611 # dead but their counterpart is dead; this indicates issue #351 (fixed 

612 # by #432) where the subproc may hang because its stdout (or stderr) is 

613 # no longer being consumed by the dead thread (and a pipe is filling 

614 # up.) In that case, the non-dead thread is likely to block forever on 

615 # a `recv` unless we add this timeout. 

616 if target == self.handle_stdin: 

617 return None 

618 opposite = self.handle_stderr 

619 if target == self.handle_stderr: 

620 opposite = self.handle_stdout 

621 if opposite in self.threads and self.threads[opposite].is_dead: 

622 return 1 

623 return None 

624 

def create_io_threads(
    self,
) -> Tuple[Dict[Callable, ExceptionHandlingThread], List[str], List[str]]:
    """
    Build the IO worker threads, along with their capture buffers.

    The threads are neither stored nor started here; that is left to the
    caller.

    :returns:
        A 3-tuple: a dict mapping each handler method to the (unstarted)
        thread wrapping it, then the stdout and stderr capture lists.
    """
    stdout: List[str] = []
    stderr: List[str] = []
    # Per-thread parameters, keyed by body function.
    thread_args: Dict[Callable, Any] = {}
    thread_args[self.handle_stdout] = dict(
        buffer_=stdout,
        hide="stdout" in self.opts["hide"],
        output=self.streams["out"],
    )
    # Option processing leaves in_stream as either a real stream object or
    # False, so truth-testing works. With False (the user said stdin is
    # absent or troublesome) no stdin thread is created at all.
    if self.streams["in"]:
        thread_args[self.handle_stdin] = dict(
            input_=self.streams["in"],
            output=self.streams["out"],
            echo=self.opts["echo_stdin"],
        )
    # A stderr thread is only set up when no pty is in use.
    if not self.using_pty:
        thread_args[self.handle_stderr] = dict(
            buffer_=stderr,
            hide="stderr" in self.opts["hide"],
            output=self.streams["err"],
        )
    # Wrap each body function in its thread object.
    threads = {
        target: ExceptionHandlingThread(target=target, kwargs=params)
        for target, params in thread_args.items()
    }
    return threads, stdout, stderr

666 

def generate_result(self, **kwargs: Any) -> "Result":
    """
    Build and return a `Result` from the given ``kwargs``.

    Subclasses can override this to adjust the arguments or to return a
    `Result` subclass instead (for instance one carrying metadata beyond
    the default).

    :param kwargs: Passed to the `Result` constructor unchanged.

    :returns: The new `Result`.

    .. versionadded:: 1.0
    """
    result = Result(**kwargs)
    return result

678 

def read_proc_output(self, reader: Callable) -> Generator[str, None, None]:
    """
    Read and decode a subprocess' out/err stream, one chunk at a time.

    :param reader:
        A reader function/partial wrapping the actual stream object; it
        accepts a number of bytes to read and returns that many bytes (or
        ``None``).

        ``reader`` should refer to either `read_proc_stdout` or
        `read_proc_stderr`, which make the real, platform/library specific
        read calls.

    :returns:
        A generator of strings, each being the decoded form of one read of
        `read_chunk_size` bytes from the subprocess' out/err stream.
        Iteration ends at the first read that returns a falsy value.

    .. versionadded:: 1.0
    """
    # NOTE: reading any stdout/err (local, remote or otherwise) is usually
    # best framed as "read until nothing comes back". That beats "wait for
    # an out-of-band signal that the process is done", since such a signal
    # can show up before the stream has been read to the end (i.e. a race
    # condition).
    chunk = reader(self.read_chunk_size)
    while chunk:
        yield self.decode(chunk)
        chunk = reader(self.read_chunk_size)

711 

def write_our_output(self, stream: IO, string: str) -> None:
    """
    Send ``string`` to ``stream``, then flush it.

    The ``.flush()`` call makes sure real terminal streams do not sit on
    buffered data.

    :param stream:
        A file-like stream object, corresponding to the ``out_stream`` or
        ``err_stream`` parameters of `run`.

    :param string: A Unicode string object.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    stream.write(string)
    # Flush straight away so the text shows up without buffering delay.
    stream.flush()

731 

732 def _handle_output( 

733 self, 

734 buffer_: List[str], 

735 hide: bool, 

736 output: IO, 

737 reader: Callable, 

738 ) -> None: 

739 # TODO: store un-decoded/raw bytes somewhere as well... 

740 for data in self.read_proc_output(reader): 

741 # Echo to local stdout if necessary 

742 # TODO: should we rephrase this as "if you want to hide, give me a 

743 # dummy output stream, e.g. something like /dev/null"? Otherwise, a 

744 # combo of 'hide=stdout' + 'here is an explicit out_stream' means 

745 # out_stream is never written to, and that seems...odd. 

746 if not hide: 

747 self.write_our_output(stream=output, string=data) 

748 # Store in shared buffer so main thread can do things with the 

749 # result after execution completes. 

750 # NOTE: this is threadsafe insofar as no reading occurs until after 

751 # the thread is join()'d. 

752 buffer_.append(data) 

753 # Run our specific buffer through the autoresponder framework 

754 self.respond(buffer_) 

755 

def handle_stdout(
    self, buffer_: List[str], hide: bool, output: IO
) -> None:
    """
    Consume the process' stdout: capture it, and print/parse as needed.

    Meant to be used as a thread target. It finishes only once all of the
    subprocess' stdout has been read.

    :param buffer_: The capture buffer shared with the main thread.
    :param bool hide: Whether or not to replay data into ``output``.
    :param output:
        Output stream (file-like object) that data is written to when not
        hiding.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    stdout_reader = self.read_proc_stdout
    self._handle_output(buffer_, hide, output, reader=stdout_reader)

778 

def handle_stderr(
    self, buffer_: List[str], hide: bool, output: IO
) -> None:
    """
    Consume the process' stderr: capture it, and print/parse as needed.

    Works exactly like `handle_stdout` apart from the stream being read;
    refer to that method's docstring for the API details.

    .. versionadded:: 1.0
    """
    stderr_reader = self.read_proc_stderr
    self._handle_output(buffer_, hide, output, reader=stderr_reader)

793 

def read_our_stdin(self, input_: IO) -> Optional[str]:
    """
    Read from a local stdin stream, decoding the data if it is bytes.

    :param input_:
        The stream object to read. Corresponds to ``in_stream`` in `run`,
        so it is frequently ``sys.stdin``, though any stream-like object
        may be given.

    :returns:
        A Unicode string holding the decoded data (possibly the empty
        string, if the pipe has closed/reached EOF); or ``None`` if stdin
        was not yet ready for reading (or reading it failed with
        ``EBADF``).

    :raises OSError: for read failures whose ``errno`` is not ``EBADF``.

    .. versionadded:: 1.0
    """
    # TODO: should the character_buffered contextmanager call live in here?
    # The drawback: those switches would be flipped for every byte read
    # rather than once per session, which might be costly (?).
    if not ready_for_reading(input_):
        return None
    try:
        data = input_.read(bytes_to_read(input_))
    except OSError as err:
        # EBADF here is taken to mean we run under nohup or the like,
        # where:
        # - a bad FD cannot be reliably detected beforehand
        # - attempting to read it blows up
        # - the user almost certainly has no interest in stdin anyway
        # so it is ignored - unlike every other OSError.
        if err.errno != errno.EBADF:
            raise
        return None
    # Decode only what looks binary. (Real terminal streams: usually yes;
    # file-like objects: often no.)
    if data and isinstance(data, bytes):
        # TODO: can decoding a single byte at a time break multibyte
        # character encodings? How to reconcile that with interactivity?
        return self.decode(data)
    return data

833 

def handle_stdin(
    self,
    input_: IO,
    output: IO,
    echo: Optional[bool] = False,
) -> None:
    """
    Read local stdin, copying into process' stdin as necessary.

    Intended for use as a thread target.

    .. note::
        Because real terminal stdin streams have no well-defined "end", if
        such a stream is detected (based on existence of a callable
        ``.fileno()``) this method will wait until `program_finished` is
        set, before terminating.

        When the stream doesn't appear to be from a terminal, the same
        semantics as `handle_stdout` are used - the stream is simply
        ``read()`` from until it returns an empty value.

    :param input_: Stream (file-like object) from which to read.
    :param output: Stream (file-like object) to which echoing may occur.
    :param echo:
        User override option for stdin-stdout echoing. ``True``/``False``
        force echoing on/off; ``None`` defers the decision to
        `should_echo_stdin` the first time data is read.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    # TODO: reinstate lock/whatever thread logic from fab v1 which prevents
    # reading from stdin while other parts of the code are prompting for
    # runtime passwords? (search for 'input_enabled')
    # TODO: fabric#1339 is strongly related to this, if it's not literally
    # exposing some regression in Fabric 1.x itself.
    closed_stdin = False
    with character_buffered(input_):
        while True:
            data = self.read_our_stdin(input_)
            if data:
                # Mirror what we just read to process' stdin.
                self.write_proc_stdin(data)
                # Also echo it back to local stdout (or whatever
                # out_stream is set to) when necessary. The auto-detect
                # result is cached in ``echo`` so it is computed once.
                if echo is None:
                    echo = self.should_echo_stdin(input_, output)
                if echo:
                    self.write_our_output(stream=output, string=data)
            # Empty string/char/byte != None. Can't just use 'else' here.
            elif data is not None:
                # When reading from file-like objects that aren't "real"
                # terminal streams, an empty read signals EOF; pass that
                # on to the subprocess exactly once (never under a pty).
                if not self.using_pty and not closed_stdin:
                    self.close_proc_stdin()
                    closed_stdin = True
            # Dual all-done signals: program being executed is done
            # running, *and* we don't seem to be reading anything out of
            # stdin. (NOTE: If we only test the former, we may encounter
            # race conditions re: unread stdin.)
            if self.program_finished.is_set() and not data:
                break
            # Take a nap so we're not chewing CPU.
            time.sleep(self.input_sleep)

899 

def should_echo_stdin(self, input_: IO, output: IO) -> bool:
    """
    Decide whether text read from ``input_`` ought to be echoed.

    Called by `handle_stdin`. Echoing is never wanted while a pty is in
    use; otherwise the answer is whatever ``isatty(input_)`` reports.
    ``output`` is accepted but not consulted by this implementation.

    :param input_: Input stream (file-like object).
    :param output: Output stream (file-like object).
    :returns: A ``bool``.

    .. versionadded:: 1.0
    """
    if self.using_pty:
        return False
    return isatty(input_)

913 

def respond(self, buffer_: List[str]) -> None:
    """
    Feed captured output to the watchers and send back their responses.

    Every object in ``self.watchers`` is handed the full text captured so
    far, and each response it yields is written to the program's stdin.
    Those watchers are the `.StreamWatcher` instances given via the
    ``watchers`` kwarg of `run`; see :doc:`/concepts/watchers` for a
    conceptual overview.

    :param buffer_:
        The capture buffer for this thread's particular IO stream.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    # Watchers need one contiguous string so they can scan for patterns
    # that span several reads.
    # NOTE: str.join is considered "efficient enough" for now regarding
    # speed and memory; should that stop being true, StringIO is the
    # likely replacement.
    captured = "".join(buffer_)
    for watcher in self.watchers:
        replies = watcher.submit(captured)
        for reply in replies:
            self.write_proc_stdin(reply)

940 

def generate_env(
    self, env: Dict[str, Any], replace_env: bool
) -> Dict[str, Any]:
    """
    Build the environment dict to run the command with.

    :param dict env: Overrides, or the complete environment, depending on
        ``replace_env``.
    :param bool replace_env:
        When true, ``env`` is returned as-is and used instead of
        `os.environ`; when false, a new dict is returned holding
        `os.environ` updated with ``env``.

    :returns: A dictionary of shell environment vars.

    .. versionadded:: 1.0
    """
    if replace_env:
        return env
    return dict(os.environ, **env)

957 

def should_use_pty(self, pty: bool, fallback: bool) -> bool:
    """
    Decide whether execution should attempt to use a pseudo-terminal.

    :param bool pty:
        Whether the user explicitly asked for a pty.
    :param bool fallback:
        Whether falling back to non-pty execution should be allowed, in
        situations where ``pty=True`` but a pty could not be allocated.

    :returns: The value of ``pty``, unchanged.

    .. versionadded:: 1.0
    """
    # NOTE: ``fallback`` is ignored here; the default implementation has no
    # fallback behavior of its own.
    use_pty = pty
    return use_pty

972 

@property
def has_dead_threads(self) -> bool:
    """
    Report whether any IO worker thread is flagged as dead.

    `wait` consults this while waiting for the process to complete, so
    that we don't deadlock our child process when our IO processing
    threads have errored/died.

    :returns:
        ``True`` as soon as one value in ``self.threads`` has a truthy
        ``is_dead``; ``False`` if none do.

    .. versionadded:: 1.0
    """
    for worker in self.threads.values():
        if worker.is_dead:
            return True
    return False

989 

def wait(self) -> None:
    """
    Poll until the command has exited or an IO thread has died.

    Both `process_is_finished` and `has_dead_threads` are read on every
    pass; the loop sleeps for ``input_sleep`` seconds between passes.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    done = False
    while not done:
        exited = self.process_is_finished
        crashed = self.has_dead_threads
        done = bool(exited or crashed)
        if not done:
            time.sleep(self.input_sleep)

1004 

def write_proc_stdin(self, data: str) -> None:
    """
    Encode ``data`` and write it to the running process' stdin.

    The text is always encoded with ``self.encoding``; the actual write is
    left to the subclass-provided `_write_proc_stdin`.

    :param data: A Unicode string.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    payload = data.encode(self.encoding)
    self._write_proc_stdin(payload)

1018 

def decode(self, data: bytes) -> str:
    """
    Turn ``data`` bytes into Unicode text using ``self.encoding``.

    Undecodable bytes are substituted (the ``"replace"`` error handler)
    rather than raising.

    .. versionadded:: 1.0
    """
    # NOTE: deliberately a tiny wrapper, so that nobody decoding subprocess
    # data can forget the 'replace' error handler.
    text = data.decode(self.encoding, "replace")
    return text

1028 

@property
def process_is_finished(self) -> bool:
    """
    Report whether the subprocess has terminated.

    Abstract in this class; subclasses must provide the implementation.

    .. note::
        `wait` reads this property inside a polling loop, so
        implementations should be nonblocking.

    :returns:
        ``True`` if the subprocess has finished running, ``False``
        otherwise.

    :raises NotImplementedError: Always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1045 

def start(self, command: str, shell: str, env: Dict[str, Any]) -> None:
    """
    Begin executing ``command`` (via ``shell``, with ``env``).

    Abstract in this class. Implementations typically fork a subprocess or
    ask a remote system to start execution, and usually also set
    subclass-specific member variables that other methods such as `wait`
    and/or `returncode` rely on.

    :param str command:
        Command string to execute.

    :param str shell:
        Shell to use when executing ``command``.

    :param dict env:
        Environment dict used to prep shell environment.

    :raises NotImplementedError: Always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1068 

def start_timer(self, timeout: Optional[float]) -> None:
    """
    Start a timer to `kill` our subprocess after ``timeout`` seconds.

    :param timeout:
        Number of seconds to wait before `kill` is invoked. ``None`` means
        no timeout: no timer is created and ``self._timer`` is left
        untouched.

    :returns: ``None``.
    """
    if timeout is None:
        return
    # Keep a handle on the timer so it can be cancelled and inspected
    # later on.
    self._timer = threading.Timer(timeout, self.kill)
    self._timer.start()

1076 

def read_proc_stdout(self, num_bytes: int) -> Optional[bytes]:
    """
    Read up to ``num_bytes`` from the running process' stdout stream.

    Abstract in this class; subclasses must provide the implementation.

    :param int num_bytes: Number of bytes to read at maximum.

    :returns: A string/bytes object.

    :raises NotImplementedError: Always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1088 

def read_proc_stderr(self, num_bytes: int) -> Optional[bytes]:
    """
    Read up to ``num_bytes`` from the running process' stderr stream.

    Abstract in this class; subclasses must provide the implementation.

    :param int num_bytes: Number of bytes to read at maximum.

    :returns: A string/bytes object.

    :raises NotImplementedError: Always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1100 

1101 def _write_proc_stdin(self, data: bytes) -> None: 

1102 """ 

1103 Write ``data`` to running process' stdin. 

1104 

1105 This should never be called directly; it's for subclasses to implement. 

1106 See `write_proc_stdin` for the public API call. 

1107 

1108 :param data: Already-encoded byte data suitable for writing. 

1109 

1110 :returns: ``None``. 

1111 

1112 .. versionadded:: 1.0 

1113 """ 

1114 raise NotImplementedError 

1115 

def close_proc_stdin(self) -> None:
    """
    Close the running process' stdin.

    Abstract in this class; subclasses must provide the implementation.

    :returns: ``None``.

    :raises NotImplementedError: Always, in this base implementation.

    .. versionadded:: 1.3
    """
    raise NotImplementedError

1125 

def default_encoding(self) -> str:
    """
    Name the encoding that subprocess streams are expected to use.

    The returned value should be suitable for use by encode/decode
    methods. This implementation simply defers to the module-level
    ``default_encoding`` function.

    .. versionadded:: 1.0
    """
    # TODO: probably wants to be 2 methods, one for local and one for
    # subprocess. For now, good enough to assume both are the same.
    encoding = default_encoding()
    return encoding

1137 

def send_interrupt(self, interrupt: "KeyboardInterrupt") -> None:
    """
    Forward an interrupt to the running subprocess.

    The default behavior, which is what almost every implementation
    wants, is to write the single character ``"\\x03"`` to the subprocess'
    stdin via `write_proc_stdin`. The method is public so that this
    default can be augmented or replaced. ``interrupt`` itself is not
    inspected here.

    :param interrupt:
        The locally-sourced ``KeyboardInterrupt`` causing the method call.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    interrupt_char = "\x03"
    self.write_proc_stdin(interrupt_char)

1155 

def returncode(self) -> Optional[int]:
    """
    Return the numeric return/exit code resulting from command execution.

    Abstract in this class; subclasses must provide the implementation.

    :returns:
        `int`, if any reasonable return code could be determined, or
        ``None`` in corner cases where that was not possible.

    :raises NotImplementedError: Always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1167 

def stop(self) -> None:
    """
    Perform final cleanup, if necessary.

    The main `run` method calls this from a ``finally`` clause. The base
    implementation only cancels the timeout timer when one is set;
    subclasses may extend it to do things such as close network
    connections or open files.

    :returns: ``None``

    .. versionadded:: 1.0
    """
    timer = self._timer
    if timer:
        timer.cancel()

1182 

def kill(self) -> None:
    """
    Forcibly terminate the subprocess.

    Abstract in this class. It is typically only used by the timeout
    functionality (`start_timer` schedules it).

    Implementations are often "best-effort": e.g. remote subprocesses
    often must settle for simply shutting down the local side of the
    network connection and hoping the remote end eventually gets the
    message.

    :raises NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

1194 

@property
def timed_out(self) -> bool:
    """
    Report whether the subprocess stopped because it timed out.

    :returns:
        ``True`` when a timer is set and is no longer alive, ``False``
        otherwise (including when there is no timer at all).

    .. versionadded:: 1.3
    """
    timer = self._timer
    if not timer:
        return False
    # An expired timer implies we timed out: the timer itself is what
    # killed the subprocess and let execution reach this point.
    return not timer.is_alive()

1205 

1206 

class Local(Runner):
    """
    Execute a command on the local system in a subprocess.

    .. note::
        When Invoke itself is executed without a controlling terminal (e.g.
        when ``sys.stdin`` lacks a useful ``fileno``), it's not possible to
        present a handle on our PTY to local subprocesses. In such
        situations, `Local` will fallback to behaving as if ``pty=False``
        (on the theory that degraded execution is better than none at all)
        as well as printing a warning to stderr.

        To disable this behavior, say ``fallback=False``.

    .. versionadded:: 1.0
    """

    def __init__(self, context: "Context") -> None:
        super().__init__(context)
        # Bookkeeping var for pty use case: holds the raw wait status that
        # `process_is_finished` obtains from os.waitpid().
        self.status = 0

    def should_use_pty(self, pty: bool = False, fallback: bool = True) -> bool:
        """
        Decide whether a pty should really be used for this execution.

        :param bool pty: Whether the user asked for a pty.
        :param bool fallback:
            Whether to silently downgrade to non-pty execution (warning on
            stderr once per runner) when ``sys.stdin`` has no ``fileno``.

        :returns: ``True`` if a pty should be used, ``False`` otherwise.
        """
        use_pty = False
        if pty:
            use_pty = True
            # TODO: pass in & test in_stream, not sys.stdin
            if not has_fileno(sys.stdin) and fallback:
                if not self.warned_about_pty_fallback:
                    err = "WARNING: stdin has no fileno; falling back to non-pty execution!\n"  # noqa
                    sys.stderr.write(err)
                    self.warned_about_pty_fallback = True
                use_pty = False
        return use_pty

    @staticmethod
    def _is_io_error(error: OSError) -> bool:
        """
        Tell whether ``error`` is an I/O error on the pty.

        The errno is checked first; the message text is still consulted as
        a fallback for errors raised without an errno.
        """
        if error.errno == errno.EIO:
            return True
        stringified = str(error)
        io_errors = (
            # The typical default
            "Input/output error",
            # Some less common platforms phrase it this way
            "I/O error",
        )
        return any(message in stringified for message in io_errors)

    @staticmethod
    def _is_broken_pipe(error: OSError) -> bool:
        """
        Tell whether ``error`` is a broken-pipe error.

        The errno is checked first; the message text is still consulted as
        a fallback for errors raised without an errno.
        """
        return error.errno == errno.EPIPE or "Broken pipe" in str(error)

    def read_proc_stdout(self, num_bytes: int) -> Optional[bytes]:
        """
        Read up to ``num_bytes`` of the subprocess' stdout.

        :returns:
            The bytes read, or ``None`` when there is no process/stdout to
            read from or the pty raised an I/O error.
        """
        # Obtain useful read-some-bytes function
        if self.using_pty:
            # Need to handle spurious OSErrors on some Linux platforms.
            try:
                data = os.read(self.parent_fd, num_bytes)
            except OSError as e:
                # Only eat I/O specific OSErrors so we don't hide others
                if not self._is_io_error(e):
                    raise
                # The bad OSErrors happen after all expected output has
                # appeared, so we return a falsey value, which triggers the
                # "end of output" logic in code using reader functions.
                data = None
        elif self.process and self.process.stdout:
            data = os.read(self.process.stdout.fileno(), num_bytes)
        else:
            data = None
        return data

    def read_proc_stderr(self, num_bytes: int) -> Optional[bytes]:
        """
        Read up to ``num_bytes`` of the subprocess' stderr.

        :returns:
            The bytes read, or ``None`` when there is no process/stderr to
            read from.
        """
        # NOTE: when using a pty, this will never be called.
        # TODO: do we ever get those OSErrors on stderr? Feels like we could?
        if self.process and self.process.stderr:
            return os.read(self.process.stderr.fileno(), num_bytes)
        return None

    def _write_proc_stdin(self, data: bytes) -> None:
        """
        Write ``data`` to the subprocess' stdin (or to the pty).

        :raises SubprocessPipeError:
            When not using a pty and there is no process or stdin pipe.
        """
        # NOTE: parent_fd from os.fork() is a read/write pipe attached to our
        # forked process' stdout/stdin, respectively.
        if self.using_pty:
            fd = self.parent_fd
        elif self.process and self.process.stdin:
            fd = self.process.stdin.fileno()
        else:
            raise SubprocessPipeError(
                "Unable to write to missing subprocess or stdin!"
            )
        # Try to write, ignoring broken pipes if encountered (implies child
        # process exited before the process piping stdin to us finished;
        # there's nothing we can do about that!)
        try:
            os.write(fd, data)
        except OSError as e:
            if not self._is_broken_pipe(e):
                raise

    def close_proc_stdin(self) -> None:
        """
        Close the subprocess' stdin pipe.

        :raises SubprocessPipeError:
            When a pty is in use, or when there is no process or stdin
            pipe to close.
        """
        if self.using_pty:
            # there is no working scenario to tell the process that stdin
            # closed when using pty
            raise SubprocessPipeError("Cannot close stdin when pty=True")
        elif self.process and self.process.stdin:
            self.process.stdin.close()
        else:
            raise SubprocessPipeError(
                "Unable to close missing subprocess or stdin!"
            )

    def start(self, command: str, shell: str, env: Dict[str, Any]) -> None:
        """
        Launch ``command`` through ``shell`` with environment ``env``.

        With a pty, the process is forked via ``pty.fork()`` and the child
        execs the shell; ``self.pid`` and ``self.parent_fd`` are recorded.
        Without one, a `subprocess.Popen` with all three standard streams
        piped is stored in ``self.process``.
        """
        if self.using_pty:
            if pty is None:  # Encountered ImportError
                err = "You indicated pty=True, but your platform doesn't support the 'pty' module!"  # noqa
                sys.exit(err)
            cols, rows = pty_size()
            self.pid, self.parent_fd = pty.fork()
            # If we're the child process, load up the actual command in a
            # shell, just as subprocess does; this replaces our process - whose
            # pipes are all hooked up to the PTY - with the "real" one.
            if self.pid == 0:
                # TODO: both pty.spawn() and pexpect.spawn() do a lot of
                # setup/teardown involving tty.setraw, getrlimit, signal.
                # Ostensibly we'll want some of that eventually, but if
                # possible write tests - integration-level if necessary -
                # before adding it!
                #
                # Set pty window size based on what our own controlling
                # terminal's window size appears to be.
                # TODO: make subroutine?
                winsize = struct.pack("HHHH", rows, cols, 0, 0)
                fcntl.ioctl(sys.stdout.fileno(), termios.TIOCSWINSZ, winsize)
                # Use execve for bare-minimum "exec w/ variable # args + env"
                # behavior. No need for the 'p' (use PATH to find executable)
                # for now.
                # NOTE: stdlib subprocess (actually its posix flavor, which is
                # written in C) uses either execve or execv, depending.
                os.execve(shell, [shell, "-c", command], env)
        else:
            self.process = Popen(
                command,
                shell=True,
                executable=shell,
                env=env,
                stdout=PIPE,
                stderr=PIPE,
                stdin=PIPE,
            )

    def kill(self) -> None:
        """
        Send ``SIGKILL`` to the subprocess, ignoring an already-gone pid.
        """
        pid = self.pid if self.using_pty else self.process.pid
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            # In odd situations where our subprocess is already dead, don't
            # throw this upwards.
            pass

    @property
    def process_is_finished(self) -> bool:
        """
        Nonblocking check of whether the subprocess has exited.

        In pty mode this also stores the wait status in ``self.status``.
        """
        if self.using_pty:
            # NOTE:
            # https://github.com/pexpect/ptyprocess/blob/4058faa05e2940662ab6da1330aa0586c6f9cd9c/ptyprocess/ptyprocess.py#L680-L687
            # implies that Linux "requires" use of the blocking, non-WNOHANG
            # version of this call. Our testing doesn't verify this, however,
            # so...
            # NOTE: It does appear to be totally blocking on Windows, so our
            # issue #351 may be totally unsolvable there. Unclear.
            pid_val, self.status = os.waitpid(self.pid, os.WNOHANG)
            return pid_val != 0
        else:
            return self.process.poll() is not None

    def returncode(self) -> Optional[int]:
        """
        Return the subprocess' exit code.

        :returns:
            In pty mode: the exit status if the process exited, the
            negated signal number if it was killed by a signal, else
            ``None``. Otherwise, ``self.process.returncode``.
        """
        if self.using_pty:
            # No subprocess.returncode available; use WIFEXITED/WIFSIGNALED to
            # determine which of WEXITSTATUS / WTERMSIG to use.
            # TODO: is it safe to just say "call all WEXITSTATUS/WTERMSIG and
            # return whichever one of them is nondefault"? Probably not?
            # NOTE: doing this in an arbitrary order should be safe since only
            # one of the WIF* methods ought to ever return True.
            code = None
            if os.WIFEXITED(self.status):
                code = os.WEXITSTATUS(self.status)
            elif os.WIFSIGNALED(self.status):
                code = os.WTERMSIG(self.status)
                # Match subprocess.returncode by turning signals into negative
                # 'exit code' integers.
                code = -1 * code
            return code
            # TODO: do we care about WIFSTOPPED? Maybe someday?
        else:
            return self.process.returncode

    def stop(self) -> None:
        """
        Run the base cleanup, then close the pty descriptor if one is open.
        """
        super().stop()
        # If we opened a PTY for child communications, make sure to close() it,
        # otherwise long-running Invoke-using processes exhaust their file
        # descriptors eventually.
        if self.using_pty:
            try:
                os.close(self.parent_fd)
            except Exception:
                # If something weird happened preventing the close, there's
                # nothing to be done about it now...
                pass

1404 

1405 

class Result:
    """
    A container for information about the result of a command execution.

    All params are exposed as attributes of the same name and type.

    :param str stdout:
        The subprocess' standard output.

    :param str stderr:
        Same as ``stdout`` but containing standard error (unless the process
        was invoked via a pty, in which case it will be empty; see
        `.Runner.run`.)

    :param str encoding:
        The string encoding used by the local shell environment.

    :param str command:
        The command which was executed.

    :param str shell:
        The shell binary used for execution.

    :param dict env:
        The shell environment used for execution. (Default is the empty dict,
        ``{}``, not ``None`` as displayed in the signature.)

    :param int exited:
        An integer representing the subprocess' exit/return code.

        .. note::
            This may be ``None`` in situations where the subprocess did not run
            to completion, such as when auto-responding failed or a timeout was
            reached.

    :param bool pty:
        A boolean describing whether the subprocess was invoked with a pty or
        not; see `.Runner.run`.

    :param tuple hide:
        A tuple of stream names (none, one or both of ``('stdout', 'stderr')``)
        which were hidden from the user when the generating command executed;
        this is a normalized value derived from the ``hide`` parameter of
        `.Runner.run`.

        For example, ``run('command', hide='stdout')`` will yield a `Result`
        where ``result.hide == ('stdout',)``; ``hide=True`` or ``hide='both'``
        results in ``result.hide == ('stdout', 'stderr')``; and ``hide=False``
        (the default) generates ``result.hide == ()`` (the empty tuple.)

    .. note::
        `Result` objects' truth evaluation is equivalent to their `.ok`
        attribute's value. Therefore, quick-and-dirty expressions like the
        following are possible::

            if run("some shell command"):
                do_something()
            else:
                handle_problem()

        However, remember `Zen of Python #2
        <http://zen-of-python.info/explicit-is-better-than-implicit.html#2>`_.

    .. versionadded:: 1.0
    """

    # TODO: inherit from namedtuple instead? heh (or: use attrs from pypi)
    def __init__(
        self,
        stdout: str = "",
        stderr: str = "",
        encoding: Optional[str] = None,
        command: str = "",
        shell: str = "",
        env: Optional[Dict[str, Any]] = None,
        exited: Optional[int] = 0,
        pty: bool = False,
        hide: Tuple[str, ...] = tuple(),
    ):
        self.stdout = stdout
        self.stderr = stderr
        # Only look up the default encoding when none was supplied.
        if encoding is None:
            encoding = default_encoding()
        self.encoding = encoding
        self.command = command
        self.shell = shell
        # Give each instance its own dict rather than sharing a default.
        self.env = {} if env is None else env
        self.exited = exited
        self.pty = pty
        self.hide = hide

    @property
    def return_code(self) -> Optional[int]:
        """
        An alias for ``.exited``.

        .. versionadded:: 1.0
        """
        return self.exited

    def __bool__(self) -> bool:
        # Truthiness mirrors `ok`.
        return self.ok

    def __str__(self) -> str:
        if self.exited is not None:
            desc = "Command exited with status {}.".format(self.exited)
        else:
            desc = "Command was not fully executed due to watcher error."
        ret = [desc]
        for x in ("stdout", "stderr"):
            val = getattr(self, x)
            if val:
                # Header line, then the stream's text with trailing
                # whitespace stripped, then a newline.
                ret.append("=== {} ===\n{}\n".format(x, val.rstrip()))
            else:
                ret.append("(no {})".format(x))
        return "\n".join(ret)

    def __repr__(self) -> str:
        # TODO: more? e.g. len of stdout/err? (how to represent cleanly in a
        # 'x=y' format like this? e.g. '4b' is ambiguous as to what it
        # represents
        template = "<Result cmd={!r} exited={}>"
        return template.format(self.command, self.exited)

    @property
    def ok(self) -> bool:
        """
        A boolean equivalent to ``exited == 0``.

        .. versionadded:: 1.0
        """
        return bool(self.exited == 0)

    @property
    def failed(self) -> bool:
        """
        The inverse of ``ok``.

        I.e., ``True`` if the program exited with a nonzero return code, and
        ``False`` otherwise.

        .. versionadded:: 1.0
        """
        return not self.ok

    def tail(self, stream: str, count: int = 10) -> str:
        """
        Return the last ``count`` lines of ``stream``, plus leading whitespace.

        The result always starts with two newlines; the selected lines follow,
        joined by single newlines with no trailing newline.

        :param str stream:
            Name of some captured stream attribute, eg ``"stdout"``.
        :param int count:
            Number of lines to preserve. Zero or a negative number selects
            no lines at all.

        .. versionadded:: 1.3
        """
        # TODO: preserve alternate line endings? Mehhhh
        # NOTE: no trailing \n preservation; easier for below display if
        # normalized
        lines = getattr(self, stream).splitlines()
        # ``lines[-0:]`` would be the whole list, so non-positive counts need
        # explicit handling to mean "no lines".
        kept = lines[-count:] if count > 0 else []
        return "\n\n" + "\n".join(kept)

1571 

1572 

class Promise(Result):
    """
    A stand-in for a `Result` that is not available yet.

    Yielded from asynchronous execution. The main API member is `join`.
    Instances also work as context managers: entering yields ``self`` and
    leaving the block calls `join` automatically.

    A `Promise` additionally carries copies of those `Result` attributes
    that derive from `~Runner.run` kwargs and not the result of command
    execution. For example, ``command`` is replicated here, but ``stdout``
    is not.

    .. versionadded:: 1.4
    """

    def __init__(self, runner: "Runner") -> None:
        """
        Create a new promise.

        :param runner:
            An in-flight `Runner` instance making this promise.

            Must already have started the subprocess and spun up IO threads.
        """
        self.runner = runner
        # Mirror the runner's result kwargs onto this object, one attribute
        # per entry.
        # TODO: consider proxying vs copying, but prob wait for refactor
        for attr_name, attr_value in self.runner.result_kwargs.items():
            setattr(self, attr_name, attr_value)

    def join(self) -> Result:
        """
        Block until associated subprocess exits, returning/raising the result.

        This behaves like the end of a synchronously executed ``run``:

        - various background threads (such as IO workers) are themselves
          joined;
        - if the subprocess exited normally, a `Result` is returned;
        - in any other case (unforeseen exceptions, IO sub-thread
          `.ThreadException`, `.Failure`, `.WatcherError`) the relevant
          exception is raised here.

        The runner's ``stop()`` is called afterwards in every case.

        See `~Runner.run` docs, or those of the relevant classes, for further
        details.
        """
        try:
            outcome = self.runner._finish()
        finally:
            # Cleanup must happen whether _finish returned or raised.
            self.runner.stop()
        return outcome

    def __enter__(self) -> "Promise":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: BaseException,
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Leaving the ``with`` block waits for the subprocess; the result
        # itself is discarded here.
        self.join()

1636 

1637 

def normalize_hide(
    val: Any,
    out_stream: Optional[str] = None,
    err_stream: Optional[str] = None,
) -> Tuple[str, ...]:
    """
    Turn a user-supplied ``hide`` value into a tuple of stream names.

    :param val:
        One of ``None``, ``False``, ``"out"``, ``"stdout"``, ``"err"``,
        ``"stderr"``, ``"both"`` or ``True``.
    :param out_stream:
        When not ``None``, ``"stdout"`` is dropped from the result.
    :param err_stream:
        When not ``None``, ``"stderr"`` is dropped from the result.

    :returns:
        A tuple containing none, one or both of ``"stdout"`` and
        ``"stderr"``.

    :raises ValueError: If ``val`` is not one of the accepted values.
    """
    hide_vals = (None, False, "out", "stdout", "err", "stderr", "both", True)
    if val not in hide_vals:
        err = "'hide' got {!r} which is not in {!r}"
        raise ValueError(err.format(val, hide_vals))
    # Each row maps a group of accepted spellings to the streams it hides;
    # anything valid but unlisted is already a stream name.
    alias_table = (
        ((None, False), ()),
        (("both", True), ("stdout", "stderr")),
        (("out",), ("stdout",)),
        (("err",), ("stderr",)),
    )
    for spellings, streams in alias_table:
        if val in spellings:
            hidden = list(streams)
            break
    else:
        hidden = [val]
    # Streams that were overridden from the default are never hidden.
    overridden = []
    if out_stream is not None:
        overridden.append("stdout")
    if err_stream is not None:
        overridden.append("stderr")
    return tuple(name for name in hidden if name not in overridden)

1664 

1665 

def default_encoding() -> str:
    """
    Obtain apparent interpreter-local default text encoding.

    Serves as the baseline when SOME encoding must be picked for
    unknown-but-presumably-text bytes and the user has not specified an
    override. The value comes from ``locale.getpreferredencoding(False)``.
    """
    return locale.getpreferredencoding(False)