Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/invoke/runners.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

448 statements  

1import errno 

2import locale 

3import os 

4import signal 

5import struct 

6import sys 

7import threading 

8import time 

9from contextlib import AbstractContextManager 

10from subprocess import PIPE, Popen 

11from types import TracebackType 

12from typing import ( 

13 IO, 

14 TYPE_CHECKING, 

15 Any, 

16 Callable, 

17 Dict, 

18 Generator, 

19 List, 

20 Optional, 

21 Tuple, 

22 Type, 

23) 

24 

25# Import some platform-specific things at top level so they can be mocked for 

26# tests. 

27try: 

28 import pty 

29except ImportError: 

30 pty = None # type: ignore[assignment] 

31try: 

32 import fcntl 

33except ImportError: 

34 fcntl = None # type: ignore[assignment] 

35try: 

36 import termios 

37except ImportError: 

38 termios = None # type: ignore[assignment] 

39 

40from .exceptions import ( 

41 CommandTimedOut, 

42 Failure, 

43 SubprocessPipeError, 

44 ThreadException, 

45 UnexpectedExit, 

46 WatcherError, 

47) 

48from .terminals import ( 

49 WINDOWS, 

50 bytes_to_read, 

51 character_buffered, 

52 pty_size, 

53 ready_for_reading, 

54) 

55from .util import ExceptionHandlingThread, has_fileno, isatty 

56 

57if TYPE_CHECKING: 

58 from .context import Context 

59 from .watchers import StreamWatcher 

60 

61 

62class Runner: 

63 """ 

64 Partially-abstract core command-running API. 

65 

66 This class is not usable by itself and must be subclassed, implementing a 

67 number of methods such as `start`, `wait` and `returncode`. For a subclass 

68 implementation example, see the source code for `.Local`. 

69 

70 .. versionadded:: 1.0 

71 """ 

72 

73 opts: Dict[str, Any] 

74 using_pty: bool 

75 read_chunk_size = 1000 

76 input_sleep = 0.01 

77 

def __init__(self, context: "Context") -> None:
    """
    Bind this runner to a `.Context` and initialize per-run state.

    :param context:
        a `.Context` instance; it is stored on ``self.context`` and is the
        source of default option values used later by `run` (e.g. a
        remote-oriented `.Runner` might want a `.Context` subclass holding
        info about hostnames and ports.)

        .. note::
            The `.Context` given to `.Runner` instances **must** contain
            default config values for the `.Runner` class in question. At a
            minimum, this means values for each of the default
            `.Runner.run` keyword arguments such as ``echo`` and ``warn``.
    """
    runner_cls = type(self)
    #: The `.Context` given to the same-named argument of `__init__`.
    self.context = context
    # The two class-level tunables are mirrored onto the instance so that
    # generated docs list them alongside the other instance attributes.
    #: How many bytes (at maximum) to read per iteration of stream reads.
    self.read_chunk_size = runner_cls.read_chunk_size
    #: How many seconds to sleep on each iteration of the stdin read loop
    #: and other otherwise-fast loops.
    self.input_sleep = runner_cls.input_sleep
    #: A `threading.Event` signaling program completion.
    #:
    #: Typically set after `wait` returns. Some IO mechanisms rely on this
    #: to know when to exit an infinite read loop.
    self.program_finished = threading.Event()
    #: Whether pty fallback warning has been emitted.
    self.warned_about_pty_fallback = False
    #: A list of `.StreamWatcher` instances for use by `respond`. Is filled
    #: in at runtime by `run`.
    self.watchers: List["StreamWatcher"] = []
    # Placeholder for the optional timeout timer.
    self._timer: Optional[threading.Timer] = None
    # Async/disown flags get defaults up front because the 'finally' clause
    # in `run` reads them even when option parsing blows up early.
    self._asynchronous = False
    self._disowned = False

125 

