Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/cmd.py: 65%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

611 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6from __future__ import annotations 

7 

8__all__ = ["GitMeta", "Git"] 

9 

10import contextlib 

11import io 

12import itertools 

13import logging 

14import os 

15import re 

16import signal 

17import subprocess 

18from subprocess import DEVNULL, PIPE, Popen 

19import sys 

20from textwrap import dedent 

21import threading 

22import warnings 

23 

24from git.compat import defenc, force_bytes, safe_decode 

25from git.exc import ( 

26 CommandError, 

27 GitCommandError, 

28 GitCommandNotFound, 

29 UnsafeOptionError, 

30 UnsafeProtocolError, 

31) 

32from git.util import ( 

33 cygpath, 

34 expand_path, 

35 is_cygwin_git, 

36 patch_env, 

37 remove_password_if_present, 

38 stream_copy, 

39) 

40 

41# typing --------------------------------------------------------------------------- 

42 

43from typing import ( 

44 Any, 

45 AnyStr, 

46 BinaryIO, 

47 Callable, 

48 Dict, 

49 IO, 

50 Iterator, 

51 List, 

52 Mapping, 

53 Optional, 

54 Sequence, 

55 TYPE_CHECKING, 

56 TextIO, 

57 Tuple, 

58 Union, 

59 cast, 

60 overload, 

61) 

62 

63if sys.version_info >= (3, 10): 

64 from typing import TypeAlias 

65else: 

66 from typing_extensions import TypeAlias 

67 

68from git.types import Literal, PathLike, TBD 

69 

70if TYPE_CHECKING: 

71 from git.diff import DiffIndex 

72 from git.repo.base import Repo 

73 

74# --------------------------------------------------------------------------------- 

75 

76execute_kwargs = { 

77 "istream", 

78 "with_extended_output", 

79 "with_exceptions", 

80 "as_process", 

81 "output_stream", 

82 "stdout_as_string", 

83 "kill_after_timeout", 

84 "with_stdout", 

85 "universal_newlines", 

86 "shell", 

87 "env", 

88 "max_chunk_size", 

89 "strip_newline_in_stdout", 

90} 

91 

92_logger = logging.getLogger(__name__) 

93 

94 

95# ============================================================================== 

96## @name Utilities 

97# ------------------------------------------------------------------------------ 

98# Documentation 

99## @{ 

100 

101 

def handle_process_output(
    process: "Git.AutoInterrupt" | Popen,
    stdout_handler: Union[
        None,
        Callable[[AnyStr], None],
        Callable[[List[AnyStr]], None],
        Callable[[bytes, "Repo", "DiffIndex"], None],
    ],
    stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
    finalizer: Union[None, Callable[[Union[Popen, "Git.AutoInterrupt"]], None]] = None,
    decode_streams: bool = True,
    kill_after_timeout: Union[None, float] = None,
) -> None:
    R"""Pump a process's stdout and stderr line by line into the given handlers.

    One daemon thread is started per available stream. This function joins both
    threads, then calls `finalizer` (if any) and returns once it returns.

    :param process:
        :class:`subprocess.Popen` instance, or a wrapper exposing the real process as
        its ``proc`` attribute.

    :param stdout_handler:
        f(stdout_line_string), or ``None``.

    :param stderr_handler:
        f(stderr_line_string), or ``None``.

    :param finalizer:
        f(proc) - wait for proc to finish.

    :param decode_streams:
        Assume stdout/stderr streams are binary and decode them before pushing their
        contents to handlers.

        This defaults to ``True``. Set it to ``False`` if:

        - ``universal_newlines == True``, as then streams are in text mode, or
        - decoding must happen later, such as for :class:`~git.diff.Diff`\s.

    :param kill_after_timeout:
        :class:`float` or ``None``, Default = ``None``

        To specify a timeout in seconds for the git command, after which the process
        should be killed. The value is used as the timeout of each thread join.

    :raise RuntimeError:
        If a pump thread is still alive after the timeout and `process` is not a
        ``Git.AutoInterrupt`` (only that wrapper is terminated here).
    """

    # Use 2 "pump" threads and wait for both to finish.
    def pump_stream(
        cmdline: List[str],
        name: str,
        stream: Union[BinaryIO, TextIO],
        is_decode: bool,
        handler: Union[None, Callable[[Union[bytes, str]], None]],
    ) -> None:
        try:
            # The stream is drained even without a handler, so the child never
            # blocks on a full pipe.
            for line in stream:
                if handler:
                    if is_decode:
                        assert isinstance(line, bytes)
                        line_str = line.decode(defenc)
                        handler(line_str)
                    else:
                        handler(line)

        except Exception as ex:
            _logger.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
            if "I/O operation on closed file" not in str(ex):
                # Only reraise if the error was not due to the stream closing.
                raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
        finally:
            stream.close()

    if hasattr(process, "proc"):
        process = cast("Git.AutoInterrupt", process)
        cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
        p_stdout = process.proc.stdout if process.proc else None
        p_stderr = process.proc.stderr if process.proc else None
    else:
        process = cast(Popen, process)  # type: ignore[redundant-cast]
        cmdline = getattr(process, "args", "")
        p_stdout = process.stdout
        p_stderr = process.stderr

    # The command line is only used for error reporting; normalize it to a sequence.
    if not isinstance(cmdline, (tuple, list)):
        cmdline = cmdline.split()

    pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
    if p_stdout:
        pumps.append(("stdout", p_stdout, stdout_handler))
    if p_stderr:
        pumps.append(("stderr", p_stderr, stderr_handler))

    threads: List[threading.Thread] = []

    for name, stream, handler in pumps:
        t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
        t.daemon = True
        t.start()
        threads.append(t)

    # FIXME: Why join? Will block if stdin needs feeding...
    for t in threads:
        t.join(timeout=kill_after_timeout)
        if t.is_alive():
            if isinstance(process, Git.AutoInterrupt):
                process._terminate()
            else:  # Don't want to deal with the other case.
                raise RuntimeError(
                    "Thread join() timed out in cmd.handle_process_output()."
                    f" kill_after_timeout={kill_after_timeout} seconds"
                )
            if stderr_handler:
                error_str: Union[str, bytes] = (
                    f"error: process killed because it timed out. kill_after_timeout={kill_after_timeout} seconds"
                )
                # typing.BinaryIO is not a base class of real file objects, so an
                # isinstance() check against it never matches a pipe. Test against
                # the io ABCs that binary pipes actually derive from instead.
                if not decode_streams and isinstance(p_stderr, (io.RawIOBase, io.BufferedIOBase)):
                    # Assume stderr_handler needs binary input.
                    error_str = cast(str, error_str).encode()
                # We ignore typing on the next line because mypy does not like the way
                # we inferred that stderr takes str or bytes.
                stderr_handler(error_str)  # type: ignore[arg-type]

    if finalizer:
        finalizer(process)

227 

228 

safer_popen: Callable[..., Popen]