def run(self, command: str, **kwargs: Any) -> "Result":
    """
    Execute ``command``, returning an instance of `Result` once complete.

    By default, this method is synchronous (it only returns once the
    subprocess has completed), and allows interactive keyboard
    communication with the subprocess.

    It can instead behave asynchronously (returning early & requiring
    interaction with the resulting object to manage subprocess lifecycle)
    if you specify ``asynchronous=True``. Furthermore, you can completely
    disassociate the subprocess from Invoke's control (allowing it to
    persist on its own after Python exits) by saying ``disown=True``. See
    the per-kwarg docs below for details on both of these.

    .. note::
        All kwargs will default to the values found in this instance's
        `~.Runner.context` attribute, specifically in its configuration's
        ``run`` subtree (e.g. ``run.echo`` provides the default value for
        the ``echo`` keyword, etc). The base default values are described
        in the parameter list below.

    :param str command: The shell command to execute.

    :param bool asynchronous:
        When set to ``True`` (default ``False``), enables asynchronous
        behavior, as follows:

        - Connections to the controlling terminal are disabled, meaning you
          will not see the subprocess output and it will not respond to
          your keyboard input - similar to ``hide=True`` and
          ``in_stream=False`` (though explicitly given
          ``(out|err|in)_stream`` file-like objects will still be honored
          as normal).
        - `.run` returns immediately after starting the subprocess, and its
          return value becomes an instance of `Promise` instead of
          `Result`.
        - `Promise` objects are primarily useful for their `~Promise.join`
          method, which blocks until the subprocess exits (similar to
          threading APIs) and either returns a final `~Result` or raises an
          exception, just as a synchronous ``run`` would.

            - As with threading and similar APIs, users of
              ``asynchronous=True`` should make sure to ``join`` their
              `Promise` objects to prevent issues with interpreter
              shutdown.
            - One easy way to handle such cleanup is to use the `Promise`
              as a context manager - it will automatically ``join`` at the
              exit of the context block.

        .. versionadded:: 1.4

    :param bool disown:
        When set to ``True`` (default ``False``), returns immediately like
        ``asynchronous=True``, but does not perform any background work
        related to that subprocess (it is completely ignored). This allows
        subprocesses using shell backgrounding or similar techniques (e.g.
        trailing ``&``, ``nohup``) to persist beyond the lifetime of the
        Python process running Invoke.

        .. note::
            If you're unsure whether you want this or ``asynchronous``, you
            probably want ``asynchronous``!

        Specifically, ``disown=True`` has the following behaviors:

        - The return value is still a `Result`, but it is severely limited
          in functionality & data, per below.
        - No I/O worker threads are spun up, so you will have no access to
          the subprocess' stdout/stderr (those attributes on the result
          will be empty strings), your stdin will not be forwarded,
          ``(out|err|in)_stream`` will be ignored, and features like
          ``watchers`` will not function.
        - No exit code is checked for, so you will not receive any errors
          if the subprocess fails to exit cleanly, the result's ``exited``
          will be ``None``, and convenience properties like `ok` and
          `failed` will not be reliable.
        - ``pty=True`` may not function correctly (subprocesses may not run
          at all; this seems to be a potential bug in Python's
          ``pty.fork``) unless your command line includes tools such as
          ``nohup`` or (the shell builtin) ``disown``.

        .. versionadded:: 1.4
        .. versionchanged:: 3.0
            The return value when ``disown=True`` changed from `None` to
            `Result`.

    :param bool dry:
        Whether to dry-run instead of truly invoking the given command. See
        :option:`--dry` (which flips this on globally) for details on this
        behavior.

        .. versionadded:: 1.3

    :param bool echo:
        Controls whether `.run` prints the command string to local stdout
        prior to executing it. Default: ``False``.

        .. note::
            ``hide=True`` will override ``echo=True`` if both are given.

    :param echo_format:
        A string, which when passed to Python's inbuilt ``.format`` method,
        will change the format of the output when ``run.echo`` is set to
        true.

        Currently, only ``{command}`` is supported as a parameter.

        Defaults to printing the full command string in ANSI-escaped bold.

    :param bool echo_stdin:
        Whether to write data from ``in_stream`` back to ``out_stream``.

        In other words, in normal interactive usage, this parameter
        controls whether Invoke mirrors what you type back to your
        terminal.

        By default (when ``None``), this behavior is triggered by the
        following:

            * Not using a pty to run the subcommand (i.e. ``pty=False``),
              as ptys natively echo stdin to stdout on their own;
            * And when the controlling terminal of Invoke itself (as per
              ``in_stream``) appears to be a valid terminal device or TTY.
              (Specifically, when `~invoke.util.isatty` yields a ``True``
              result when given ``in_stream``.)

              .. note::
                  This property tends to be ``False`` when piping another
                  program's output into an Invoke session, or when running
                  Invoke within another program (e.g. running Invoke from
                  itself).

        If both of those properties are true, echoing will occur; if either
        is false, no echoing will be performed.

        When not ``None``, this parameter will override that auto-detection
        and force, or disable, echoing.

    :param str encoding:
        Override auto-detection of which encoding the subprocess is using
        for its stdout/stderr streams (which defaults to the return value
        of `default_encoding`).

    :param err_stream:
        Same as ``out_stream``, except for standard error, and defaulting
        to ``sys.stderr``.

    :param dict env:
        By default, subprocesses receive a copy of Invoke's own environment
        (i.e. ``os.environ``). Supply a dict here to update that child
        environment.

        For example, ``run('command', env={'PYTHONPATH':
        '/some/virtual/env/maybe'})`` would modify the ``PYTHONPATH`` env
        var, with the rest of the child's env looking identical to the
        parent.

        .. seealso:: ``replace_env`` for changing 'update' to 'replace'.

    :param bool fallback:
        Controls auto-fallback behavior re: problems offering a pty when
        ``pty=True``. Whether this has any effect depends on the specific
        `Runner` subclass being invoked. Default: ``True``.

    :param hide:
        Allows the caller to disable ``run``'s default behavior of copying
        the subprocess' stdout and stderr to the controlling terminal.
        Specify ``hide='out'`` (or ``'stdout'``) to hide only the stdout
        stream, ``hide='err'`` (or ``'stderr'``) to hide only stderr, or
        ``hide='both'`` (or ``True``) to hide both streams.

        The default value is ``None``, meaning to print everything;
        ``False`` will also disable hiding.

        .. note::
            Stdout and stderr are always captured and stored in the
            ``Result`` object, regardless of ``hide``'s value.

        .. note::
            ``hide=True`` will also override ``echo=True`` if both are
            given (either as kwargs or via config/CLI).

    :param in_stream:
        A file-like stream object to be used as the subprocess' standard
        input. If ``None`` (the default), ``sys.stdin`` will be used.

        If ``False``, will disable stdin mirroring entirely (though other
        functionality which writes to the subprocess' stdin, such as
        autoresponding, will still function.) Disabling stdin mirroring can
        help when ``sys.stdin`` is a misbehaving non-stream object, such as
        under test harnesses or headless command runners.

    :param out_stream:
        A file-like stream object to which the subprocess' standard output
        should be written. If ``None`` (the default), ``sys.stdout`` will
        be used.

    :param bool pty:
        By default, ``run`` connects directly to the invoked process and
        reads its stdout/stderr streams. Some programs will buffer (or even
        behave) differently in this situation compared to using an actual
        terminal or pseudoterminal (pty). To use a pty instead of the
        default behavior, specify ``pty=True``.

        .. warning::
            Due to their nature, ptys have a single output stream, so the
            ability to tell stdout apart from stderr is **not possible**
            when ``pty=True``. As such, all output will appear on
            ``out_stream`` (see below) and be captured into the ``stdout``
            result attribute. ``err_stream`` and ``stderr`` will always be
            empty when ``pty=True``.

    :param bool replace_env:
        When ``True``, causes the subprocess to receive the dictionary
        given to ``env`` as its entire shell environment, instead of
        updating a copy of ``os.environ`` (which is the default behavior).
        Default: ``False``.

    :param str shell:
        Which shell binary to use. Default: ``/bin/bash`` (on Unix;
        ``COMSPEC`` or ``cmd.exe`` on Windows.)

    :param timeout:
        Cause the runner to submit an interrupt to the subprocess and raise
        `.CommandTimedOut`, if the command takes longer than ``timeout``
        seconds to execute. Defaults to ``None``, meaning no timeout.

        .. versionadded:: 1.3

    :param bool warn:
        Whether to warn and continue, instead of raising
        `.UnexpectedExit`, when the executed command exits with a
        nonzero status. Default: ``False``.

        .. note::
            This setting has no effect on exceptions, which will still be
            raised, typically bundled in `.ThreadException` objects if they
            were raised by the IO worker threads.

            Similarly, `.WatcherError` exceptions raised by
            `.StreamWatcher` instances will also ignore this setting, and
            will usually be bundled inside `.Failure` objects (in order to
            preserve the execution context).

            Ditto `.CommandTimedOut` - basically, anything that prevents a
            command from actually getting to "exited with an exit code"
            ignores this flag.

    :param watchers:
        A list of `.StreamWatcher` instances which will be used to scan the
        program's ``stdout`` or ``stderr`` and may write into its ``stdin``
        (typically ``bytes`` objects) in response to patterns or other
        heuristics.

        See :doc:`/concepts/watchers` for details on this functionality.

        Default: ``[]``.

    :returns:
        `Result`, or a subclass thereof.

    :raises:
        `.UnexpectedExit`, if the command exited nonzero and
        ``warn`` was ``False``.

    :raises:
        `.Failure`, if the command didn't even exit cleanly, e.g. if a
        `.StreamWatcher` raised `.WatcherError`.

    :raises:
        `.ThreadException` (if the background I/O threads encountered
        exceptions other than `.WatcherError`).

    .. versionadded:: 1.0
    """
    try:
        return self._run_body(command, **kwargs)
    finally:
        # Async and disowned runs outlive this call, so only a plain
        # synchronous run is stopped here (this also fires on exceptions).
        detached = self._asynchronous or self._disowned
        if not detached:
            self.stop()

407 

def echo(self, command: str) -> None:
    """
    Print ``command`` to stdout, rendered through the ``echo_format`` option.

    :param str command:
        The command string, substituted for ``{command}`` in
        ``self.opts["echo_format"]``.

    :returns: ``None``.
    """
    template = self.opts["echo_format"]
    print(template.format(command=command))

410 

def _setup(self, command: str, kwargs: Any) -> None:
    """
    Populate ``self`` with everything needed before the command starts.

    Sets ``self.env``, ``self.encoding`` and ``self.result_kwargs`` (plus
    whatever `_unify_kwargs_with_config` sets), and echoes the command
    when the ``echo`` option is on.

    :param str command: The command string about to be run.
    :param kwargs: The keyword arguments originally handed to `run`.
    """
    # Merge kwargs with config first; everything below reads self.opts.
    self._unify_kwargs_with_config(kwargs)
    options = self.opts
    # Child environment.
    self.env = self.generate_env(options["env"], options["replace_env"])
    # Fall back to the default encoding only when none was configured or
    # passed in.
    chosen_encoding = options["encoding"]
    if not chosen_encoding:
        chosen_encoding = self.default_encoding()
    self.encoding = chosen_encoding
    # Echoing happens this early so that dry runs include it too.
    if options["echo"]:
        self.echo(command)
    # Arguments shared by every Result this run may generate.
    # TODO: I hate this. Needs a deeper separate think about tweaking
    # Runner.generate_result in a way that isn't literally just this same
    # two-step process, and which also works w/ downstream.
    self.result_kwargs = {
        "command": command,
        "shell": options["shell"],
        "env": self.env,
        "pty": self.using_pty,
        "hide": options["hide"],
        "encoding": self.encoding,
    }

438 