if sys.platform == "win32":

    def _safer_popen_windows(
        command: Union[str, Sequence[Any]],
        *,
        shell: bool = False,
        env: Optional[Mapping[str, str]] = None,
        **kwargs: Any,
    ) -> Popen:
        """Call :class:`subprocess.Popen` on Windows but don't include a CWD in the
        search.

        This avoids an untrusted search path condition where a file like ``git.exe`` in
        a malicious repository would be run when GitPython operates on the repository.
        The process using GitPython may have an untrusted repository's working tree as
        its current working directory. Some operations may temporarily change to that
        directory before running a subprocess. In addition, while by default GitPython
        does not run external commands with a shell, it can be made to do so, in which
        case the CWD of the subprocess, which GitPython usually sets to a repository
        working tree, can itself be searched automatically by the shell. This wrapper
        covers all those cases.

        :param command:
            The command to run, passed through to :class:`subprocess.Popen`.

        :param shell:
            Whether to run the command through a shell.

        :param env:
            Environment for the child, or ``None`` to inherit the current one. The
            caller's mapping is never mutated.

        :param kwargs:
            Any further keyword arguments for :class:`subprocess.Popen`.

        :note:
            This currently works by setting the
            :envvar:`NoDefaultCurrentDirectoryInExePath` environment variable during
            subprocess creation. It also takes care of passing Windows-specific process
            creation flags, but that is unrelated to path search.

        :note:
            The current implementation contains a race condition on :attr:`os.environ`.
            GitPython isn't thread-safe, but a program using it on one thread should
            ideally be able to mutate :attr:`os.environ` on another, without
            unpredictable results. See comments in:
            https://github.com/gitpython-developers/GitPython/pull/1650
        """
        # CREATE_NEW_PROCESS_GROUP is needed for some ways of killing it afterwards.
        # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        # https://docs.python.org/3/library/subprocess.html#subprocess.CREATE_NEW_PROCESS_GROUP
        creationflags = subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP

        # When using a shell, the shell is the direct subprocess, so the variable must
        # be set in its environment, to affect its search behavior.
        if shell:
            # The original may be immutable, or the caller may reuse it. Mutate a copy.
            # env=None means "inherit" to Popen, so start from the current environment
            # in that case; an otherwise-empty mapping would strip the child of every
            # inherited variable.
            env = dict(os.environ) if env is None else dict(env)
            env["NoDefaultCurrentDirectoryInExePath"] = "1"  # The "1" can be any value.

        # When not using a shell, the current process does the search in a
        # CreateProcessW API call, so the variable must be set in our environment. With
        # a shell, that's unnecessary if https://github.com/python/cpython/issues/101283
        # is patched. In Python versions where it is unpatched, in the rare case the
        # ComSpec environment variable is unset, the search for the shell itself is
        # unsafe. Setting NoDefaultCurrentDirectoryInExePath in all cases, as done here,
        # is simpler and protects against that. (As above, the "1" can be any value.)
        with patch_env("NoDefaultCurrentDirectoryInExePath", "1"):
            return Popen(
                command,
                shell=shell,
                env=env,
                creationflags=creationflags,
                **kwargs,
            )

    safer_popen = _safer_popen_windows
else:
    # Outside Windows the CWD is not searched implicitly, so Popen is used directly.
    safer_popen = Popen

297 

298 

def dashify(string: str) -> str:
    """Return `string` with every underscore turned into a dash."""
    return "-".join(string.split("_"))

301 

302 

def slots_to_dict(self: "Git", exclude: Sequence[str] = ()) -> Dict[str, Any]:
    """Snapshot the slot attributes of `self` into a dict, skipping names in `exclude`."""
    state: Dict[str, Any] = {}
    for slot in self.__slots__:
        if slot in exclude:
            continue
        state[slot] = getattr(self, slot)
    return state

305 

306 

def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
    """Restore attributes on `self` from `d`, then reset every `excluded` name to ``None``.

    The excluded names are assigned last, so they end up ``None`` even when `d` also
    carries a value for them.
    """
    for attr_name in d:
        setattr(self, attr_name, d[attr_name])
    for attr_name in excluded:
        setattr(self, attr_name, None)

312 

313 

314## -- End Utilities -- @} 

315 

316 

class _AutoInterrupt:
    """Process wrapper that terminates the wrapped process on finalization.

    This kills/interrupts the stored process instance once this instance goes out of
    scope. It is used to prevent processes piling up in case iterators stop reading.

    All attributes are wired through to the contained process object.

    The wait method is overridden to perform automatic status code checking and possibly
    raise.
    """

    __slots__ = ("proc", "args", "status")

    # If this is non-zero it will override any status code during _terminate, used
    # to prevent race conditions in testing.
    _status_code_if_terminate: int = 0

    def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
        self.proc = proc
        self.args = args
        self.status: Union[int, None] = None

    def _terminate(self) -> None:
        """Terminate the underlying process.

        The wrapped process is detached (``self.proc`` becomes ``None``), its pipes are
        closed, and its exit status is recorded in ``self.status`` when available.
        """
        if self.proc is None:
            return

        proc = self.proc
        self.proc = None
        if proc.stdin:
            proc.stdin.close()
        if proc.stdout:
            proc.stdout.close()
        if proc.stderr:
            proc.stderr.close()
        # Did the process finish already so we have a return code?
        try:
            if proc.poll() is not None:
                self.status = self._status_code_if_terminate or proc.poll()
                return
        except OSError as ex:
            _logger.info("Ignored error after process had died: %r", ex)

        # It can be that nothing really exists anymore...
        if os is None or getattr(os, "kill", None) is None:
            return

        # Try to kill it.
        try:
            proc.terminate()
            status = proc.wait()  # Ensure the process goes away.

            self.status = self._status_code_if_terminate or status
        except OSError as ex:
            _logger.info("Ignored error after process had died: %r", ex)
        # END exception handling

    def __del__(self) -> None:
        self._terminate()

    def __getattr__(self, attr: str) -> Any:
        # Only reached for names not found on the wrapper itself.
        return getattr(self.proc, attr)

    # TODO: Bad choice to mimic `proc.wait()` but with different args.
    def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
        """Wait for the process and return its status code.

        :param stderr:
            Previously read value of stderr, in case stderr is already closed.
            ``None`` is treated as no previously read data.

        :warn:
            May deadlock if output or error pipes are used and not handled separately.

        :raise git.exc.GitCommandError:
            If the return status is not 0.
        """
        # Previously the b"" default for None was assigned and then unconditionally
        # overwritten by the conversion below it, so it never took effect.
        stderr_b = b"" if stderr is None else force_bytes(data=stderr, encoding="utf-8")
        status: Union[int, None]
        if self.proc is not None:
            status = self.proc.wait()
            p_stderr = self.proc.stderr
        else:  # Assume the underlying proc was killed earlier or never existed.
            status = self.status
            p_stderr = None

        def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
            if stream:
                try:
                    return stderr_b + force_bytes(stream.read())
                except (OSError, ValueError):
                    return stderr_b or b""
            else:
                return stderr_b or b""

        # END status handling

        if status != 0:
            errstr = read_all_from_possibly_closed_stream(p_stderr)
            _logger.debug("AutoInterrupt wait stderr: %r" % (errstr,))
            raise GitCommandError(remove_password_if_present(self.args), status, errstr)
        return status


# Present the class under the name it is exposed as on Git.
_AutoInterrupt.__name__ = "AutoInterrupt"
_AutoInterrupt.__qualname__ = "Git.AutoInterrupt"

425 

426 

class _CatFileContentStream:
    """Read-only view over the next `size` bytes of a shared byte stream.

    The wrapper counts what it has handed out and behaves like an exhausted stream
    once `size` bytes were read. Each object's content is followed by one extra byte
    (a newline) in the underlying stream; that byte is consumed as soon as the content
    is depleted.

    If the content was not fully read by the time this object is finalized, the rest
    is read and discarded so the underlying stream continues to work.
    """

    __slots__ = ("_stream", "_nbr", "_size")

    def __init__(self, size: int, stream: IO[bytes]) -> None:
        self._stream = stream
        self._size = size
        self._nbr = 0  # Number of bytes read.

        # An empty object is depleted from the start: take the final newline now.
        if size == 0:
            stream.read(1)

    def read(self, size: int = -1) -> bytes:
        remaining = self._size - self._nbr
        if remaining == 0:
            return b""

        # Never read past our region; a negative size means "everything left".
        size = min(remaining, size) if size > -1 else remaining

        chunk = self._stream.read(size)
        self._nbr += len(chunk)

        # On depletion, swallow the trailing newline so others can use the stream.
        if self._size - self._nbr == 0:
            self._stream.read(1)
        return chunk

    def readline(self, size: int = -1) -> bytes:
        if self._nbr == self._size:
            return b""

        # Clamp the request to what remains of our region.
        remaining = self._size - self._nbr
        size = min(remaining, size) if size > -1 else remaining

        chunk = self._stream.readline(size)
        self._nbr += len(chunk)

        # On depletion, swallow the trailing newline.
        if self._size - self._nbr == 0:
            self._stream.read(1)
        return chunk

    def readlines(self, size: int = -1) -> List[bytes]:
        if self._nbr == self._size:
            return []

        # readline() does the bookkeeping; here we only enforce the size hint.
        lines = []
        collected = 0
        line = self.readline()
        while line:
            lines.append(line)
            if size > -1:
                collected += len(line)
                if collected > size:
                    break
            line = self.readline()
        return lines

    # skipcq: PYL-E0301
    def __iter__(self) -> "Git.CatFileContentStream":
        return self

    def __next__(self) -> bytes:
        line = self.readline()
        if line:
            return line
        raise StopIteration

    next = __next__

    def __del__(self) -> None:
        unread = self._size - self._nbr
        if unread:
            # Seeking is impossible within a stream, so read and discard what is
            # left, including the terminating newline.
            self._stream.read(unread + 1)


# Present the class under the name it is exposed as on Git.
_CatFileContentStream.__name__ = "CatFileContentStream"
_CatFileContentStream.__qualname__ = "Git.CatFileContentStream"

538 

539 

# Warning text for any access to Git.USE_SHELL.
_USE_SHELL_DEFAULT_MESSAGE = (
    "Git.USE_SHELL is deprecated, because only its default value of False is safe. "
    "It will be removed in a future release."
)

# Warning text for setting Git.USE_SHELL to a true value.
_USE_SHELL_DANGER_MESSAGE = (
    "Setting Git.USE_SHELL to True is unsafe and insecure, as the effect of special "
    "shell syntax cannot usually be accounted for. This can result in a command "
    "injection vulnerability and arbitrary code execution. Git.USE_SHELL is deprecated "
    "and will be removed in a future release."
)


def _warn_use_shell(*, extra_danger: bool) -> None:
    """Emit the :class:`DeprecationWarning` for ``Git.USE_SHELL``.

    :param extra_danger:
        If true, use the stronger message about the insecurity of enabling the shell;
        otherwise use the plain deprecation message.

    :note:
        The warning is issued with ``stacklevel=3``, i.e. attributed two frames above
        this function.
    """
    if extra_danger:
        message = _USE_SHELL_DANGER_MESSAGE
    else:
        message = _USE_SHELL_DEFAULT_MESSAGE
    warnings.warn(message, DeprecationWarning, stacklevel=3)

559 

560 

class _GitMeta(type):
    """Metaclass for :class:`Git`.

    This helps issue :class:`DeprecationWarning` if :attr:`Git.USE_SHELL` is used.
    """

    def __getattribute(cls, name: str) -> Any:
        """Warn when ``USE_SHELL`` is read on the class, then look the name up normally."""
        if name == "USE_SHELL":
            _warn_use_shell(extra_danger=False)
        return super().__getattribute__(name)

    def __setattr(cls, name: str, value: Any) -> Any:
        """Warn when ``USE_SHELL`` is assigned on the class, then store the value.

        The stronger warning is selected when the assigned value is truthy.
        """
        if name == "USE_SHELL":
            _warn_use_shell(extra_danger=value)
        super().__setattr__(name, value)

    if not TYPE_CHECKING:
        # The hooks above are written under ordinary (name-mangled) names so static
        # checkers still type-check their bodies and still report undefined or
        # misspelled attributes on the class. They are bound to the special names
        # only at runtime, out of view of the type checker.
        __getattribute__ = __getattribute
        __setattr__ = __setattr

584 

585 

586GitMeta = _GitMeta 

587"""Alias of :class:`Git`'s metaclass, whether it is :class:`type` or a custom metaclass. 

588 

589Whether the :class:`Git` class has the default :class:`type` as its metaclass or uses a 

590custom metaclass is not documented and may change at any time. This statically checkable 

591metaclass alias is equivalent at runtime to ``type(Git)``. This should almost never be 

592used. Code that benefits from it is likely to remain brittle even if it is used. 

593 

594In view of the :class:`Git` class's intended use and :class:`Git` objects' dynamic 

595callable attributes representing git subcommands, it rarely makes sense to inherit from 

596:class:`Git` at all. Using :class:`Git` in multiple inheritance can be especially tricky 

597to do correctly. Attempting uses of :class:`Git` where its metaclass is relevant, such 

598as when a sibling class has an unrelated metaclass and a shared lower bound metaclass 

599might have to be introduced to solve a metaclass conflict, is not recommended. 

600 

601:note: 

602 The correct static type of the :class:`Git` class itself, and any subclasses, is 

603 ``Type[Git]``. (This can be written as ``type[Git]`` in Python 3.9 and later.) 

604 

605 :class:`GitMeta` should never be used in any annotation where ``Type[Git]`` is 

606 intended or otherwise possible to use. This alias is truly only for very rare and 

607 inherently precarious situations where it is necessary to deal with the metaclass 

608 explicitly. 

609""" 

610 

611 

612class Git(metaclass=_GitMeta): 

613 """The Git class manages communication with the Git binary. 

614 

615 It provides a convenient interface to calling the Git binary, such as in:: 

616 

617 g = Git( git_dir ) 

618 g.init() # calls 'git init' program 

619 rval = g.ls_files() # calls 'git ls-files' program 

620 

621 Debugging: 

622 

623 * Set the :envvar:`GIT_PYTHON_TRACE` environment variable to print each invocation 

624 of the command to stdout. 

625 * Set its value to ``full`` to see details about the returned values. 

626 """ 

627 

628 __slots__ = ( 

629 "_working_dir", 

630 "cat_file_all", 

631 "cat_file_header", 

632 "_version_info", 

633 "_version_info_token", 

634 "_git_options", 

635 "_persistent_git_options", 

636 "_environment", 

637 ) 

638 

639 _excluded_ = ( 

640 "cat_file_all", 

641 "cat_file_header", 

642 "_version_info", 

643 "_version_info_token", 

644 ) 

645 

646 re_unsafe_protocol = re.compile(r"(.+)::.+") 

647 

648 def __getstate__(self) -> Dict[str, Any]: 

649 return slots_to_dict(self, exclude=self._excluded_) 

650 

651 def __setstate__(self, d: Dict[str, Any]) -> None: 

652 dict_to_slots_and__excluded_are_none(self, d, excluded=self._excluded_) 

653 

654 # CONFIGURATION 

655 

656 git_exec_name = "git" 

657 """Default git command that should work on Linux, Windows, and other systems.""" 

658 

659 GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) 

660 """Enables debugging of GitPython's git commands.""" 

661 

662 USE_SHELL: bool = False 

663 """Deprecated. If set to ``True``, a shell will be used when executing git commands. 

664 

665 Code that uses ``USE_SHELL = True`` or that passes ``shell=True`` to any GitPython 

666 functions should be updated to use the default value of ``False`` instead. ``True`` 

667 is unsafe unless the effect of syntax treated specially by the shell is fully 

668 considered and accounted for, which is not possible under most circumstances. As 

669 detailed below, it is also no longer needed, even where it had been in the past. 

670 

671 It is in many if not most cases a command injection vulnerability for an application 

672 to set :attr:`USE_SHELL` to ``True``. Any attacker who can cause a specially crafted 

673 fragment of text to make its way into any part of any argument to any git command 

674 (including paths, branch names, etc.) can cause the shell to read and write 

675 arbitrary files and execute arbitrary commands. Innocent input may also accidentally 

676 contain special shell syntax, leading to inadvertent malfunctions. 

677 

678 In addition, how a value of ``True`` interacts with some aspects of GitPython's 

679 operation is not precisely specified and may change without warning, even before 

680 GitPython 4.0.0 when :attr:`USE_SHELL` may be removed. This includes: 

681 

682 * Whether or how GitPython automatically customizes the shell environment. 

683 

684 * Whether, outside of Windows (where :class:`subprocess.Popen` supports lists of 

685 separate arguments even when ``shell=True``), this can be used with any GitPython 

686 functionality other than direct calls to the :meth:`execute` method. 

687 

688 * Whether any GitPython feature that runs git commands ever attempts to partially 

689 sanitize data a shell may treat specially. Currently this is not done. 

690 

691 Prior to GitPython 2.0.8, this had a narrow purpose in suppressing console windows 

692 in graphical Windows applications. In 2.0.8 and higher, it provides no benefit, as 

693 GitPython solves that problem more robustly and safely by using the 

694 ``CREATE_NO_WINDOW`` process creation flag on Windows. 

695 

696 Because Windows path search differs subtly based on whether a shell is used, in rare 

697 cases changing this from ``True`` to ``False`` may keep an unusual git "executable", 

698 such as a batch file, from being found. To fix this, set the command name or full 

699 path in the :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable or pass the 

700 full path to :func:`git.refresh` (or invoke the script using a ``.exe`` shim). 

701 

702 Further reading: 

703 

704 * :meth:`Git.execute` (on the ``shell`` parameter). 

705 * https://github.com/gitpython-developers/GitPython/commit/0d9390866f9ce42870d3116094cd49e0019a970a 

706 * https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags 

707 * https://github.com/python/cpython/issues/91558#issuecomment-1100942950 

708 * https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createprocessw 

709 """ 