def _run_body(self, command: str, **kwargs: Any) -> "Result":
    """
    Perform the actual work of `run`: set up, start, then finish or defer.

    :param str command: The command string to execute.
    :param kwargs: Keyword arguments originally given to `run`.

    :returns:
        The value of `generate_result` for dry or disowned runs; otherwise
        the value of `make_promise` when asynchronous, or of `_finish`.
    """
    # Get all state in place.
    self._setup(command, kwargs)
    # A dry run never starts anything; report an empty, successful result.
    if self.opts["dry"]:
        dry_kwargs = {
            **self.result_kwargs,
            "stdout": "",
            "stderr": "",
            "exited": 0,
        }
        return self.generate_result(**dry_kwargs)
    # Launch the command itself (it runs in the background).
    self.start(command, self.opts["shell"], self.env)
    # The pid can only be known after starting.
    self.result_kwargs["pid"] = self.get_pid()
    # Disowned: bail out right away - no threads, no timer, no error
    # checking.
    if self._disowned:
        disowned_kwargs = {
            **self.result_kwargs,
            "stdout": "",
            "stderr": "",
            "exited": None,
            "disowned": True,
        }
        return self.generate_result(**disowned_kwargs)
    # Bring up the timeout timer and the IO worker threads.
    self.start_timer(self.opts["timeout"])
    self.threads, self.stdout, self.stderr = self.create_io_threads()
    for worker in self.threads.values():
        worker.start()
    # Either hand back a promise or block until completion.
    if self._asynchronous:
        return self.make_promise()
    return self._finish()

470 

def make_promise(self) -> "Promise":
    """
    Wrap this runner in a `Promise` and return it.

    `_run_body` returns this value when the run is asynchronous.

    .. versionadded:: 1.4
    """
    promise = Promise(self)
    return promise

478 

def _finish(self) -> "Result":
    """
    Block until the subprocess is done, then build and vet the result.

    :returns: The result object produced by `_collate_result`.

    :raises:
        `.ThreadException` if any worker thread stored an exception that
        was not a `.WatcherError`.
    :raises: `.Failure` if a worker thread stored a `.WatcherError`.
    :raises:
        `.CommandTimedOut` if a timeout was configured and ``timed_out``
        is true.
    :raises:
        `.UnexpectedExit` if the result is falsy and ``warn`` is off.
    """
    try:
        # Keep waiting until wait() returns normally. A ^C is only passed
        # along to the subprocess, never acted on locally:
        # - if remote end really stops, we'll naturally stop after
        # - if remote end does not stop (eg REPL, editor) we don't want
        #   to stop prematurely
        waiting = True
        while waiting:
            try:
                self.wait()
            except KeyboardInterrupt as interrupt:
                self.send_interrupt(interrupt)
                # TODO: honor other signals sent to our own process and
                # transmit them to the subprocess before handling
                # 'normally'.
            else:
                waiting = False
    # Worker threads get tied off even when something above exploded; any
    # exception from the wait loop surfaces once this block completes.
    finally:
        # Tell the stdin-mirroring worker to leave its endless loop.
        self.program_finished.set()
        # Join each thread (with a timeout where needed) and collect what
        # it raised. WatcherErrors are kept apart because they are
        # "anticipated errors" reported via Failure objects further down.
        watcher_errors = []
        thread_exceptions = []
        for target, worker in self.threads.items():
            worker.join(self._thread_join_timeout(target))
            wrapper = worker.exception()
            if wrapper is None:
                continue
            if isinstance(wrapper.value, WatcherError):
                watcher_errors.append(wrapper.value)
            else:
                thread_exceptions.append(wrapper)
    # Worker-thread exceptions are raised as one aggregate.
    # NOTE: deliberately outside the 'finally' so that main-thread
    # exceptions win over worker-thread ones; they're more likely to be
    # Big Serious Problems.
    if thread_exceptions:
        raise ThreadException(thread_exceptions)
    # Merge stdout/err, work out the exit code, and build the result.
    result = self._collate_result(watcher_errors)
    # A WatcherError means a watcher aborted execution; wrap the first one
    # in a generic Failure.
    if watcher_errors:
        # TODO: ambiguity exists if we somehow get WatcherError in *both*
        # threads...as unlikely as that would normally be.
        raise Failure(result, reason=watcher_errors[0])
    # Complain if a timeout was requested and actually hit.
    timeout = self.opts["timeout"]
    if timeout is not None and self.timed_out:
        raise CommandTimedOut(result, timeout=timeout)
    if not result and not self.opts["warn"]:
        raise UnexpectedExit(result)
    return result

538 

def _unify_kwargs_with_config(self, kwargs: Any) -> None:
    """
    Unify `run` kwargs with config options to arrive at local options.

    For every key in the context's ``run`` config subtree, a non-``None``
    value in ``kwargs`` wins over the configured value. ``kwargs`` is
    consumed (entries are popped) along the way.

    Sets:

    - ``self.opts`` - opts dict
    - ``self.streams`` - map of stream names to stream target values
    - ``self.using_pty`` - result of `should_use_pty`
    - ``self._asynchronous`` / ``self._disowned`` - mode flags
    - ``self.watchers`` - only when the ``watchers`` option is truthy

    :param kwargs: The keyword arguments handed to `run`; mutated in place.

    :raises TypeError:
        if ``kwargs`` still holds a key after all ``run`` config keys and
        ``timeout`` have been popped.
    :raises ValueError:
        if both ``asynchronous`` and ``disown`` are truthy.
    """
    opts = {}
    for key, value in self.context.config.run.items():
        # An explicit None kwarg is treated the same as "not given".
        runtime = kwargs.pop(key, None)
        opts[key] = value if runtime is None else runtime
    # Pull in command execution timeout, which stores config elsewhere,
    # but only use it if it's actually set (backwards compat)
    config_timeout = self.context.config.timeouts.command
    opts["timeout"] = kwargs.pop("timeout", config_timeout)
    # Handle invalid kwarg keys (anything left in kwargs).
    # Act like a normal function would, i.e. TypeError
    if kwargs:
        err = "run() got an unexpected keyword argument '{}'"
        raise TypeError(err.format(list(kwargs.keys())[0]))
    # Update disowned, async flags
    # NOTE: these are assigned before the conflict check below, so they
    # stay set on the instance even when ValueError is raised.
    self._asynchronous = opts["asynchronous"]
    self._disowned = opts["disown"]
    if self._asynchronous and self._disowned:
        err = "Cannot give both 'asynchronous' and 'disown' at the same time!"  # noqa
        raise ValueError(err)
    # If hide was True, turn off echoing
    if opts["hide"] is True:
        opts["echo"] = False
    # Conversely, ensure echoing is always on when dry-running
    # (this runs after the hide check, so dry wins when both are True)
    if opts["dry"] is True:
        opts["echo"] = True
    # Always hide if async
    if self._asynchronous:
        opts["hide"] = True
    # Then normalize 'hide' from one of the various valid input values,
    # into a stream-names tuple. Also account for the streams.
    # NOTE: normalize_hide receives the streams as given, i.e. before the
    # None -> sys.stdout/sys.stderr defaulting below.
    out_stream, err_stream = opts["out_stream"], opts["err_stream"]
    opts["hide"] = normalize_hide(opts["hide"], out_stream, err_stream)
    # Derive stream objects
    if out_stream is None:
        out_stream = sys.stdout
    if err_stream is None:
        err_stream = sys.stderr
    in_stream = opts["in_stream"]
    if in_stream is None:
        # If in_stream hasn't been overridden, and we're async, we don't
        # want to read from sys.stdin (otherwise the default) - so set
        # False instead.
        in_stream = False if self._asynchronous else sys.stdin
    # Determine pty or no
    self.using_pty = self.should_use_pty(opts["pty"], opts["fallback"])
    # A falsy watchers option leaves self.watchers as it already was.
    if opts["watchers"]:
        self.watchers = opts["watchers"]
    # Set data
    self.opts = opts
    self.streams = {"out": out_stream, "err": err_stream, "in": in_stream}

598 

def _collate_result(self, watcher_errors: List[WatcherError]) -> "Result":
    """
    Join the captured output and build the final result object.

    :param watcher_errors:
        `.WatcherError` instances gathered from the worker threads; when
        non-empty, the exit code is not queried and ``exited`` is ``None``.

    :returns: Whatever `generate_result` returns for the merged kwargs.
    """
    # Things went well enough that we want to return or raise detailed
    # info about the execution, so a Result gets built.
    captured = {
        "stdout": "".join(self.stdout),
        "stderr": "".join(self.stderr),
    }
    if WINDOWS:
        # "Universal newlines" - replace all standard forms of
        # newline with \n. This is not technically Windows related
        # (\r as newline is an old Mac convention) but we only apply
        # the translation for Windows as that's the only platform
        # it is likely to matter for these days.
        captured = {
            name: text.replace("\r\n", "\n").replace("\r", "\n")
            for name, text in captured.items()
        }
    # Only ask for the exit code when no WatcherErrors occurred.
    # NOTE: In that case, returncode() may block waiting on the process
    # (which may be waiting for user input). Since most WatcherError
    # situations lack a useful exit code anyways, skipping this doesn't
    # really hurt any.
    if watcher_errors:
        exited = None
    else:
        exited = self.returncode()
    # TODO: as noted elsewhere, I kinda hate this. Consider changing
    # generate_result()'s API in next major rev so we can tidy up.
    final_kwargs = dict(self.result_kwargs)
    final_kwargs.update(captured, exited=exited)
    return self.generate_result(**final_kwargs)

626 

def _thread_join_timeout(self, target: Callable) -> Optional[int]:
    """
    Pick the ``join`` timeout for the worker thread running ``target``.

    :param target: The thread body function the worker was created for.

    :returns:
        ``1`` when the counterpart out/err thread exists in
        ``self.threads`` and reports ``is_dead``; otherwise ``None`` (no
        timeout). Always ``None`` for the stdin handler.
    """
    # Out/err joins get a timeout when their counterpart looks dead; this
    # indicates issue #351 (fixed by #432) where the subproc may hang
    # because its stdout (or stderr) is no longer being consumed by the
    # dead thread (and a pipe is filling up.) In that case, the non-dead
    # thread is likely to block forever on a `recv` unless we add this
    # timeout.
    if target == self.handle_stdin:
        return None
    if target == self.handle_stderr:
        counterpart = self.handle_stdout
    else:
        counterpart = self.handle_stderr
    if counterpart not in self.threads:
        return None
    return 1 if self.threads[counterpart].is_dead else None

642 

def create_io_threads(
    self,
) -> Tuple[Dict[Callable, ExceptionHandlingThread], List[str], List[str]]:
    """
    Build (but do not start) the IO worker threads for this run.

    Storing and starting the returned threads is left to the caller.

    :returns:
        A 3-tuple: a dict mapping each handler method to its
        `.ExceptionHandlingThread`, then the (initially empty) stdout and
        stderr capture lists handed to those handlers.
    """
    stdout: List[str] = []
    stderr: List[str] = []
    hidden = self.opts["hide"]
    # Per-thread parameters, keyed by body function. Stdout always gets a
    # worker.
    thread_args: Dict[Callable, Any] = {}
    thread_args[self.handle_stdout] = dict(
        buffer_=stdout,
        hide="stdout" in hidden,
        output=self.streams["out"],
    )
    # By now in_stream is either a real stream object or False, so a
    # truth test suffices. False means the user indicated stdin is
    # nonexistent or problematic, and no stdin worker is created at all.
    if self.streams["in"]:
        thread_args[self.handle_stdin] = dict(
            input_=self.streams["in"],
            output=self.streams["out"],
            echo=self.opts["echo_stdin"],
        )
    # A stderr worker is only set up when not running under a pty.
    if not self.using_pty:
        thread_args[self.handle_stderr] = dict(
            buffer_=stderr,
            hide="stderr" in hidden,
            output=self.streams["err"],
        )
    # Wrap every handler in its thread object.
    threads = {
        target: ExceptionHandlingThread(target=target, kwargs=kwargs)
        for target, kwargs in thread_args.items()
    }
    return threads, stdout, stderr

684 

def generate_result(self, **kwargs: Any) -> "Result":
    """
    Build a `Result` from ``kwargs`` and return it.

    This is an override point: subclasses may adjust the kwargs or return
    a `Result` subclass (e.g. one carrying additional metadata besides the
    default).

    :param kwargs: Passed through unchanged to the `Result` constructor.

    .. versionadded:: 1.0
    """
    result = Result(**kwargs)
    return result

696 

def read_proc_output(self, reader: Callable) -> Generator[str, None, None]:
    """
    Iteratively read & decode bytes from a subprocess' out/err stream.

    :param reader:
        A literal reader function/partial, wrapping the actual stream
        object in question, which takes a number of bytes to read, and
        returns that many bytes (or ``None``).

        ``reader`` should be a reference to either `read_proc_stdout` or
        `read_proc_stderr`, which perform the actual, platform/library
        specific read calls.

    :returns:
        A generator yielding strings.

        Specifically, each resulting string is the result of passing one
        ``reader(self.read_chunk_size)`` return value through `decode`.
        Iteration ends at the first falsy return value.

    .. versionadded:: 1.0
    """
    # NOTE: Typically, reading from any stdout/err (local, remote or
    # otherwise) can be thought of as "read until you get nothing back".
    # This is preferable over "wait until an out-of-band signal claims the
    # process is done running" because sometimes that signal will appear
    # before we've actually read all the data in the stream (i.e.: a race
    # condition).
    chunk = reader(self.read_chunk_size)
    while chunk:
        yield self.decode(chunk)
        chunk = reader(self.read_chunk_size)

729 

def write_our_output(self, stream: IO, string: str) -> None:
    """
    Emit ``string`` on ``stream`` and flush right away.

    The explicit ``.flush()`` is there so that real terminal streams
    don't buffer.

    :param stream:
        A file-like stream object, mapping to the ``out_stream`` or
        ``err_stream`` parameters of `run`.

    :param string: A Unicode string object.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    stream.write(string)
    # Flush after every write; see docstring.
    stream.flush()

749 

def _handle_output(
    self,
    buffer_: List[str],
    hide: bool,
    output: IO,
    reader: Callable,
) -> None:
    """
    Shared loop behind `handle_stdout` and `handle_stderr`.

    For each decoded chunk from `read_proc_output`: write it to ``output``
    unless hiding, append it to ``buffer_``, then call `respond` with the
    buffer.

    :param buffer_: The capture list that receives every chunk.
    :param bool hide: When true, chunks are not written to ``output``.
    :param output: Stream handed to `write_our_output` when not hiding.
    :param reader: The reader callable passed on to `read_proc_output`.

    :returns: ``None``.
    """
    # TODO: store un-decoded/raw bytes somewhere as well...
    for chunk in self.read_proc_output(reader):
        # Mirror to the local stream if necessary.
        # TODO: should we rephrase this as "if you want to hide, give me a
        # dummy output stream, e.g. something like /dev/null"? Otherwise, a
        # combo of 'hide=stdout' + 'here is an explicit out_stream' means
        # out_stream is never written to, and that seems...odd.
        if not hide:
            self.write_our_output(stream=output, string=chunk)
        # Keep the chunk in the shared buffer for the main thread to use
        # once execution completes.
        # NOTE: this is threadsafe insofar as no reading occurs until after
        # the thread is join()'d.
        buffer_.append(chunk)
        # Hand this buffer to the autoresponder framework.
        self.respond(buffer_)

773 

def handle_stdout(
    self, buffer_: List[str], hide: bool, output: IO
) -> None:
    """
    Consume the process' stdout: capture it, and print/parse as configured.

    Meant to be used as a thread target; delegates to the shared output
    loop with `read_proc_stdout` as the reader, and so only finishes once
    that reader stops yielding data.

    :param buffer_: The capture buffer shared with the main thread.
    :param bool hide: Whether or not to replay data into ``output``.
    :param output:
        Output stream (file-like object) to write data into when not
        hiding.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    stdout_reader = self.read_proc_stdout
    self._handle_output(buffer_, hide, output, reader=stdout_reader)

796 