710 

711 _git_exec_env_var = "GIT_PYTHON_GIT_EXECUTABLE" 

712 _refresh_env_var = "GIT_PYTHON_REFRESH" 

713 

714 GIT_PYTHON_GIT_EXECUTABLE = None 

715 """Provide the full path to the git executable. Otherwise it assumes git is in the 

716 executable search path. 

717 

718 :note: 

719 The git executable is actually found during the refresh step in the top level 

720 ``__init__``. It can also be changed by explicitly calling :func:`git.refresh`. 

721 """ 

722 

723 _refresh_token = object() # Since None would match an initial _version_info_token. 

724 

725 @classmethod 

726 def refresh(cls, path: Union[None, PathLike] = None) -> bool: 

727 """Update information about the git executable :class:`Git` objects will use. 

728 

729 Called by the :func:`git.refresh` function in the top level ``__init__``. 

730 

731 :param path: 

732 Optional path to the git executable. If not absolute, it is resolved 

733 immediately, relative to the current directory. (See note below.) 

734 

735 :note: 

736 The top-level :func:`git.refresh` should be preferred because it calls this 

737 method and may also update other state accordingly. 

738 

739 :note: 

740 There are three different ways to specify the command that refreshing causes 

741 to be used for git: 

742 

743 1. Pass no `path` argument and do not set the 

744 :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable. The command 

745 name ``git`` is used. It is looked up in a path search by the system, in 

746 each command run (roughly similar to how git is found when running 

747 ``git`` commands manually). This is usually the desired behavior. 

748 

749 2. Pass no `path` argument but set the :envvar:`GIT_PYTHON_GIT_EXECUTABLE` 

750 environment variable. The command given as the value of that variable is 

751 used. This may be a simple command or an arbitrary path. It is looked up 

752 in each command run. Setting :envvar:`GIT_PYTHON_GIT_EXECUTABLE` to 

753 ``git`` has the same effect as not setting it. 

754 

755 3. Pass a `path` argument. This path, if not absolute, is immediately 

756 resolved, relative to the current directory. This resolution occurs at 

757 the time of the refresh. When git commands are run, they are run using 

758 that previously resolved path. If a `path` argument is passed, the 

759 :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable is not 

760 consulted. 

761 

762 :note: 

763 Refreshing always sets the :attr:`Git.GIT_PYTHON_GIT_EXECUTABLE` class 

764 attribute, which can be read on the :class:`Git` class or any of its 

765 instances to check what command is used to run git. This attribute should 

766 not be confused with the related :envvar:`GIT_PYTHON_GIT_EXECUTABLE` 

767 environment variable. The class attribute is set no matter how refreshing is 

768 performed. 

769 """ 

770 # Discern which path to refresh with. 

771 if path is not None: 

772 new_git = os.path.expanduser(path) 

773 new_git = os.path.abspath(new_git) 

774 else: 

775 new_git = os.environ.get(cls._git_exec_env_var, cls.git_exec_name) 

776 

777 # Keep track of the old and new git executable path. 

778 old_git = cls.GIT_PYTHON_GIT_EXECUTABLE 

779 old_refresh_token = cls._refresh_token 

780 cls.GIT_PYTHON_GIT_EXECUTABLE = new_git 

781 cls._refresh_token = object() 

782 

783 # Test if the new git executable path is valid. A GitCommandNotFound error is 

784 # raised by us. A PermissionError is raised if the git executable cannot be 

785 # executed for whatever reason. 

786 has_git = False 

787 try: 

788 cls().version() 

789 has_git = True 

790 except (GitCommandNotFound, PermissionError): 

791 pass 

792 

793 # Warn or raise exception if test failed. 

794 if not has_git: 

795 err = ( 

796 dedent( 

797 """\ 

798 Bad git executable. 

799 The git executable must be specified in one of the following ways: 

800 - be included in your $PATH 

801 - be set via $%s 

802 - explicitly set via git.refresh(<full-path-to-git-executable>) 

803 """ 

804 ) 

805 % cls._git_exec_env_var 

806 ) 

807 

808 # Revert to whatever the old_git was. 

809 cls.GIT_PYTHON_GIT_EXECUTABLE = old_git 

810 cls._refresh_token = old_refresh_token 

811 

812 if old_git is None: 

813 # On the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is None) we only 

814 # are quiet, warn, or error depending on the GIT_PYTHON_REFRESH value. 

815 

816 # Determine what the user wants to happen during the initial refresh. We 

817 # expect GIT_PYTHON_REFRESH to either be unset or be one of the 

818 # following values: 

819 # 

820 # 0|q|quiet|s|silence|silent|n|none 

821 # 1|w|warn|warning|l|log 

822 # 2|r|raise|e|error|exception 

823 

824 mode = os.environ.get(cls._refresh_env_var, "raise").lower() 

825 

826 quiet = ["quiet", "q", "silence", "s", "silent", "none", "n", "0"] 

827 warn = ["warn", "w", "warning", "log", "l", "1"] 

828 error = ["error", "e", "exception", "raise", "r", "2"] 

829 

830 if mode in quiet: 

831 pass 

832 elif mode in warn or mode in error: 

833 err = dedent( 

834 """\ 

835 %s 

836 All git commands will error until this is rectified. 

837 

838 This initial message can be silenced or aggravated in the future by setting the 

839 $%s environment variable. Use one of the following values: 

840 - %s: for no message or exception 

841 - %s: for a warning message (logging level CRITICAL, displayed by default) 

842 - %s: for a raised exception 

843 

844 Example: 

845 export %s=%s 

846 """ 

847 ) % ( 

848 err, 

849 cls._refresh_env_var, 

850 "|".join(quiet), 

851 "|".join(warn), 

852 "|".join(error), 

853 cls._refresh_env_var, 

854 quiet[0], 

855 ) 

856 

857 if mode in warn: 

858 _logger.critical(err) 

859 else: 

860 raise ImportError(err) 

861 else: 

862 err = dedent( 

863 """\ 

864 %s environment variable has been set but it has been set with an invalid value. 

865 

866 Use only the following values: 

867 - %s: for no message or exception 

868 - %s: for a warning message (logging level CRITICAL, displayed by default) 

869 - %s: for a raised exception 

870 """ 

871 ) % ( 

872 cls._refresh_env_var, 

873 "|".join(quiet), 

874 "|".join(warn), 

875 "|".join(error), 

876 ) 

877 raise ImportError(err) 

878 

879 # We get here if this was the initial refresh and the refresh mode was 

880 # not error. Go ahead and set the GIT_PYTHON_GIT_EXECUTABLE such that we 

881 # discern the difference between the first refresh at import time 

882 # and subsequent calls to git.refresh or this refresh method. 

883 cls.GIT_PYTHON_GIT_EXECUTABLE = cls.git_exec_name 

884 else: 

885 # After the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is no longer 

886 # None) we raise an exception. 

887 raise GitCommandNotFound(new_git, err) 