def handle_stderr(
    self, buffer_: List[str], hide: bool, output: IO
) -> None:
    """
    Consume the process' stderr: capture it, and print/parse as configured.

    The stderr twin of `handle_stdout`: same parameters and behavior, but
    `read_proc_stderr` is the reader given to the shared output loop.

    :param buffer_: The capture buffer shared with the main thread.
    :param bool hide: Whether or not to replay data into ``output``.
    :param output:
        Output stream (file-like object) to write data into when not
        hiding.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    stderr_reader = self.read_proc_stderr
    self._handle_output(buffer_, hide, output, reader=stderr_reader)

811 

def read_our_stdin(self, input_: IO) -> Optional[str]:
    """
    Read whatever is currently available on a local stdin stream.

    :param input_:
        Actual stream object to read from. Maps to ``in_stream`` in `run`,
        so will often be ``sys.stdin``, but might be any stream-like
        object.

    :returns:
        The text read (bytes results are passed through `decode` first;
        this may be the empty string if the pipe has closed/reached EOF),
        or ``None`` if stdin wasn't ready for reading yet.

    .. versionadded:: 1.0
    """
    # TODO: consider moving the character_buffered contextmanager call in
    # here? Downside is it would be flipping those switches for every byte
    # read instead of once per session, which could be costly (?).
    if not ready_for_reading(input_):
        return None
    chunk = None
    try:
        chunk = input_.read(bytes_to_read(input_))
    except OSError as e:
        # EBADF here is assumed to mean we're running under nohup or
        # similar, where a bad FD can't be reliably detected up front,
        # reading from it explodes, and the user almost surely doesn't care
        # about stdin anyway. Swallow that one errno only; every other
        # OSError propagates.
        if e.errno != errno.EBADF:
            raise
    # Real terminal streams usually hand back bytes, file-like objects often
    # hand back text; only the former needs decoding.
    # TODO: will decoding 1 byte at a time break multibyte character
    # encodings? How to square interactivity with that?
    if chunk and isinstance(chunk, bytes):
        chunk = self.decode(chunk)
    return chunk

851 

def handle_stdin(
    self,
    input_: IO,
    output: IO,
    echo: Optional[bool] = False,
) -> None:
    """
    Read local stdin, copying into process' stdin as necessary.

    Intended for use as a thread target.

    .. note::
        Because real terminal stdin streams have no well-defined "end", if
        such a stream is detected (based on existence of a callable
        ``.fileno()``) this method will wait until `program_finished` is
        set, before terminating.

        When the stream doesn't appear to be from a terminal, the same
        semantics as `handle_stdout` are used - the stream is simply
        ``read()`` from until it returns an empty value.

    :param input_: Stream (file-like object) from which to read.
    :param output: Stream (file-like object) to which echoing may occur.
    :param echo:
        User override option for stdin-stdout echoing. ``None`` means
        "decide automatically": the first time data is read, the value is
        replaced by the result of `should_echo_stdin`.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    # TODO: reinstate lock/whatever thread logic from fab v1 which prevents
    # reading from stdin while other parts of the code are prompting for
    # runtime passwords? (search for 'input_enabled')
    # TODO: fabric#1339 is strongly related to this, if it's not literally
    # exposing some regression in Fabric 1.x itself.
    # Ensures close_proc_stdin() is invoked at most once, even though EOF
    # may be observed on many consecutive loop iterations.
    closed_stdin = False
    with character_buffered(input_):
        while True:
            data = self.read_our_stdin(input_)
            if data:
                # Mirror what we just read to process' stdin.
                # We encode to ensure bytes, but skip the decode step since
                # there's presumably no need (nobody's interacting with
                # this data programmatically).
                self.write_proc_stdin(data)
                # Also echo it back to local stdout (or whatever
                # out_stream is set to) when necessary.
                if echo is None:
                    echo = self.should_echo_stdin(input_, output)
                if echo:
                    self.write_our_output(stream=output, string=data)
            # Empty string/char/byte != None. Can't just use 'else' here.
            elif data is not None:
                # When reading from file-like objects that aren't "real"
                # terminal streams, an empty byte signals EOF.
                if not self.using_pty and not closed_stdin:
                    self.close_proc_stdin()
                    closed_stdin = True
            # Dual all-done signals: program being executed is done
            # running, *and* we don't seem to be reading anything out of
            # stdin. (NOTE: If we only test the former, we may encounter
            # race conditions re: unread stdin.)
            if self.program_finished.is_set() and not data:
                break
            # Take a nap so we're not chewing CPU.
            time.sleep(self.input_sleep)

917 

def should_echo_stdin(self, input_: IO, output: IO) -> bool:
    """
    Decide whether data read from ``input_`` should be echoed to ``output``.

    Used by `handle_stdin`. Echoing is only considered when no pty is in
    use, in which case the answer is whatever `isatty` reports for
    ``input_``; ``output`` is accepted but not inspected here.

    :param input_: Input stream (file-like object).
    :param output: Output stream (file-like object).
    :returns: A ``bool``.

    .. versionadded:: 1.0
    """
    if self.using_pty:
        return False
    return isatty(input_)

931 

def respond(self, buffer_: List[str]) -> None:
    """
    Write to the program's stdin in response to patterns in ``buffer_``.

    The patterns and responses are driven by the `.StreamWatcher` instances
    from the ``watchers`` kwarg of `run` - see :doc:`/concepts/watchers`
    for a conceptual overview.

    :param buffer_:
        The capture buffer for this thread's particular IO stream.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    # Watchers receive the whole capture as one string; otherwise
    # StreamWatcher subclasses couldn't iteratively scan for pattern matches.
    # NOTE: str.join should be "efficient enough" for now, re: speed and
    # memory use. Should that become false, consider StringIO instead.
    captured = "".join(buffer_)
    replies = (
        reply
        for watcher in self.watchers
        for reply in watcher.submit(captured)
    )
    for reply in replies:
        self.write_proc_stdin(reply)

958 

def generate_env(self, env: Dict[str, Any], replace_env: bool) -> Dict[str, Any]:
    """
    Build the environment dict to run the command with.

    :param dict env: Dict supplying overrides or full env, depending.
    :param bool replace_env:
        When true, ``env`` itself is returned untouched; otherwise a new
        dict holding `os.environ` updated with ``env`` is returned.

    :returns: A dictionary of shell environment vars.

    .. versionadded:: 1.0
    """
    if replace_env:
        return env
    return dict(os.environ, **env)

975 

def should_use_pty(self, pty: bool, fallback: bool) -> bool:
    """
    Decide whether execution should attempt to use a pseudo-terminal.

    This base implementation simply honors the caller's request.

    :param bool pty:
        Whether the user explicitly asked for a pty.
    :param bool fallback:
        Whether falling back to non-pty execution should be allowed, in
        situations where ``pty=True`` but a pty could not be allocated.
        Ignored here, as no fallback is implemented by default.

    :returns: The value of ``pty``, unchanged.

    .. versionadded:: 1.0
    """
    wants_pty = pty
    return wants_pty

990 

@property
def has_dead_threads(self) -> bool:
    """
    Report whether any IO thread is flagged as dead.

    Used during process-completion waiting (in `wait`) to ensure we don't
    deadlock our child process if our IO processing threads have
    errored/died.

    :returns:
        ``True`` if any value in ``self.threads`` has a truthy ``is_dead``,
        ``False`` otherwise.

    .. versionadded:: 1.0
    """
    for worker in self.threads.values():
        if worker.is_dead:
            return True
    return False

1007 

def wait(self) -> None:
    """
    Poll until the command has finished or an IO thread has died.

    Both `process_is_finished` and `has_dead_threads` are sampled on every
    pass; between passes the loop sleeps for ``input_sleep`` seconds.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    while True:
        finished = self.process_is_finished
        threads_died = self.has_dead_threads
        if finished or threads_died:
            return
        time.sleep(self.input_sleep)

1022 

def write_proc_stdin(self, data: str) -> None:
    """
    Encode ``data`` and send it to the running process' stdin.

    The text is encoded with ``self.encoding``; the actual write is left
    to the subclass-provided `_write_proc_stdin`.

    :param data: A Unicode string.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    payload = data.encode(self.encoding)
    self._write_proc_stdin(payload)

1036 

def decode(self, data: bytes) -> str:
    """
    Turn ``data`` bytes into text using ``self.encoding``.

    Undecodable bytes are substituted rather than raising, because the
    ``"replace"`` error handler is always used.

    .. versionadded:: 1.0
    """
    # NOTE: deliberately a tiny wrapper - centralizing this makes it much
    # harder to forget the 'replace' error handler when decoding :)
    text = data.decode(self.encoding, errors="replace")
    return text

1046 

@property
def process_is_finished(self) -> bool:
    """
    Report whether our subprocess has terminated.

    Abstract here: subclasses must override this property.

    .. note::
        Implementations should be nonblocking, since `wait` reads this
        property repeatedly inside its polling loop.

    :returns:
        ``True`` if the subprocess has finished running, ``False``
        otherwise.

    :raises NotImplementedError: always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1063 

def start(self, command: str, shell: str, env: Dict[str, Any]) -> None:
    """
    Begin executing ``command`` (via ``shell``, with ``env``).

    Abstract here: subclasses must override. Typically this means use of a
    forked subprocess or requesting start of execution on a remote system.

    In most cases, implementations also set subclass-specific member
    variables used in other methods such as `.wait` and/or `returncode`.

    :param str command:
        Command string to execute.

    :param str shell:
        Shell to use when executing ``command``.

    :param dict env:
        Environment dict used to prep shell environment.

    :raises NotImplementedError: always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1086 

def start_timer(self, timeout: int) -> None:
    """
    Arm a timer that calls `kill` after ``timeout`` seconds.

    When ``timeout`` is ``None`` nothing happens and ``self._timer`` is
    left as it was; otherwise a started `threading.Timer` is stored there.
    """
    if timeout is None:
        return
    timer = threading.Timer(timeout, self.kill)
    self._timer = timer
    self._timer.start()

1094 

def read_proc_stdout(self, num_bytes: int) -> Optional[bytes]:
    """
    Read up to ``num_bytes`` from the running process' stdout stream.

    Abstract here: subclasses must override.

    :param int num_bytes: Number of bytes to read at maximum.

    :returns: The bytes read, or ``None``.

    :raises NotImplementedError: always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1106 

def read_proc_stderr(self, num_bytes: int) -> Optional[bytes]:
    """
    Read up to ``num_bytes`` from the running process' stderr stream.

    Abstract here: subclasses must override.

    :param int num_bytes: Number of bytes to read at maximum.

    :returns: The bytes read, or ``None``.

    :raises NotImplementedError: always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1118 

1119 def _write_proc_stdin(self, data: bytes) -> None: 

1120 """ 

1121 Write ``data`` to running process' stdin. 

1122 

1123 This should never be called directly; it's for subclasses to implement. 

1124 See `write_proc_stdin` for the public API call. 

1125 

1126 :param data: Already-encoded byte data suitable for writing. 

1127 

1128 :returns: ``None``. 

1129 

1130 .. versionadded:: 1.0 

1131 """ 

1132 raise NotImplementedError 

1133 

def close_proc_stdin(self) -> None:
    """
    Close the running process' stdin.

    Abstract here: subclasses must override.

    :returns: ``None``.

    :raises NotImplementedError: always, in this base implementation.

    .. versionadded:: 1.3
    """
    raise NotImplementedError

1143 

def default_encoding(self) -> str:
    """
    Name the encoding expected on the subprocess' streams.

    The returned value should be suitable for use by encode/decode
    methods. Currently this just defers to the module-level
    `default_encoding` function.

    .. versionadded:: 1.0
    """
    # TODO: probably wants to be 2 methods, one for local and one for
    # subprocess. For now, good enough to assume both are the same.
    encoding_name = default_encoding()
    return encoding_name

1155 

def send_interrupt(self, interrupt: "KeyboardInterrupt") -> None:
    """
    Forward an interrupt to the running subprocess.

    The default behavior writes the single character ``U+0003`` (what a
    terminal sends for Ctrl-C) to the subprocess' stdin via
    `write_proc_stdin`. It is a public method so that subclasses can
    augment or replace that default.

    :param interrupt:
        The locally-sourced ``KeyboardInterrupt`` causing the method call.
        Not inspected by this implementation.

    :returns: ``None``.

    .. versionadded:: 1.0
    """
    ctrl_c = "\x03"
    self.write_proc_stdin(ctrl_c)

1173 

def returncode(self) -> Optional[int]:
    """
    Return the numeric return/exit code resulting from command execution.

    Abstract here: subclasses must override.

    :returns:
        `int`, if any reasonable return code could be determined, or
        ``None`` in corner cases where that was not possible.

    :raises NotImplementedError: always, in this base implementation.

    .. versionadded:: 1.0
    """
    raise NotImplementedError

1185 

def stop(self) -> None:
    """
    Perform final cleanup, if necessary.

    This method is called within a ``finally`` clause inside the main
    `~.Runner.run` method. The base implementation only cancels the
    timeout timer when one is set; subclasses may extend it to do things
    such as close network connections or open files.

    :returns: ``None``

    .. versionadded:: 1.0
    """
    timer = self._timer
    if timer:
        timer.cancel()

1200 

def kill(self) -> None:
    """
    Forcibly terminate the subprocess.

    Abstract here: subclasses must override. This is the callback that
    `start_timer` schedules, so it is typically only used by the timeout
    functionality.

    This is often a "best-effort" attempt, e.g. remote subprocesses often
    must settle for simply shutting down the local side of the network
    connection and hoping the remote end eventually gets the message.

    :raises NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError

1212 

@property
def timed_out(self) -> bool:
    """
    Returns ``True`` if the subprocess stopped because it timed out.

    That is: a timer exists and is no longer alive.

    .. versionadded:: 1.3
    """
    timer = self._timer
    if not timer:
        return False
    # Timer expiry implies we did time out. (The timer itself will have
    # killed the subprocess, allowing us to even get to this point.)
    return not timer.is_alive()

1223 

def get_pid(self) -> Optional[int]:
    """
    Obtain the subprocess PID, when that is possible.

    This base implementation has no PID to offer and always returns
    ``None``; subclasses may return useful integers.
    """
    pid = None
    return pid

1232 

1233 