888 

889 return has_git 

890 

@classmethod
def is_cygwin(cls) -> bool:
    """Report whether the configured git executable is a Cygwin git.

    :return:
        Whatever :func:`is_cygwin_git` answers for the current value of
        :attr:`GIT_PYTHON_GIT_EXECUTABLE`.
    """
    git_executable = cls.GIT_PYTHON_GIT_EXECUTABLE
    return is_cygwin_git(git_executable)

894 

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str: ...

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str: ...

@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
    """Normalize a URL so that it can safely be written to a git config file.

    Config files created on Windows may hold paths with (escaped) backslashes,
    which git would escape again. The escaping is undone here and forward
    slashes are used instead.

    :param url:
        The URL or path to clean up.

    :param is_cygwin:
        Whether to treat `url` the Cygwin way, i.e. pass it through
        :func:`cygpath`. If ``None``, :meth:`is_cygwin` decides.

    :return:
        The normalized URL.
    """
    use_cygwin = cls.is_cygwin() if is_cygwin is None else is_cygwin
    if use_cygwin:
        return cygpath(url)

    # Non-Cygwin: expand environment variables and a leading "~" first.
    expanded = os.path.expandvars(url)
    if expanded.startswith("~"):
        expanded = os.path.expanduser(expanded)
    # Collapse doubled backslashes, then switch to forward slashes.
    return expanded.replace("\\\\", "\\").replace("\\", "/")

922 

@classmethod
def check_unsafe_protocols(cls, url: str) -> None:
    """Reject URLs that use a remote-helper style protocol.

    Besides the usual protocols (http, git, ssh), Git supports "remote helpers"
    written as ``<transport>::<address>``. One of them (``ext::``) can run any
    arbitrary command, so such URLs are refused here.

    See:

    - https://git-scm.com/docs/gitremote-helpers
    - https://git-scm.com/docs/git-remote-ext

    :param url:
        The URL to inspect.

    :raise git.exc.UnsafeProtocolError:
        If `url` matches :attr:`re_unsafe_protocol`.
    """
    found = cls.re_unsafe_protocol.match(url)
    if not found:
        return

    protocol = found.group(1)
    raise UnsafeProtocolError(
        f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it."
    )

942 

@classmethod
def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None:
    """Reject options that are considered unsafe.

    Some options accepted by ``git <command>`` can be abused to execute
    arbitrary commands, so they are blocked unless explicitly allowed.

    :param options:
        The options that are about to be passed to git.

    :param unsafe_options:
        The options that must not appear, written with their leading dashes.

    :raise git.exc.UnsafeOptionError:
        If any entry of `options` starts with an unsafe option or equals its
        dash-less spelling.
    """
    # An option may be given as `foo`, `--foo bar`, or `--foo=bar`. Hence each unsafe
    # option is paired with its bare name: the dashed form is matched as a prefix,
    # the bare form must match exactly.
    candidates = [(flag, flag.lstrip("-")) for flag in unsafe_options]
    for given in options:
        for unsafe_option, bare_option in candidates:
            if given.startswith(unsafe_option) or given == bare_option:
                raise UnsafeOptionError(
                    f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it."
                )

959 

# Class-level alias: lets the process wrapper be reached as ``Git.AutoInterrupt``
# (or ``self.AutoInterrupt``), which is how the methods of this class refer to it.
AutoInterrupt: TypeAlias = _AutoInterrupt

# Class-level alias: lets the object-content stream be reached as
# ``Git.CatFileContentStream`` (or ``self.CatFileContentStream``).
CatFileContentStream: TypeAlias = _CatFileContentStream

963 

def __init__(self, working_dir: Union[None, PathLike] = None) -> None:
    """Create a command wrapper bound to a directory.

    :param working_dir:
        Git directory we should work in. If ``None``, we always work in the current
        directory as returned by :func:`os.getcwd`.
        This is meant to be the working tree directory if available, or the
        ``.git`` directory in case of bare repositories.
    """
    super().__init__()

    # Where commands are run; expanded once, up front.
    self._working_dir = expand_path(working_dir)

    # Options placed between the git executable and the subcommand: the first set
    # is consumed by the next call only, the second is added to every call.
    self._git_options: Union[List[str], Tuple[str, ...]] = ()
    self._persistent_git_options: List[str] = []

    # Additional environment variables handed to every git process.
    self._environment: Dict[str, str] = {}

    # Cache for the parsed git version, plus the refresh token it was obtained under.
    self._version_info: Union[Tuple[int, ...], None] = None
    self._version_info_token: object = None

    # Slots for the long-running cat-file processes, created on first use.
    self.cat_file_header: Union[None, TBD] = None
    self.cat_file_all: Union[None, TBD] = None

988 

def __getattribute__(self, name: str) -> Any:
    """Look up an attribute, warning first when ``USE_SHELL`` is requested.

    :param name:
        Name of the attribute being accessed.

    :return:
        The attribute value found by the default lookup.
    """
    wants_use_shell = name == "USE_SHELL"
    if wants_use_shell:
        _warn_use_shell(extra_danger=False)
    return super().__getattribute__(name)

993 

def __getattr__(self, name: str) -> Any:
    """Turn unknown public attribute names into git subcommand callables.

    This is what allows a git command to be invoked as if it were a method of
    this object.

    :param name:
        The attribute that was not found by normal lookup.

    :return:
        Callable object that will execute call :meth:`_call_process` with your
        arguments. Names starting with ``_`` are not treated as commands; they
        go through the regular attribute lookup instead.
    """
    if not name.startswith("_"):
        return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)
    return super().__getattribute__(name)

1004 

def set_persistent_git_options(self, **kwargs: Any) -> None:
    """Set command line options for the git executable that apply to every
    subsequent subcommand call.

    :param kwargs:
        A dict of keyword arguments.
        They are converted by :meth:`transform_kwargs` like the options given to
        :meth:`_call_process`, but are passed to the git command itself rather
        than to the subcommand. Any previously stored persistent options are
        replaced.
    """
    persistent_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._persistent_git_options = persistent_options

1016 

@property
def working_dir(self) -> Union[None, PathLike]:
    """
    :return:
        The git directory this instance works in, as stored at construction
        time (``None`` if none was given).
    """
    directory = self._working_dir
    return directory

1021 

@property
def version_info(self) -> Tuple[int, ...]:
    """
    :return: Tuple with integers representing the major, minor and additional
        version numbers as parsed from :manpage:`git-version(1)`. Up to four fields
        are used.

    The value is computed on first access and then reused until the next refresh.
    """
    # Refreshing is global while the cache lives on the instance, so take a
    # snapshot of the token in case a refresh happens concurrently.
    token = self._refresh_token

    if self._version_info_token is not token:
        # Cache is missing or stale: ask git and parse "git version X.Y.Z...".
        raw_output = self._call_process("version")
        dotted = raw_output.split(" ")[2]
        numbers = []
        for field in dotted.split(".")[:4]:
            # Stop at the first non-numeric field (e.g. "windows").
            if not field.isdigit():
                break
            numbers.append(int(field))
        self._version_info = tuple(numbers)

        # Mark the cached value as valid until the next refresh.
        self._version_info_token = token
        return self._version_info

    # The cached version was obtained after the most recent refresh.
    assert self._version_info is not None, "Bug: corrupted token-check state"
    return self._version_info

1049 

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[True],
) -> "AutoInterrupt": ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[False] = False,
    stdout_as_string: Literal[True],
) -> Union[str, Tuple[int, str, str]]: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[False] = False,
    stdout_as_string: Literal[False] = False,
) -> Union[bytes, Tuple[int, bytes, str]]: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    with_extended_output: Literal[False],
    as_process: Literal[False],
    stdout_as_string: Literal[True],
) -> str: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    with_extended_output: Literal[False],
    as_process: Literal[False],
    stdout_as_string: Literal[False],
) -> bytes: ...