class Local(Runner):
    """
    Execute a command on the local system in a subprocess.

    .. note::
        When Invoke itself is executed without a controlling terminal (e.g.
        when ``sys.stdin`` lacks a useful ``fileno``), it's not possible to
        present a handle on our PTY to local subprocesses. In such situations,
        `Local` will fallback to behaving as if ``pty=False`` (on the theory
        that degraded execution is better than none at all) as well as printing
        a warning to stderr.

        To disable this behavior, say ``fallback=False``.

    .. versionadded:: 1.0
    """

    def __init__(self, context: "Context") -> None:
        super().__init__(context)
        # Bookkeeping var for pty use case
        self.status = 0

    def should_use_pty(self, pty: bool = False, fallback: bool = True) -> bool:
        """
        Honor ``pty`` unless stdin has no fileno and ``fallback`` is allowed.

        When falling back, a warning is written to stderr once (tracked via
        ``warned_about_pty_fallback``).
        """
        use_pty = False
        if pty:
            use_pty = True
            # TODO: pass in & test in_stream, not sys.stdin
            if not has_fileno(sys.stdin) and fallback:
                if not self.warned_about_pty_fallback:
                    err = "WARNING: stdin has no fileno; falling back to non-pty execution!\n"  # noqa
                    sys.stderr.write(err)
                    self.warned_about_pty_fallback = True
                use_pty = False
        return use_pty

    def read_proc_stdout(self, num_bytes: int) -> Optional[bytes]:
        """
        Read up to ``num_bytes`` of subprocess stdout.

        Reads from the pty's parent fd when using a pty, otherwise from the
        ``Popen`` stdout pipe. Returns ``None`` when there is nothing to read
        from, or when a pty read fails with an I/O error.
        """
        # Obtain useful read-some-bytes function
        if self.using_pty:
            # Need to handle spurious OSErrors on some Linux platforms.
            try:
                data = os.read(self.parent_fd, num_bytes)
            except OSError as e:
                # Only eat I/O specific OSErrors so we don't hide others.
                # The errno check is the reliable signal; the message
                # matching is kept as well for exceptions that carry no
                # errno (and since message wording may vary by platform).
                stringified = str(e)
                io_errors = (
                    # The typical default
                    "Input/output error",
                    # Some less common platforms phrase it this way
                    "I/O error",
                )
                is_io_error = e.errno == errno.EIO or any(
                    error in stringified for error in io_errors
                )
                if not is_io_error:
                    raise
                # The bad OSErrors happen after all expected output has
                # appeared, so we return a falsey value, which triggers the
                # "end of output" logic in code using reader functions.
                data = None
        elif self.process and self.process.stdout:
            data = os.read(self.process.stdout.fileno(), num_bytes)
        else:
            data = None
        return data

    def read_proc_stderr(self, num_bytes: int) -> Optional[bytes]:
        """
        Read up to ``num_bytes`` from the ``Popen`` stderr pipe, if any.
        """
        # NOTE: when using a pty, this will never be called.
        # TODO: do we ever get those OSErrors on stderr? Feels like we could?
        if self.process and self.process.stderr:
            return os.read(self.process.stderr.fileno(), num_bytes)
        return None

    def _write_proc_stdin(self, data: bytes) -> None:
        """
        Write ``data`` to the pty parent fd or the ``Popen`` stdin pipe.

        :raises SubprocessPipeError:
            if not using a pty and there is no subprocess or stdin pipe.
        """
        # NOTE: parent_fd from os.fork() is a read/write pipe attached to our
        # forked process' stdout/stdin, respectively.
        if self.using_pty:
            fd = self.parent_fd
        elif self.process and self.process.stdin:
            fd = self.process.stdin.fileno()
        else:
            raise SubprocessPipeError(
                "Unable to write to missing subprocess or stdin!"
            )
        # Try to write, ignoring broken pipes if encountered (implies child
        # process exited before the process piping stdin to us finished;
        # there's nothing we can do about that!)
        try:
            os.write(fd, data)
        except OSError as e:
            # Prefer errno; keep the message test for exceptions lacking one.
            broken_pipe = e.errno == errno.EPIPE or "Broken pipe" in str(e)
            if not broken_pipe:
                raise

    def close_proc_stdin(self) -> None:
        """
        Close the ``Popen`` stdin pipe.

        :raises SubprocessPipeError:
            when using a pty, or when there is no subprocess or stdin pipe.
        """
        if self.using_pty:
            # there is no working scenario to tell the process that stdin
            # closed when using pty
            raise SubprocessPipeError("Cannot close stdin when pty=True")
        elif self.process and self.process.stdin:
            self.process.stdin.close()
        else:
            raise SubprocessPipeError(
                "Unable to close missing subprocess or stdin!"
            )

    def start(self, command: str, shell: str, env: Dict[str, Any]) -> None:
        """
        Launch ``command`` via ``shell``: `pty.fork` + exec, or ``Popen``.
        """
        if self.using_pty:
            if pty is None:  # Encountered ImportError
                err = "You indicated pty=True, but your platform doesn't support the 'pty' module!"  # noqa
                sys.exit(err)
            cols, rows = pty_size()
            self.pid, self.parent_fd = pty.fork()
            # If we're the child process, load up the actual command in a
            # shell, just as subprocess does; this replaces our process - whose
            # pipes are all hooked up to the PTY - with the "real" one.
            if self.pid == 0:
                # TODO: both pty.spawn() and pexpect.spawn() do a lot of
                # setup/teardown involving tty.setraw, getrlimit, signal.
                # Ostensibly we'll want some of that eventually, but if
                # possible write tests - integration-level if necessary -
                # before adding it!
                #
                # Set pty window size based on what our own controlling
                # terminal's window size appears to be.
                # TODO: make subroutine?
                winsize = struct.pack("HHHH", rows, cols, 0, 0)
                fcntl.ioctl(sys.stdout.fileno(), termios.TIOCSWINSZ, winsize)
                # Use execvpe for bare-minimum "exec w/ variable # args + env"
                # behavior.
                # NOTE: stdlib subprocess (actually its posix flavor, which is
                # written in C) uses either execve or execv, depending.
                os.execvpe(shell, [shell, "-c", command], env)
        else:
            self.process = Popen(
                command,
                shell=True,
                executable=shell,
                env=env,
                stdout=PIPE,
                stderr=PIPE,
                stdin=PIPE,
            )

    def get_pid(self) -> int:
        """
        Return the forked child's PID (pty mode) or the ``Popen`` PID.
        """
        return self.pid if self.using_pty else self.process.pid

    def kill(self) -> None:
        """
        Send ``SIGKILL`` to the subprocess, ignoring an already-gone process.
        """
        try:
            os.kill(self.get_pid(), signal.SIGKILL)
        except ProcessLookupError:
            # In odd situations where our subprocess is already dead, don't
            # throw this upwards.
            pass

    @property
    def process_is_finished(self) -> bool:
        """
        Nonblocking check of whether the subprocess has exited.

        In pty mode this also stores the wait status in ``self.status``.
        """
        if self.using_pty:
            # NOTE:
            # https://github.com/pexpect/ptyprocess/blob/4058faa05e2940662ab6da1330aa0586c6f9cd9c/ptyprocess/ptyprocess.py#L680-L687
            # implies that Linux "requires" use of the blocking, non-WNOHANG
            # version of this call. Our testing doesn't verify this, however,
            # so...
            # NOTE: It does appear to be totally blocking on Windows, so our
            # issue #351 may be totally unsolvable there. Unclear.
            pid_val, self.status = os.waitpid(self.pid, os.WNOHANG)
            return pid_val != 0
        else:
            return self.process.poll() is not None

    def returncode(self) -> Optional[int]:
        """
        Return the exit code; signals are reported as negative integers.

        In pty mode the value is derived from ``self.status`` and is ``None``
        if the status indicates neither a normal exit nor a signal.
        """
        if self.using_pty:
            # No subprocess.returncode available; use WIFEXITED/WIFSIGNALED to
            # determine which of WEXITSTATUS / WTERMSIG to use.
            # TODO: is it safe to just say "call all WEXITSTATUS/WTERMSIG and
            # return whichever one of them is nondefault"? Probably not?
            # NOTE: doing this in an arbitrary order should be safe since only
            # one of the WIF* methods ought to ever return True.
            code = None
            if os.WIFEXITED(self.status):
                code = os.WEXITSTATUS(self.status)
            elif os.WIFSIGNALED(self.status):
                code = os.WTERMSIG(self.status)
                # Match subprocess.returncode by turning signals into negative
                # 'exit code' integers.
                code = -1 * code
            return code
            # TODO: do we care about WIFSTOPPED? Maybe someday?
        else:
            return self.process.returncode

    def stop(self) -> None:
        """
        Run base-class cleanup, then close the pty parent fd if one is in use.
        """
        super().stop()
        # If we opened a PTY for child communications, make sure to close() it,
        # otherwise long-running Invoke-using processes exhaust their file
        # descriptors eventually.
        if self.using_pty:
            try:
                os.close(self.parent_fd)
            except Exception:
                # If something weird happened preventing the close, there's
                # nothing to be done about it now...
                pass

1432 

1433 

class Result:
    """
    A container for information about the result of a command execution.

    All params are exposed as attributes of the same name and type.

    :param str stdout:
        The subprocess' standard output.

    :param str stderr:
        Same as ``stdout`` but containing standard error (unless the process
        was invoked via a pty, in which case it will be empty; see
        `.Runner.run`.)

    :param str encoding:
        The string encoding used by the local shell environment.

    :param str command:
        The command which was executed.

    :param str shell:
        The shell binary used for execution.

    :param dict env:
        The shell environment used for execution. (Default is the empty dict,
        ``{}``, not ``None`` as displayed in the signature.)

    :param int exited:
        An integer representing the subprocess' exit/return code.

        .. note::
            This may be ``None`` in situations where the subprocess did not run
            to completion, such as when auto-responding failed or a timeout was
            reached.

    :param bool pty:
        A boolean describing whether the subprocess was invoked with a pty or
        not; see `.Runner.run`.

    :param tuple hide:
        A tuple of stream names (none, one or both of ``('stdout', 'stderr')``)
        which were hidden from the user when the generating command executed;
        this is a normalized value derived from the ``hide`` parameter of
        `.Runner.run`.

        For example, ``run('command', hide='stdout')`` will yield a `Result`
        where ``result.hide == ('stdout',)``; ``hide=True`` or ``hide='both'``
        results in ``result.hide == ('stdout', 'stderr')``; and ``hide=False``
        (the default) generates ``result.hide == ()`` (the empty tuple.)

    :param int pid:
        The process ID of the subprocess that was executed (normally) or is
        executing (when asynchronous or disowned).

        .. versionadded:: 3.0

    :param bool disowned:
        Whether the subprocess was 'disowned' or detached from ourselves; in
        this mode many other attributes or methods of this `Result` will be
        unreliable, see `Runner.run` docs for details. You may want to use your
        own methods to interrogate your operating system about this object's
        ``pid``, however, which will usually be valid.

        .. versionadded:: 3.0

    .. note::
        `Result` objects' truth evaluation is equivalent to their `.ok`
        attribute's value. Therefore, quick-and-dirty expressions like the
        following are possible::

            if run("some shell command"):
                do_something()
            else:
                handle_problem()

        However, remember `Zen of Python #2
        <http://zen-of-python.info/explicit-is-better-than-implicit.html#2>`_.

    .. versionadded:: 1.0
    """

    # TODO: inherit from namedtuple instead? heh (or: use attrs from pypi)
    def __init__(
        self,
        stdout: str = "",
        stderr: str = "",
        encoding: Optional[str] = None,
        command: str = "",
        shell: str = "",
        env: Optional[Dict[str, Any]] = None,
        exited: Optional[int] = 0,
        pty: bool = False,
        hide: Tuple[str, ...] = tuple(),
        pid: Optional[int] = None,
        disowned: bool = False,
    ) -> None:
        self.stdout = stdout
        self.stderr = stderr
        # Only consult the interpreter default when no encoding was given.
        if encoding is None:
            encoding = default_encoding()
        self.encoding = encoding
        self.command = command
        self.shell = shell
        # A fresh dict per instance; never share a mutable default.
        self.env = {} if env is None else env
        self.exited = exited
        self.pty = pty
        self.hide = hide
        self.pid = pid
        self.disowned = disowned

    @property
    def return_code(self) -> Optional[int]:
        """
        An alias for ``.exited``.

        .. versionadded:: 1.0
        """
        return self.exited

    def __bool__(self) -> bool:
        return self.ok

    def __str__(self) -> str:
        if self.exited is not None:
            desc = "Command exited with status {}.".format(self.exited)
        else:
            desc = "Command was not fully executed due to watcher error."
        ret = [desc]
        for x in ("stdout", "stderr"):
            val = getattr(self, x)
            # Non-empty streams get a header plus their right-stripped text;
            # empty ones a short placeholder.
            ret.append(
                f"=== {x} ===\n{val.rstrip()}\n"
                if val
                else "(no {})".format(x)
            )
        return "\n".join(ret)

    def __repr__(self) -> str:
        # TODO: more? e.g. len of stdout/err? (how to represent cleanly in a
        # 'x=y' format like this? e.g. '4b' is ambiguous as to what it
        # represents
        template = "<Result cmd={!r} exited={}>"
        return template.format(self.command, self.exited)

    @property
    def ok(self) -> bool:
        """
        A boolean equivalent to ``exited == 0``.

        .. versionadded:: 1.0
        """
        return bool(self.exited == 0)

    @property
    def failed(self) -> bool:
        """
        The inverse of ``ok``.

        I.e., ``True`` if the program exited with a nonzero return code, and
        ``False`` otherwise.

        .. versionadded:: 1.0
        """
        return not self.ok

    def tail(self, stream: str, count: int = 10) -> str:
        """
        Return the last ``count`` lines of ``stream``, plus leading whitespace.

        The result always starts with two newlines; the kept lines follow,
        joined by single newlines, with no trailing newline.

        :param str stream:
            Name of some captured stream attribute, eg ``"stdout"``.
        :param int count:
            Number of lines to preserve. A value of zero or less preserves
            no lines.

        .. versionadded:: 1.3
        """
        # TODO: preserve alternate line endings? Mehhhh
        # NOTE: no trailing \n preservation; easier for below display if
        # normalized
        lines = getattr(self, stream).splitlines()
        # ``lines[-0:]`` is the *entire* list, so a non-positive count has to
        # be handled explicitly rather than fed to the slice.
        kept = lines[-count:] if count > 0 else []
        return "\n\n" + "\n".join(kept)

1616 

1617 

class Promise(Result, AbstractContextManager):
    """
    A stand-in for a `Result` that is not available yet (async execution).

    The main API member is `join`. Instances also work as context managers:
    entering yields ``self`` and leaving the block calls `join`.

    `Promise` also exposes copies of many `Result` attributes, specifically
    those that derive from `~Runner.run` kwargs and not the result of command
    execution. For example, ``command`` is replicated here, but ``stdout`` is
    not.

    .. versionadded:: 1.4
    """

    #: The `.Runner` instance which made this `.Promise`.
    runner: Runner

    def __init__(self, runner: "Runner") -> None:
        """
        Create a new promise.

        :param runner:
            An in-flight `Runner` instance making this promise.

            Must already have started the subprocess and spun up IO threads.
        """
        self.runner = runner
        # Mirror every entry of the runner's result kwargs as an attribute.
        # TODO: consider proxying vs copying, but prob wait for refactor
        copied = self.runner.result_kwargs
        for name in copied:
            setattr(self, name, copied[name])

    def join(self) -> Result:
        """
        Block until associated subprocess exits, returning/raising the result.

        This acts identically to the end of a synchronously executed ``run``,
        namely that:

        - various background threads (such as IO workers) are themselves
          joined;
        - if the subprocess exited normally, a `Result` is returned;
        - in any other case (unforeseen exceptions, IO sub-thread
          `.ThreadException`, `.Failure`, `.WatcherError`) the relevant
          exception is raised here.

        The runner's ``stop`` is always called afterwards, whether or not
        finishing raised.

        See `~Runner.run` docs, or those of the relevant classes, for further
        details.
        """
        runner = self.runner
        try:
            return runner._finish()
        finally:
            runner.stop()

    def __enter__(self) -> "Promise":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: BaseException,
        exc_tb: Optional[TracebackType],
    ) -> None:
        # Leaving the ``with`` block waits for the command to finish.
        self.join()

    def __repr__(self) -> str:
        return "<Promise cmd={!r}>".format(self.command)

1687 

1688 

def normalize_hide(
    val: Any,
    out_stream: Optional[str] = None,
    err_stream: Optional[str] = None,
) -> Tuple[str, ...]:
    """
    Turn a user-facing ``hide`` value into a tuple of stream names.

    :param val:
        One of ``None``, ``False``, ``"out"``, ``"stdout"``, ``"err"``,
        ``"stderr"``, ``"both"`` or ``True``.
    :param out_stream:
        When not ``None``, ``"stdout"`` is never included in the result.
    :param err_stream:
        When not ``None``, ``"stderr"`` is never included in the result.

    :returns: A tuple holding zero or more of ``"stdout"`` / ``"stderr"``.

    :raises ValueError: if ``val`` is not one of the accepted values.
    """
    accepted = (None, False, "out", "stdout", "err", "stderr", "both", True)
    if val not in accepted:
        raise ValueError(
            "'hide' got {!r} which is not in {!r}".format(val, accepted)
        )
    # Expand shorthands into the list of stream names to hide.
    if val in (None, False):
        names = []
    elif val in ("both", True):
        names = ["stdout", "stderr"]
    elif val == "out":
        names = ["stdout"]
    elif val == "err":
        names = ["stderr"]
    else:
        names = [val]
    # A stream that was overridden from its default is never hidden.
    keep_out = out_stream is not None
    keep_err = err_stream is not None
    return tuple(
        name
        for name in names
        if not (
            (keep_out and name == "stdout") or (keep_err and name == "stderr")
        )
    )

1715 

1716 

def default_encoding() -> str:
    """
    Obtain apparent interpreter-local default text encoding.

    Serves as the baseline when SOME encoding is needed for
    unknown-but-presumably-text bytes and the user has not specified an
    override. The value comes from ``locale.getpreferredencoding(False)``.
    """
    return locale.getpreferredencoding(False)
1726 return encoding