def execute(
    self,
    command: Union[str, Sequence[Any]],
    istream: Union[None, BinaryIO] = None,
    with_extended_output: bool = False,
    with_exceptions: bool = True,
    as_process: bool = False,
    output_stream: Union[None, BinaryIO] = None,
    stdout_as_string: bool = True,
    kill_after_timeout: Union[None, float] = None,
    with_stdout: bool = True,
    universal_newlines: bool = False,
    shell: Union[None, bool] = None,
    env: Union[None, Mapping[str, str]] = None,
    max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
    strip_newline_in_stdout: bool = True,
    **subprocess_kwargs: Any,
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
    R"""Handle executing the command, and consume and return the returned
    information (stdout).

    :param command:
        The command argument list to execute.
        It should be a sequence of program arguments, or a string. The
        program to execute is the first item in the args sequence or string.

    :param istream:
        Standard input filehandle passed to :class:`subprocess.Popen`.

    :param with_extended_output:
        Whether to return a (status, stdout, stderr) tuple.

    :param with_exceptions:
        Whether to raise an exception when git returns a non-zero status.

    :param as_process:
        Whether to return the created process instance directly from which
        streams can be read on demand. This will render `with_extended_output`
        and `with_exceptions` ineffective - the caller will have to deal with
        the details. It is important to note that the process will be placed
        into an :class:`AutoInterrupt` wrapper that will interrupt the process
        once it goes out of scope. If you use the command in iterators, you
        should pass the whole process instance instead of a single stream.

    :param output_stream:
        If set to a file-like object, data produced by the git command will be
        copied to the given stream instead of being returned as a string.
        This feature only has any effect if `as_process` is ``False``.

    :param stdout_as_string:
        If ``False``, the command's standard output will be bytes. Otherwise, it
        will be decoded into a string using the default encoding (usually UTF-8).
        The latter can fail, if the output contains binary data.

    :param kill_after_timeout:
        Specifies a timeout in seconds for the git command, after which the process
        should be killed. This will have no effect if `as_process` is set to
        ``True``. It is set to ``None`` by default, in which case the process is
        allowed to run without any time limit. Uses of this feature should be
        carefully considered, due to the following limitations:

        1. This feature is not supported at all on Windows.
        2. Effectiveness may vary by operating system. ``ps --ppid`` is used to
           enumerate child processes, which is available on most GNU/Linux systems
           but not most others.
        3. Deeper descendants do not receive signals, though they may sometimes
           terminate as a consequence of their parent processes being killed.
        4. `kill_after_timeout` uses ``SIGKILL``, which can have negative side
           effects on a repository. For example, stale locks in case of
           :manpage:`git-gc(1)` could render the repository incapable of accepting
           changes until the lock is manually removed.

    :param with_stdout:
        If ``True``, default ``True``, we open stdout on the created process.
        If ``False``, stdout is discarded and the returned stdout value is empty.

    :param universal_newlines:
        If ``True``, pipes will be opened as text, and lines are split at all known
        line endings.

    :param shell:
        Whether to invoke commands through a shell
        (see :class:`Popen(..., shell=True) <subprocess.Popen>`).
        If this is not ``None``, it overrides :attr:`USE_SHELL`.

        Passing ``shell=True`` to this or any other GitPython function should be
        avoided, as it is unsafe under most circumstances. This is because it is
        typically not feasible to fully consider and account for the effect of shell
        expansions, especially when passing ``shell=True`` to other methods that
        forward it to :meth:`Git.execute`. Passing ``shell=True`` is also no longer
        needed (nor useful) to work around any known operating system specific
        issues.

    :param env:
        A dictionary of environment variables to be passed to
        :class:`subprocess.Popen`.

    :param max_chunk_size:
        Maximum number of bytes in one chunk of data passed to the `output_stream`
        in one invocation of its ``write()`` method. If the given number is not
        positive then the default value is used.

    :param strip_newline_in_stdout:
        Whether to strip the trailing ``\n`` of the command stdout.

    :param subprocess_kwargs:
        Keyword arguments to be passed to :class:`subprocess.Popen`. Please note
        that some of the valid kwargs are already set by this method; the ones you
        specify may not be the same ones.

    :return:
        * str(output), if `with_extended_output` is ``False`` (Default)
        * tuple(int(status), str(stdout), str(stderr)),
          if `with_extended_output` is ``True``

        If `output_stream` is set, the command's output is copied to that stream
        and the stdout value returned here is whatever is left to read afterwards:

        * stdout remainder, if `with_extended_output` is ``False``
        * tuple(int(status), stdout remainder, str(stderr)),
          if `with_extended_output` is ``True``

        Note that git is executed with ``LANGUAGE="C"`` and ``LC_ALL="C"`` to
        ensure consistent output regardless of system language.

    :raise git.exc.GitCommandError:
        If git exits with a non-zero status and `with_exceptions` is ``True``, or
        if `kill_after_timeout` is used on Windows.

    :raise git.exc.GitCommandNotFound:
        If the process could not be started because the executable was not found.

    :note:
        If you add additional keyword arguments to the signature of this method, you
        must update the ``execute_kwargs`` variable housed in this module.
    """
    # Remove password for the command if present.
    redacted_command = remove_password_if_present(command)
    if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process):
        _logger.info(" ".join(redacted_command))

    # Allow the user to have the command executed in their working dir.
    try:
        cwd = self._working_dir or os.getcwd()  # type: Union[None, str]
        if not os.access(str(cwd), os.X_OK):
            cwd = None
    except FileNotFoundError:
        cwd = None

    # Start the process.
    inline_env = env
    env = os.environ.copy()
    # Attempt to force all output to plain ASCII English, which is what some parsing
    # code may expect.
    # According to https://askubuntu.com/a/311796, we are setting LANGUAGE as well
    # just to be sure.
    env["LANGUAGE"] = "C"
    env["LC_ALL"] = "C"
    env.update(self._environment)
    if inline_env is not None:
        env.update(inline_env)

    if sys.platform == "win32":
        if kill_after_timeout is not None:
            raise GitCommandError(
                redacted_command,
                '"kill_after_timeout" feature is not supported on Windows.',
            )
        cmd_not_found_exception = OSError
    else:
        cmd_not_found_exception = FileNotFoundError
    # END handle

    # DEVNULL always exists on supported Python versions, so no file needs to be
    # opened (and potentially leaked) to discard stdout.
    stdout_sink = PIPE if with_stdout else DEVNULL
    if shell is None:
        # Get the value of USE_SHELL with no deprecation warning. Do this without
        # warnings.catch_warnings, to avoid a race condition with application code
        # configuring warnings. The value could be looked up in type(self).__dict__
        # or Git.__dict__, but those can break under some circumstances. This works
        # the same as self.USE_SHELL in more situations; see Git.__getattribute__.
        shell = super().__getattribute__("USE_SHELL")
    _logger.debug(
        "Popen(%s, cwd=%s, stdin=%s, shell=%s, universal_newlines=%s)",
        redacted_command,
        cwd,
        "<valid stream>" if istream else "None",
        shell,
        universal_newlines,
    )
    try:
        proc = safer_popen(
            command,
            env=env,
            cwd=cwd,
            bufsize=-1,
            stdin=(istream or DEVNULL),
            stderr=PIPE,
            stdout=stdout_sink,
            shell=shell,
            universal_newlines=universal_newlines,
            encoding=defenc if universal_newlines else None,
            **subprocess_kwargs,
        )
    except cmd_not_found_exception as err:
        raise GitCommandNotFound(redacted_command, err) from err
    else:
        # Replace with a typeguard for Popen[bytes]?
        proc.stdout = cast(BinaryIO, proc.stdout)
        proc.stderr = cast(BinaryIO, proc.stderr)

    if as_process:
        return self.AutoInterrupt(proc, command)

    if sys.platform != "win32" and kill_after_timeout is not None:
        # Help mypy figure out this is not None even when used inside communicate().
        timeout = kill_after_timeout

        def kill_process(pid: int) -> None:
            """Callback to kill a process.

            This callback implementation would be ineffective and unsafe on Windows.
            """
            child_pids = []
            try:
                # The context manager closes the pipe and reaps ``ps`` when done.
                with Popen(["ps", "--ppid", str(pid)], stdout=PIPE) as ps_proc:
                    if ps_proc.stdout is not None:
                        for line in ps_proc.stdout:
                            fields = line.split()
                            if fields and fields[0].isdigit():
                                child_pids.append(int(fields[0]))
            except OSError:
                # ``ps`` could not be run, so children cannot be enumerated. The
                # process itself must still be killed below.
                pass
            try:
                os.kill(pid, signal.SIGKILL)
                for child_pid in child_pids:
                    try:
                        os.kill(child_pid, signal.SIGKILL)
                    except OSError:
                        pass
                # Tell the main routine that the process was killed.
                kill_check.set()
            except OSError:
                # It is possible that the process gets completed in the duration
                # after timeout happens and before we try to kill the process.
                pass
            return

        def communicate() -> Tuple[AnyStr, AnyStr]:
            watchdog.start()
            try:
                out, err = proc.communicate()
            finally:
                # Never leave the timer armed, even if communicate() raised.
                watchdog.cancel()
            if kill_check.is_set():
                err = 'Timeout: the command "%s" did not complete in %d secs.' % (
                    " ".join(redacted_command),
                    timeout,
                )
                if not universal_newlines:
                    err = err.encode(defenc)
            return out, err

        # END helpers

        kill_check = threading.Event()
        watchdog = threading.Timer(timeout, kill_process, args=(proc.pid,))
    else:
        communicate = proc.communicate

    # Wait for the process to return.
    status = 0
    stdout_value: Union[str, bytes] = b""
    stderr_value: Union[str, bytes] = b""
    newline = "\n" if universal_newlines else b"\n"
    try:
        if output_stream is None:
            stdout_value, stderr_value = communicate()
            if stdout_value is None:
                # stdout was not piped (with_stdout=False), so communicate() has
                # nothing to report for it.
                stdout_value = "" if universal_newlines else b""
            # Strip trailing "\n".
            if stdout_value.endswith(newline) and strip_newline_in_stdout:  # type: ignore[arg-type]
                stdout_value = stdout_value[:-1]
            if stderr_value.endswith(newline):  # type: ignore[arg-type]
                stderr_value = stderr_value[:-1]

            status = proc.returncode
        else:
            max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
            if proc.stdout is not None:
                stream_copy(proc.stdout, output_stream, max_chunk_size)
                stdout_value = proc.stdout.read()
            stderr_value = proc.stderr.read()
            # Strip trailing "\n".
            if stderr_value.endswith(newline):  # type: ignore[arg-type]
                stderr_value = stderr_value[:-1]
            status = proc.wait()
        # END stdout handling
    finally:
        # A stream is None when it was not piped (stdout with with_stdout=False).
        for pipe in (proc.stdout, proc.stderr):
            if pipe is not None:
                pipe.close()

    if self.GIT_PYTHON_TRACE == "full":
        cmdstr = " ".join(redacted_command)

        def as_text(stdout_value: Union[bytes, str]) -> str:
            # A placeholder is logged only when the output went to output_stream;
            # an empty stdout is logged as an empty string.
            return "<OUTPUT_STREAM>" if output_stream is not None else safe_decode(stdout_value)

        # END as_text

        if stderr_value:
            _logger.info(
                "%s -> %d; stdout: '%s'; stderr: '%s'",
                cmdstr,
                status,
                as_text(stdout_value),
                safe_decode(stderr_value),
            )
        elif stdout_value:
            _logger.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
        else:
            _logger.info("%s -> %d", cmdstr, status)
    # END handle debug printing

    if with_exceptions and status != 0:
        raise GitCommandError(redacted_command, status, stderr_value, stdout_value)

    if isinstance(stdout_value, bytes) and stdout_as_string:  # Could also be output_stream.
        stdout_value = safe_decode(stdout_value)

    # Allow access to the command's status code.
    if with_extended_output:
        return (status, stdout_value, safe_decode(stderr_value))
    else:
        return stdout_value

1416 

def environment(self) -> Dict[str, str]:
    """
    :return:
        The extra environment variables used for git invocations. This is the
        stored dict itself, not a copy.
    """
    extra_env = self._environment
    return extra_env

1419 

def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
    """Change the environment variables used for future git invocations.

    The returned mapping records the previous value of every variable that was
    changed, in a form that can be fed back into this method to undo the change.

    Examples::

        old_env = self.update_environment(PWD='/tmp')
        self.update_environment(**old_env)

    :param kwargs:
        Environment variables to use for git processes. A value of ``None``
        removes the variable.

    :return:
        Dict that maps environment variables to their old values (``None`` for
        variables that were not set before)
    """
    previous: Dict[str, Union[str, None]] = {}
    current = self._environment
    for name, new_value in kwargs.items():
        if new_value is None:
            # None means "unset": only variables that exist are removed and recorded.
            if name in current:
                previous[name] = current.pop(name)
            continue
        # Any other value is stored, remembering what (if anything) it replaced.
        previous[name] = current.get(name)
        current[name] = new_value
    return previous

1447 

@contextlib.contextmanager
def custom_environment(self, **kwargs: Any) -> Iterator[None]:
    """Context manager that applies :meth:`update_environment` for the duration of
    a ``with`` block and puts the previous values back afterwards, even if the
    block raises.

    Examples::

        with self.custom_environment(GIT_SSH='/bin/ssh_wrapper'):
            repo.remotes.origin.fetch()

    :param kwargs:
        See :meth:`update_environment`.
    """
    saved = self.update_environment(**kwargs)
    try:
        yield
    finally:
        # Feed the recorded old values back in to undo the change.
        self.update_environment(**saved)

1466 

def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
    """Convert one keyword argument into git command line option(s).

    :param name:
        The option name. A single character yields a short option (``-x``);
        anything longer yields a long option (``--name``), passed through
        ``dashify``.

    :param value:
        ``True`` produces a bare flag. ``False`` and ``None`` produce nothing.
        Any other value is attached to the option. Note that short options use
        an equality test, so values equal to ``False`` (such as ``0``) also
        produce nothing there.

    :param split_single_char_options:
        For short options with a value, whether to emit the value as a separate
        argument (``-x value``) instead of glued to the flag (``-xvalue``).

    :return:
        The list of resulting arguments, possibly empty.
    """
    if len(name) == 1:
        # Short option.
        if value is True:
            return ["-%s" % name]
        if value in (False, None):
            return []
        if split_single_char_options:
            return ["-%s" % name, "%s" % value]
        return ["-%s%s" % (name, value)]

    # Long option.
    if value is True:
        return ["--%s" % dashify(name)]
    if value is False or value is None:
        return []
    return ["--%s=%s" % (dashify(name), value)]

1482 

def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
    """Transform Python-style kwargs into git command line options.

    :param split_single_char_options:
        Forwarded to :meth:`transform_kwarg` for every option.

    :param kwargs:
        The options to convert. A list or tuple value repeats the option once
        per element.

    :return:
        The resulting arguments, in the order the keyword arguments were given.
    """
    options: List[str] = []
    for key, raw in kwargs.items():
        # Treat a scalar like a one-element sequence so both cases share one loop.
        values = raw if isinstance(raw, (list, tuple)) else (raw,)
        for item in values:
            options.extend(self.transform_kwarg(key, item, split_single_char_options))
    return options

1493 

1494 @classmethod 

1495 def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]: 

1496 outlist = [] 

1497 if isinstance(arg_list, (list, tuple)): 

1498 for arg in arg_list: 

1499 outlist.extend(cls._unpack_args(arg)) 

1500 else: 

1501 outlist.append(str(arg_list)) 

1502 

1503 return outlist 

1504 

1505 def __call__(self, **kwargs: Any) -> "Git": 

1506 """Specify command line options to the git executable for a subcommand call. 

1507 

1508 :param kwargs: 

1509 A dict of keyword arguments. 

1510 These arguments are passed as in :meth:`_call_process`, but will be passed 

1511 to the git command rather than the subcommand. 

1512 

1513 Examples:: 

1514 

1515 git(work_tree='/tmp').difftool() 

1516 """ 

1517 self._git_options = self.transform_kwargs(split_single_char_options=True, **kwargs) 

1518 return self 

1519 

@overload
def _call_process(
    self, method: str, *args: None, **kwargs: None
) -> str: ...  # If no args were given, execute the call with all defaults.

@overload
def _call_process(
    self,
    method: str,
    istream: int,
    as_process: Literal[True],
    *args: Any,
    **kwargs: Any,
) -> "Git.AutoInterrupt": ...

@overload
def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ...

def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
    """Build the command line for a git subcommand and run it via :meth:`execute`.

    :param method:
        The command. Contained ``_`` characters will be converted to hyphens, such
        as in ``ls_files`` to call ``ls-files``.

    :param args:
        The list of arguments. Any ``None`` among them is dropped, which lets
        callers pass optional arguments without special-casing them. Nested lists
        and tuples are flattened.

    :param kwargs:
        Contains key-values for the following:

        - The :meth:`execute()` kwds, as listed in ``execute_kwargs``.
        - "Command options" to be converted by :meth:`transform_kwargs`.
        - The ``insert_kwargs_after`` key, whose value must match one of
          ``*args``. The command options are then placed right after that
          argument instead of before all arguments.

    Examples::

        git.rev_list('master', max_count=10, header=True)

    turns into::

        git rev-list --max-count=10 --header master

    :return:
        Same as :meth:`execute`. If no args are given, :meth:`execute`'s defaults
        apply (especially ``as_process = False``, ``stdout_as_string = True``) and
        a :class:`str` is returned.

    :raise ValueError:
        If ``insert_kwargs_after`` names an argument that is not in ``*args``.
    """
    # Keyword arguments understood by execute() must be split off before
    # transform_kwargs runs, or they would end up on the command line.
    exec_kwargs = {}
    opts_kwargs = {}
    for key, val in kwargs.items():
        if key in execute_kwargs:
            exec_kwargs[key] = val
        else:
            opts_kwargs[key] = val

    anchor = opts_kwargs.pop("insert_kwargs_after", None)

    # Prepare the argument list.
    opt_args = self.transform_kwargs(**opts_kwargs)
    ext_args = self._unpack_args([arg for arg in args if arg is not None])

    if anchor is not None:
        try:
            position = ext_args.index(anchor)
        except ValueError as err:
            raise ValueError(
                "Couldn't find argument '%s' in args %s to insert cmd options after"
                % (anchor, str(ext_args))
            ) from err
        split_at = position + 1
        args_list = ext_args[:split_at] + opt_args + ext_args[split_at:]
    else:
        args_list = opt_args + ext_args

    # Executable first, then the persistent options, then the one-shot options.
    call = [self.GIT_PYTHON_GIT_EXECUTABLE, *self._persistent_git_options, *self._git_options]
    # One-shot options are consumed by this call; reset them to avoid side effects.
    self._git_options = ()

    call += [dashify(method), *args_list]

    return self.execute(call, **exec_kwargs)

1617 

1618 def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]: 

1619 """ 

1620 :param header_line: 

1621 A line of the form:: 

1622 

1623 <hex_sha> type_string size_as_int 

1624 

1625 :return: 

1626 (hex_sha, type_string, size_as_int) 

1627 

1628 :raise ValueError: 

1629 If the header contains indication for an error due to incorrect input sha. 

1630 """ 

1631 tokens = header_line.split() 

1632 if len(tokens) != 3: 

1633 if not tokens: 

1634 err_msg = ( 

1635 f"SHA is empty, possible dubious ownership in the repository " 

1636 f"""at {self._working_dir}.\n If this is unintended run:\n\n """ 

1637 f""" "git config --global --add safe.directory {self._working_dir}" """ 

1638 ) 

1639 raise ValueError(err_msg) 

1640 else: 

1641 raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip())) 

1642 # END handle actual return value 

1643 # END error handling 

1644 

1645 if len(tokens[0]) != 40: 

1646 raise ValueError("Failed to parse header: %r" % header_line) 

1647 return (tokens[0], tokens[1], int(tokens[2])) 

1648 

def _prepare_ref(self, ref: AnyStr) -> bytes:
    """Turn a ref into one newline-terminated line of bytes, which is what the
    command reading refs on stdin needs to tell them apart.

    :param ref:
        The ref as text, as ASCII bytes (e.g. a hexsha), or as any other object
        whose :class:`str` form is the ref.

    :return:
        The ref, ending in exactly the newline it had or one that was added,
        encoded with the default encoding.
    """
    if isinstance(ref, str):
        text: str = ref
    elif isinstance(ref, bytes):
        # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text.
        text = ref.decode("ascii")
    else:
        text = str(ref)  # Could be ref-object.

    line = text if text.endswith("\n") else text + "\n"
    return line.encode(defenc)

1662 

1663 def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt": 

1664 cur_val = getattr(self, attr_name) 

1665 if cur_val is not None: 

1666 return cur_val 

1667 

1668 options = {"istream": PIPE, "as_process": True} 

1669 options.update(kwargs) 

1670 

1671 cmd = self._call_process(cmd_name, *args, **options) 

1672 setattr(self, attr_name, cmd) 

1673 cmd = cast("Git.AutoInterrupt", cmd) 

1674 return cmd 

1675 

def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]:
    """Send a ref to a running command and parse the header line it answers with.

    :param cmd:
        The running process; both its stdin and stdout must be available.

    :param ref:
        The ref to look up, prepared with :meth:`_prepare_ref`.

    :return:
        (hexsha, type_string, size_as_int), as produced by
        :meth:`_parse_object_header`.

    :raise ValueError:
        If the process has no usable stdin or stdout.
    """
    if not (cmd.stdin and cmd.stdout):
        raise ValueError("cmd stdin was empty")

    cmd.stdin.write(self._prepare_ref(ref))
    # Flush so the request reaches the process before its reply is awaited.
    cmd.stdin.flush()
    return self._parse_object_header(cmd.stdout.readline())

1683 

def get_object_header(self, ref: str) -> Tuple[str, str, int]:
    """Quickly look up the type and size of the object behind the given ref.

    :param ref:
        The ref to examine.

    :note:
        The cost of starting the command is paid only once; the same process is
        reused by subsequent calls.

    :return:
        (hexsha, type_string, size_as_int)
    """
    batch_check_cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True)
    header = self.__get_object_header(batch_check_cmd, ref)
    return header

1697 

def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
    """Like :meth:`get_object_header`, but also read and return the object's data.

    :param ref:
        The ref to read.

    :return:
        (hexsha, type_string, size_as_int, data_string)

    :note:
        Not threadsafe.
    """
    sha, kind, length, content_stream = self.stream_object_data(ref)
    payload = content_stream.read(length)
    # Drop the stream explicitly before returning, once its data has been read.
    del content_stream
    return (sha, kind, length, payload)

1711 

def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
    """Like :meth:`get_object_data`, but hand back the data as a stream instead of
    reading it.

    :param ref:
        The ref to read.

    :return:
        (hexsha, type_string, size_as_int, stream)

    :note:
        This method is not threadsafe. You need one independent :class:`Git`
        instance per thread to be safe!
    """
    batch_cmd = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
    hexsha, typename, size = self.__get_object_header(batch_cmd, ref)

    # Fall back to an empty in-memory stream if the process has no stdout.
    if batch_cmd.stdout is not None:
        source = batch_cmd.stdout
    else:
        source = io.BytesIO()
    content = self.CatFileContentStream(size, source)
    return (hexsha, typename, size, content)

1726 

def clear_cache(self) -> "Git":
    """Release resources held by internal caches.

    Currently this interrupts the persistent commands, if any were started, and
    forgets them so that they are started afresh when next needed.

    :return:
        self
    """
    for process in (self.cat_file_all, self.cat_file_header):
        if process:
            process.__del__()

    self.cat_file_all = self.cat_file_header = None
    return self