Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/cmd.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

611 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6from __future__ import annotations 

7 

8__all__ = ["GitMeta", "Git"] 

9 

10import contextlib 

11import io 

12import itertools 

13import logging 

14import os 

15import re 

16import signal 

17import subprocess 

18from subprocess import DEVNULL, PIPE, Popen 

19import sys 

20from textwrap import dedent 

21import threading 

22import warnings 

23 

24from git.compat import defenc, force_bytes, safe_decode 

25from git.exc import ( 

26 CommandError, 

27 GitCommandError, 

28 GitCommandNotFound, 

29 UnsafeOptionError, 

30 UnsafeProtocolError, 

31) 

32from git.util import ( 

33 cygpath, 

34 expand_path, 

35 is_cygwin_git, 

36 patch_env, 

37 remove_password_if_present, 

38 stream_copy, 

39) 

40 

41# typing --------------------------------------------------------------------------- 

42 

43from typing import ( 

44 Any, 

45 AnyStr, 

46 BinaryIO, 

47 Callable, 

48 Dict, 

49 IO, 

50 Iterator, 

51 List, 

52 Mapping, 

53 Optional, 

54 Sequence, 

55 TYPE_CHECKING, 

56 TextIO, 

57 Tuple, 

58 Union, 

59 cast, 

60 overload, 

61) 

62 

63if sys.version_info >= (3, 10): 

64 from typing import TypeAlias 

65else: 

66 from typing_extensions import TypeAlias 

67 

68from git.types import Literal, PathLike, TBD 

69 

70if TYPE_CHECKING: 

71 from git.diff import DiffIndex 

72 from git.repo.base import Repo 

73 

74# --------------------------------------------------------------------------------- 

75 

76execute_kwargs = { 

77 "istream", 

78 "with_extended_output", 

79 "with_exceptions", 

80 "as_process", 

81 "output_stream", 

82 "stdout_as_string", 

83 "kill_after_timeout", 

84 "with_stdout", 

85 "universal_newlines", 

86 "shell", 

87 "env", 

88 "max_chunk_size", 

89 "strip_newline_in_stdout", 

90} 

91 

92_logger = logging.getLogger(__name__) 

93 

94 

95# ============================================================================== 

96## @name Utilities 

97# ------------------------------------------------------------------------------ 

98# Documentation 

99## @{ 

100 

101 

def handle_process_output(
    process: Union["Git.AutoInterrupt", Popen],
    stdout_handler: Union[
        None,
        Callable[[AnyStr], None],
        Callable[[List[AnyStr]], None],
        Callable[[bytes, "Repo", "DiffIndex"], None],
    ],
    stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
    finalizer: Union[None, Callable[[Union[Popen, "Git.AutoInterrupt"]], None]] = None,
    decode_streams: bool = True,
    kill_after_timeout: Union[None, float] = None,
) -> None:
    R"""Register for notifications to learn that process output is ready to read, and
    dispatch lines to the respective line handlers.

    One daemon thread per available stream (stdout, stderr) reads the stream line by
    line and passes each line to the matching handler. Each thread is joined with
    `kill_after_timeout` as the join timeout.

    This function returns once the finalizer returns.

    :param process:
        :class:`subprocess.Popen` instance, or a wrapper exposing it as ``proc``.

    :param stdout_handler:
        f(stdout_line_string), or ``None``.

    :param stderr_handler:
        f(stderr_line_string), or ``None``.

    :param finalizer:
        f(proc) - wait for proc to finish.

    :param decode_streams:
        Assume stdout/stderr streams are binary and decode them before pushing their
        contents to handlers.

        This defaults to ``True``. Set it to ``False`` if:

        - ``universal_newlines == True``, as then streams are in text mode, or
        - decoding must happen later, such as for :class:`~git.diff.Diff`\s.

    :param kill_after_timeout:
        :class:`float` or ``None``, Default = ``None``

        To specify a timeout in seconds for the git command, after which the process
        should be killed.

    :raise RuntimeError:
        If a pump thread is still alive after the timeout and `process` is not a
        :class:`Git.AutoInterrupt` (a plain process is not terminated here).
    """

    # Use 2 "pump" threads and wait for both to finish.
    def pump_stream(
        cmdline: List[str],
        name: str,
        stream: Union[BinaryIO, TextIO],
        is_decode: bool,
        handler: Union[None, Callable[[Union[bytes, str]], None]],
    ) -> None:
        try:
            for line in stream:
                if handler:
                    if is_decode:
                        assert isinstance(line, bytes)
                        line_str = line.decode(defenc)
                        handler(line_str)
                    else:
                        handler(line)

        except Exception as ex:
            _logger.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
            if "I/O operation on closed file" not in str(ex):
                # Only reraise if the error was not due to the stream closing.
                raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
        finally:
            stream.close()

    if hasattr(process, "proc"):
        process = cast("Git.AutoInterrupt", process)
        cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
        p_stdout = process.proc.stdout if process.proc else None
        p_stderr = process.proc.stderr if process.proc else None
    else:
        process = cast(Popen, process)  # type: ignore[redundant-cast]
        cmdline = getattr(process, "args", "")
        p_stdout = process.stdout
        p_stderr = process.stderr

    # A command given as a single string is split so it can be logged as a list.
    if not isinstance(cmdline, (tuple, list)):
        cmdline = cmdline.split()

    pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
    if p_stdout:
        pumps.append(("stdout", p_stdout, stdout_handler))
    if p_stderr:
        pumps.append(("stderr", p_stderr, stderr_handler))

    threads: List[threading.Thread] = []

    for name, stream, handler in pumps:
        t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
        t.daemon = True
        t.start()
        threads.append(t)

    # FIXME: Why join? Will block if stdin needs feeding...
    for t in threads:
        t.join(timeout=kill_after_timeout)
        if t.is_alive():
            if isinstance(process, Git.AutoInterrupt):
                process._terminate()
            else:  # Don't want to deal with the other case.
                raise RuntimeError(
                    "Thread join() timed out in cmd.handle_process_output()."
                    f" kill_after_timeout={kill_after_timeout} seconds"
                )
            if stderr_handler:
                error_str: Union[str, bytes] = (
                    f"error: process killed because it timed out. kill_after_timeout={kill_after_timeout} seconds"
                )
                # Real binary pipes derive from the io base classes; they are never
                # instances of typing.BinaryIO, so checking against that class would
                # leave this branch unreachable.
                if not decode_streams and isinstance(p_stderr, (io.RawIOBase, io.BufferedIOBase)):
                    # Assume stderr_handler needs binary input.
                    error_str = cast(str, error_str)
                    error_str = error_str.encode()
                # We ignore typing on the next line because mypy does not like the way
                # we inferred that stderr takes str or bytes.
                stderr_handler(error_str)  # type: ignore[arg-type]

    if finalizer:
        finalizer(process)

227 

228 

# Callable used to start subprocesses; bound below depending on the platform.
safer_popen: Callable[..., Popen]

if sys.platform != "win32":
    # Outside Windows, plain Popen is used unchanged.
    safer_popen = Popen
else:

    def _safer_popen_windows(
        command: Union[str, Sequence[Any]],
        *,
        shell: bool = False,
        env: Optional[Mapping[str, str]] = None,
        **kwargs: Any,
    ) -> Popen:
        """Start a subprocess on Windows without searching the CWD for the executable.

        A repository's working tree may be the current directory of the calling
        process, or of the subprocess, and may be untrusted. Without this wrapper, a
        file such as ``git.exe`` placed there could be picked up by the executable
        search, both when the process is created directly and when a shell performs
        the search on GitPython's behalf.

        :param command:
            Command to run, as accepted by :class:`subprocess.Popen`.

        :param shell:
            Whether to run the command through a shell.

        :param env:
            Environment for the subprocess, or ``None``. It is never mutated; when a
            shell is used a copy is made and extended.

        :param kwargs:
            Further keyword arguments, forwarded to :class:`subprocess.Popen`.

        :return:
            The started :class:`subprocess.Popen` object.

        :note:
            This works by setting the :envvar:`NoDefaultCurrentDirectoryInExePath`
            environment variable while the subprocess is created. Windows-specific
            process creation flags are passed as well; they are unrelated to the path
            search.

        :note:
            There is a race condition on :attr:`os.environ` in this implementation.
            GitPython isn't thread-safe, but ideally another thread could mutate
            :attr:`os.environ` without unpredictable results. See comments in:
            https://github.com/gitpython-developers/GitPython/pull/1650
        """
        no_cwd_var = "NoDefaultCurrentDirectoryInExePath"

        if shell:
            # The shell is the direct child, so its own environment must carry the
            # variable. Work on a copy: the mapping passed in may be immutable or
            # reused by the caller.
            shell_env = dict(env) if env is not None else {}
            shell_env[no_cwd_var] = "1"  # Any value works.
            env = shell_env

        # Without a shell, this process performs the search inside CreateProcessW, so
        # the variable has to be in our own environment. With a shell that is only
        # needed where https://github.com/python/cpython/issues/101283 is unpatched and
        # ComSpec is unset, but patching unconditionally is simpler and covers that
        # case too.
        #
        # CREATE_NEW_PROCESS_GROUP is needed for some ways of killing the process later:
        # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        # https://docs.python.org/3/library/subprocess.html#subprocess.CREATE_NEW_PROCESS_GROUP
        with patch_env(no_cwd_var, "1"):
            return Popen(
                command,
                shell=shell,
                env=env,
                creationflags=subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP,
                **kwargs,
            )

    safer_popen = _safer_popen_windows

297 

298 

def dashify(string: str) -> str:
    """Return `string` with every underscore replaced by a hyphen."""
    return "-".join(string.split("_"))

301 

302 

def slots_to_dict(self: "Git", exclude: Sequence[str] = ()) -> Dict[str, Any]:
    """Collect the values of the slots of `self` into a dictionary.

    :param self:
        Object whose ``__slots__`` names are read with :func:`getattr`.

    :param exclude:
        Slot names to leave out of the result.

    :return:
        Mapping of each non-excluded slot name to its current value.
    """
    state: Dict[str, Any] = {}
    for slot in self.__slots__:
        if slot in exclude:
            continue
        state[slot] = getattr(self, slot)
    return state

305 

306 

def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
    """Set attributes on `self` from `d`, then set each excluded name to ``None``.

    :param self:
        Object receiving the attributes.

    :param d:
        Attribute names mapped to the values to assign.

    :param excluded:
        Attribute names assigned ``None`` afterwards (overriding any value from `d`).
    """
    for name in d:
        setattr(self, name, d[name])
    for name in excluded:
        setattr(self, name, None)

312 

313 

314## -- End Utilities -- @} 

315 

316 

class _AutoInterrupt:
    """Process wrapper that terminates the wrapped process on finalization.

    This kills/interrupts the stored process instance once this instance goes out of
    scope. It is used to prevent processes piling up in case iterators stop reading.

    All attributes are wired through to the contained process object.

    The wait method is overridden to perform automatic status code checking and possibly
    raise.
    """

    __slots__ = ("proc", "args", "status")

    # If this is non-zero it will override any status code during _terminate, used
    # to prevent race conditions in testing.
    _status_code_if_terminate: int = 0

    def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
        """Wrap `proc`, remembering the command `args` for error reporting."""
        self.proc = proc
        self.args = args
        self.status: Union[int, None] = None

    def _terminate(self) -> None:
        """Terminate the underlying process.

        The wrapped process is detached (``self.proc`` becomes ``None``), its pipes
        are closed, and its exit status is stored in ``self.status`` when available.
        Calling this again afterwards does nothing.
        """
        if self.proc is None:
            return

        proc = self.proc
        self.proc = None
        if proc.stdin:
            proc.stdin.close()
        if proc.stdout:
            proc.stdout.close()
        if proc.stderr:
            proc.stderr.close()
        # Did the process finish already so we have a return code?
        try:
            if proc.poll() is not None:
                self.status = self._status_code_if_terminate or proc.poll()
                return
        except OSError as ex:
            _logger.info("Ignored error after process had died: %r", ex)

        # It can be that nothing really exists anymore...
        if os is None or getattr(os, "kill", None) is None:
            return

        # Try to kill it.
        try:
            proc.terminate()
            status = proc.wait()  # Ensure the process goes away.

            self.status = self._status_code_if_terminate or status
        except (OSError, AttributeError) as ex:
            # On interpreter shutdown (notably on Windows), parts of the stdlib used by
            # subprocess can already be torn down (e.g. `subprocess._winapi` becomes None),
            # which can cause AttributeError during terminate(). In that case, we prefer
            # to silently ignore to avoid noisy "Exception ignored in: __del__" messages.
            _logger.info("Ignored error while terminating process: %r", ex)
        # END exception handling

    def __del__(self) -> None:
        self._terminate()

    def __getattr__(self, attr: str) -> Any:
        # Only reached for names not found normally; delegate to the wrapped process.
        return getattr(self.proc, attr)

    # TODO: Bad choice to mimic `proc.wait()` but with different args.
    def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
        """Wait for the process and return its status code.

        :param stderr:
            Previously read value of stderr, in case stderr is already closed.
            ``None`` is treated as empty.

        :warn:
            May deadlock if output or error pipes are used and not handled separately.

        :raise git.exc.GitCommandError:
            If the return status is not 0.
        """
        # The two cases must be exclusive: converting unconditionally would overwrite
        # the empty default chosen for ``None``.
        if stderr is None:
            stderr_b = b""
        else:
            stderr_b = force_bytes(data=stderr, encoding="utf-8")
        status: Union[int, None]
        if self.proc is not None:
            status = self.proc.wait()
            p_stderr = self.proc.stderr
        else:  # Assume the underlying proc was killed earlier or never existed.
            status = self.status
            p_stderr = None

        def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
            if stream:
                try:
                    return stderr_b + force_bytes(stream.read())
                except (OSError, ValueError):
                    return stderr_b or b""
            else:
                return stderr_b or b""

        # END status handling

        if status != 0:
            errstr = read_all_from_possibly_closed_stream(p_stderr)
            _logger.debug("AutoInterrupt wait stderr: %r", errstr)
            raise GitCommandError(remove_password_if_present(self.args), status, errstr)
        return status

425 

426 

427_AutoInterrupt.__name__ = "AutoInterrupt" 

428_AutoInterrupt.__qualname__ = "Git.AutoInterrupt" 

429 

430 

class _CatFileContentStream:
    """Sized, read-only view onto the contents of one object in a shared stream.

    Reads are counted, and once the declared number of bytes has been delivered the
    object behaves like an exhausted stream. The single byte that follows the content
    is consumed as well so the underlying stream stays usable for the next reader.

    If the object is discarded before everything was read, the remainder is read and
    thrown away for the same reason.
    """

    __slots__ = ("_stream", "_nbr", "_size")

    def __init__(self, size: int, stream: IO[bytes]) -> None:
        self._stream = stream
        self._size = size
        self._nbr = 0  # Number of bytes read.

        # An empty object has no content to trigger the trailing-newline read later,
        # so consume that byte immediately.
        if size == 0:
            stream.read(1)

    def read(self, size: int = -1) -> bytes:
        remaining = self._size - self._nbr
        if remaining == 0:
            return b""

        # Never read past the content region; a negative size means "everything left".
        wanted = min(remaining, size) if size > -1 else remaining
        chunk = self._stream.read(wanted)
        self._nbr += len(chunk)

        # On depletion, swallow the final newline so others can use the stream.
        if self._nbr == self._size:
            self._stream.read(1)
        return chunk

    def readline(self, size: int = -1) -> bytes:
        if self._nbr == self._size:
            return b""

        # Clamp the request to what is left of the content region.
        remaining = self._size - self._nbr
        limit = min(remaining, size) if size > -1 else remaining

        line = self._stream.readline(limit)
        self._nbr += len(line)

        # On depletion, swallow the final byte.
        if self._nbr == self._size:
            self._stream.read(1)

        return line

    def readlines(self, size: int = -1) -> List[bytes]:
        if self._nbr == self._size:
            return []

        # readline() does the real work; here only the size hint is tracked.
        lines = []
        consumed = 0
        line = self.readline()
        while line:
            lines.append(line)
            if size > -1:
                consumed += len(line)
                if consumed > size:
                    break
            line = self.readline()
        return lines

    # skipcq: PYL-E0301
    def __iter__(self) -> "Git.CatFileContentStream":
        return self

    def __next__(self) -> bytes:
        line = self.readline()
        if line:
            return line
        raise StopIteration

    next = __next__

    def __del__(self) -> None:
        unread = self._size - self._nbr
        if unread:
            # Seeking is impossible within a stream, so read and discard what is
            # left, including the terminating newline.
            self._stream.read(unread + 1)

539 

540_CatFileContentStream.__name__ = "CatFileContentStream" 

541_CatFileContentStream.__qualname__ = "Git.CatFileContentStream" 

542 

543 

# Deprecation text for Git.USE_SHELL when no extra danger is signalled (selected by
# _warn_use_shell when extra_danger is false).
_USE_SHELL_DEFAULT_MESSAGE = (
    "Git.USE_SHELL is deprecated, because only its default value of False is safe. "
    "It will be removed in a future release."
)

# Stronger deprecation text for Git.USE_SHELL (selected by _warn_use_shell when
# extra_danger is true).
_USE_SHELL_DANGER_MESSAGE = (
    "Setting Git.USE_SHELL to True is unsafe and insecure, as the effect of special "
    "shell syntax cannot usually be accounted for. This can result in a command "
    "injection vulnerability and arbitrary code execution. Git.USE_SHELL is deprecated "
    "and will be removed in a future release."
)

555 

556 

def _warn_use_shell(*, extra_danger: bool) -> None:
    """Issue the :class:`DeprecationWarning` for ``Git.USE_SHELL``.

    :param extra_danger:
        If true, the message describing the danger of a true value is used;
        otherwise the plain deprecation message is used.
    """
    if extra_danger:
        message = _USE_SHELL_DANGER_MESSAGE
    else:
        message = _USE_SHELL_DEFAULT_MESSAGE
    # stacklevel=3 attributes the warning two frames above this helper.
    warnings.warn(message, DeprecationWarning, stacklevel=3)

563 

564 

class _GitMeta(type):
    """Metaclass for :class:`Git`.

    This helps issue :class:`DeprecationWarning` if :attr:`Git.USE_SHELL` is used.

    Reading ``USE_SHELL`` on the class warns with the default message; assigning to it
    on the class warns with a message chosen by the truthiness of the assigned value.
    """

    def __getattribute(cls, name: str) -> Any:
        # Warn on class-level reads of USE_SHELL, then defer to the normal lookup.
        if name == "USE_SHELL":
            _warn_use_shell(extra_danger=False)
        return super().__getattribute__(name)

    def __setattr(cls, name: str, value: Any) -> Any:
        # Warn on class-level assignment to USE_SHELL; the assigned value selects the
        # message. The assignment itself is then performed normally.
        if name == "USE_SHELL":
            _warn_use_shell(extra_danger=value)
        super().__setattr__(name, value)

    if not TYPE_CHECKING:
        # To preserve static checking for undefined/misspelled attributes while letting
        # the methods' bodies be type-checked, these are defined as non-special methods,
        # then bound to special names out of view of static type checkers. (The original
        # names invoke name mangling (leading "__") to avoid confusion in other scopes.)
        __getattribute__ = __getattribute
        __setattr__ = __setattr

588 

589 

590GitMeta = _GitMeta 

591"""Alias of :class:`Git`'s metaclass, whether it is :class:`type` or a custom metaclass. 

592 

593Whether the :class:`Git` class has the default :class:`type` as its metaclass or uses a 

594custom metaclass is not documented and may change at any time. This statically checkable 

595metaclass alias is equivalent at runtime to ``type(Git)``. This should almost never be 

596used. Code that benefits from it is likely to remain brittle even if it is used. 

597 

598In view of the :class:`Git` class's intended use and :class:`Git` objects' dynamic 

599callable attributes representing git subcommands, it rarely makes sense to inherit from 

600:class:`Git` at all. Using :class:`Git` in multiple inheritance can be especially tricky 

601to do correctly. Attempting uses of :class:`Git` where its metaclass is relevant, such 

602as when a sibling class has an unrelated metaclass and a shared lower bound metaclass 

603might have to be introduced to solve a metaclass conflict, is not recommended. 

604 

605:note: 

606 The correct static type of the :class:`Git` class itself, and any subclasses, is 

607 ``Type[Git]``. (This can be written as ``type[Git]`` in Python 3.9 and later.) 

608 

609 :class:`GitMeta` should never be used in any annotation where ``Type[Git]`` is 

610 intended or otherwise possible to use. This alias is truly only for very rare and 

611 inherently precarious situations where it is necessary to deal with the metaclass 

612 explicitly. 

613""" 

614 

615 

616class Git(metaclass=_GitMeta): 

617 """The Git class manages communication with the Git binary. 

618 

619 It provides a convenient interface to calling the Git binary, such as in:: 

620 

621 g = Git( git_dir ) 

622 g.init() # calls 'git init' program 

623 rval = g.ls_files() # calls 'git ls-files' program 

624 

625 Debugging: 

626 

627 * Set the :envvar:`GIT_PYTHON_TRACE` environment variable to print each invocation 

628 of the command to stdout. 

629 * Set its value to ``full`` to see details about the returned values. 

630 """ 

631 

632 __slots__ = ( 

633 "_working_dir", 

634 "cat_file_all", 

635 "cat_file_header", 

636 "_version_info", 

637 "_version_info_token", 

638 "_git_options", 

639 "_persistent_git_options", 

640 "_environment", 

641 ) 

642 

643 _excluded_ = ( 

644 "cat_file_all", 

645 "cat_file_header", 

646 "_version_info", 

647 "_version_info_token", 

648 ) 

649 

650 re_unsafe_protocol = re.compile(r"(.+)::.+") 

651 

def __getstate__(self) -> Dict[str, Any]:
    """Return the picklable state: all slots except those named in ``_excluded_``."""
    skipped = self._excluded_
    return slots_to_dict(self, exclude=skipped)

654 

def __setstate__(self, d: Dict[str, Any]) -> None:
    """Restore state from `d`, setting the slots named in ``_excluded_`` to ``None``."""
    skipped = self._excluded_
    dict_to_slots_and__excluded_are_none(self, d, excluded=skipped)

657 

658 # CONFIGURATION 

659 

660 git_exec_name = "git" 

661 """Default git command that should work on Linux, Windows, and other systems.""" 

662 

663 GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) 

664 """Enables debugging of GitPython's git commands.""" 

665 

666 USE_SHELL: bool = False 

667 """Deprecated. If set to ``True``, a shell will be used when executing git commands. 

668 

669 Code that uses ``USE_SHELL = True`` or that passes ``shell=True`` to any GitPython 

670 functions should be updated to use the default value of ``False`` instead. ``True`` 

671 is unsafe unless the effect of syntax treated specially by the shell is fully 

672 considered and accounted for, which is not possible under most circumstances. As 

673 detailed below, it is also no longer needed, even where it had been in the past. 

674 

675 It is in many if not most cases a command injection vulnerability for an application 

676 to set :attr:`USE_SHELL` to ``True``. Any attacker who can cause a specially crafted 

677 fragment of text to make its way into any part of any argument to any git command 

678 (including paths, branch names, etc.) can cause the shell to read and write 

679 arbitrary files and execute arbitrary commands. Innocent input may also accidentally 

680 contain special shell syntax, leading to inadvertent malfunctions. 

681 

682 In addition, how a value of ``True`` interacts with some aspects of GitPython's 

683 operation is not precisely specified and may change without warning, even before 

684 GitPython 4.0.0 when :attr:`USE_SHELL` may be removed. This includes: 

685 

686 * Whether or how GitPython automatically customizes the shell environment. 

687 

688 * Whether, outside of Windows (where :class:`subprocess.Popen` supports lists of 

689 separate arguments even when ``shell=True``), this can be used with any GitPython 

690 functionality other than direct calls to the :meth:`execute` method. 

691 

692 * Whether any GitPython feature that runs git commands ever attempts to partially 

693 sanitize data a shell may treat specially. Currently this is not done. 

694 

695 Prior to GitPython 2.0.8, this had a narrow purpose in suppressing console windows 

696 in graphical Windows applications. In 2.0.8 and higher, it provides no benefit, as 

697 GitPython solves that problem more robustly and safely by using the 

698 ``CREATE_NO_WINDOW`` process creation flag on Windows. 

699 

700 Because Windows path search differs subtly based on whether a shell is used, in rare 

701 cases changing this from ``True`` to ``False`` may keep an unusual git "executable", 

702 such as a batch file, from being found. To fix this, set the command name or full 

703 path in the :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable or pass the 

704 full path to :func:`git.refresh` (or invoke the script using a ``.exe`` shim). 

705 

706 Further reading: 

707 

708 * :meth:`Git.execute` (on the ``shell`` parameter). 

709 * https://github.com/gitpython-developers/GitPython/commit/0d9390866f9ce42870d3116094cd49e0019a970a 

710 * https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags 

711 * https://github.com/python/cpython/issues/91558#issuecomment-1100942950 

712 * https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createprocessw 

713 """ 

714 

715 _git_exec_env_var = "GIT_PYTHON_GIT_EXECUTABLE" 

716 _refresh_env_var = "GIT_PYTHON_REFRESH" 

717 

718 GIT_PYTHON_GIT_EXECUTABLE = None 

719 """Provide the full path to the git executable. Otherwise it assumes git is in the 

720 executable search path. 

721 

722 :note: 

723 The git executable is actually found during the refresh step in the top level 

724 ``__init__``. It can also be changed by explicitly calling :func:`git.refresh`. 

725 """ 

726 

727 _refresh_token = object() # Since None would match an initial _version_info_token. 

728 

@classmethod
def refresh(cls, path: Union[None, PathLike] = None) -> bool:
    """Update information about the git executable :class:`Git` objects will use.

    Called by the :func:`git.refresh` function in the top level ``__init__``.

    :param path:
        Optional path to the git executable. If not absolute, it is resolved
        immediately, relative to the current directory. (See note below.)

    :return:
        ``True`` if running the ``version`` command with the new executable
        succeeded, ``False`` if it failed on the first refresh and the configured
        mode did not raise.

    :raise ImportError:
        On the first refresh, if the executable is unusable and the
        :envvar:`GIT_PYTHON_REFRESH` mode is an "error" value, or is not a
        recognized value at all.

    :raise git.exc.GitCommandNotFound:
        On any later refresh, if the executable is unusable.

    :note:
        The top-level :func:`git.refresh` should be preferred because it calls this
        method and may also update other state accordingly.

    :note:
        There are three different ways to specify the command that refreshing causes
        to be used for git:

        1. Pass no `path` argument and do not set the
           :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable. The command
           name ``git`` is used. It is looked up in a path search by the system, in
           each command run (roughly similar to how git is found when running
           ``git`` commands manually). This is usually the desired behavior.

        2. Pass no `path` argument but set the :envvar:`GIT_PYTHON_GIT_EXECUTABLE`
           environment variable. The command given as the value of that variable is
           used. This may be a simple command or an arbitrary path. It is looked up
           in each command run. Setting :envvar:`GIT_PYTHON_GIT_EXECUTABLE` to
           ``git`` has the same effect as not setting it.

        3. Pass a `path` argument. This path, if not absolute, is immediately
           resolved, relative to the current directory. This resolution occurs at
           the time of the refresh. When git commands are run, they are run using
           that previously resolved path. If a `path` argument is passed, the
           :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable is not
           consulted.

    :note:
        Refreshing always sets the :attr:`Git.GIT_PYTHON_GIT_EXECUTABLE` class
        attribute, which can be read on the :class:`Git` class or any of its
        instances to check what command is used to run git. This attribute should
        not be confused with the related :envvar:`GIT_PYTHON_GIT_EXECUTABLE`
        environment variable. The class attribute is set no matter how refreshing is
        performed.
    """
    # Discern which path to refresh with.
    if path is not None:
        new_git = os.path.expanduser(path)
        new_git = os.path.abspath(new_git)
    else:
        new_git = os.environ.get(cls._git_exec_env_var, cls.git_exec_name)

    # Keep track of the old and new git executable path.
    # The new values are installed first so the test run below uses them; the old
    # ones are kept so they can be restored if the test fails.
    old_git = cls.GIT_PYTHON_GIT_EXECUTABLE
    old_refresh_token = cls._refresh_token
    cls.GIT_PYTHON_GIT_EXECUTABLE = new_git
    cls._refresh_token = object()

    # Test if the new git executable path is valid. A GitCommandNotFound error is
    # raised by us. A PermissionError is raised if the git executable cannot be
    # executed for whatever reason.
    has_git = False
    try:
        cls().version()
        has_git = True
    except (GitCommandNotFound, PermissionError):
        pass

    # Warn or raise exception if test failed.
    if not has_git:
        err = (
            dedent(
                """\
                Bad git executable.
                The git executable must be specified in one of the following ways:
                    - be included in your $PATH
                    - be set via $%s
                    - explicitly set via git.refresh(<full-path-to-git-executable>)
                """
            )
            % cls._git_exec_env_var
        )

        # Revert to whatever the old_git was.
        cls.GIT_PYTHON_GIT_EXECUTABLE = old_git
        cls._refresh_token = old_refresh_token

        if old_git is None:
            # On the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is None) we only
            # are quiet, warn, or error depending on the GIT_PYTHON_REFRESH value.

            # Determine what the user wants to happen during the initial refresh. We
            # expect GIT_PYTHON_REFRESH to either be unset or be one of the
            # following values:
            #
            #   0|q|quiet|s|silence|silent|n|none
            #   1|w|warn|warning|l|log
            #   2|r|raise|e|error|exception

            # An unset variable behaves like "raise"; matching is case-insensitive.
            mode = os.environ.get(cls._refresh_env_var, "raise").lower()

            quiet = ["quiet", "q", "silence", "s", "silent", "none", "n", "0"]
            warn = ["warn", "w", "warning", "log", "l", "1"]
            error = ["error", "e", "exception", "raise", "r", "2"]

            if mode in quiet:
                pass
            elif mode in warn or mode in error:
                err = dedent(
                    """\
                    %s
                    All git commands will error until this is rectified.

                    This initial message can be silenced or aggravated in the future by setting the
                    $%s environment variable. Use one of the following values:
                        - %s: for no message or exception
                        - %s: for a warning message (logging level CRITICAL, displayed by default)
                        - %s: for a raised exception

                    Example:
                        export %s=%s
                    """
                ) % (
                    err,
                    cls._refresh_env_var,
                    "|".join(quiet),
                    "|".join(warn),
                    "|".join(error),
                    cls._refresh_env_var,
                    quiet[0],
                )

                if mode in warn:
                    _logger.critical(err)
                else:
                    raise ImportError(err)
            else:
                # The variable is set, but to none of the recognized values.
                err = dedent(
                    """\
                    %s environment variable has been set but it has been set with an invalid value.

                    Use only the following values:
                        - %s: for no message or exception
                        - %s: for a warning message (logging level CRITICAL, displayed by default)
                        - %s: for a raised exception
                    """
                ) % (
                    cls._refresh_env_var,
                    "|".join(quiet),
                    "|".join(warn),
                    "|".join(error),
                )
                raise ImportError(err)

            # We get here if this was the initial refresh and the refresh mode was
            # not error. Go ahead and set the GIT_PYTHON_GIT_EXECUTABLE such that we
            # discern the difference between the first refresh at import time
            # and subsequent calls to git.refresh or this refresh method.
            cls.GIT_PYTHON_GIT_EXECUTABLE = cls.git_exec_name
        else:
            # After the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is no longer
            # None) we raise an exception.
            raise GitCommandNotFound(new_git, err)

    return has_git

894 

@classmethod
def is_cygwin(cls) -> bool:
    """Report whether the currently configured git executable is a Cygwin git.

    :return:
        Whatever :func:`is_cygwin_git` answers for the executable stored in
        ``GIT_PYTHON_GIT_EXECUTABLE``.
    """
    git_executable = cls.GIT_PYTHON_GIT_EXECUTABLE
    return is_cygwin_git(git_executable)

898 

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str: ...

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str: ...

@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
    """Normalize a URL or path so it can be written to a git config file.

    Config files created on Windows may contain paths with (possibly doubled)
    backslashes, which git would treat as escapes. The escaping is undone and every
    remaining backslash is turned into a forward slash.

    :param url:
        The URL or path to clean up.

    :param is_cygwin:
        Whether to convert via :func:`cygpath` instead. If ``None``, the answer of
        :meth:`is_cygwin` is used.

    :return:
        The converted URL.
    """
    use_cygwin = cls.is_cygwin() if is_cygwin is None else is_cygwin
    if use_cygwin:
        return cygpath(url)

    # Non-Cygwin: expand environment variables and a leading "~", then collapse
    # doubled backslashes before converting the rest to forward slashes.
    expanded = os.path.expandvars(url)
    if expanded.startswith("~"):
        expanded = os.path.expanduser(expanded)
    return expanded.replace("\\\\", "\\").replace("\\", "/")

926 

@classmethod
def check_unsafe_protocols(cls, url: str) -> None:
    """Reject URLs that use a git "remote helper" protocol.

    Besides the usual protocols (http, git, ssh), Git accepts URLs of the form
    ``<transport>::<address>`` that are handled by remote helpers. One of them
    (``ext::``) can run arbitrary commands, so any URL matched by
    ``re_unsafe_protocol`` is refused.

    See:

    - https://git-scm.com/docs/gitremote-helpers
    - https://git-scm.com/docs/git-remote-ext

    :param url:
        The URL to inspect.

    :raise git.exc.UnsafeProtocolError:
        If `url` matches the unsafe-protocol pattern.
    """
    found = cls.re_unsafe_protocol.match(url)
    if not found:
        return

    protocol = found.group(1)
    raise UnsafeProtocolError(
        f"The `{protocol}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it."
    )

946 

@classmethod
def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None:
    """Reject options that could make git execute arbitrary commands.

    :param options:
        The options about to be passed to ``git <command>``.

    :param unsafe_options:
        The option spellings (with their leading dashes) that are blocked.

    :raise git.exc.UnsafeOptionError:
        On the first entry of `options` that matches a blocked option.
    """
    # An option may be given as `foo`, `--foo bar`, or `--foo=bar`. It is therefore
    # rejected if it starts with the dashed spelling or equals the bare spelling.
    blocked = [(flag, flag.lstrip("-")) for flag in unsafe_options]
    for candidate in options:
        for unsafe_option, bare_name in blocked:
            if candidate == bare_name or candidate.startswith(unsafe_option):
                raise UnsafeOptionError(
                    f"{unsafe_option} is not allowed, use `allow_unsafe_options=True` to allow it."
                )

963 

# Class-level alias: makes ``_AutoInterrupt`` (defined outside this view) reachable
# as ``Git.AutoInterrupt``, which is the name the annotations in this class use.
AutoInterrupt: TypeAlias = _AutoInterrupt

# Class-level alias: makes ``_CatFileContentStream`` (defined outside this view)
# reachable as ``Git.CatFileContentStream``.
CatFileContentStream: TypeAlias = _CatFileContentStream

967 

def __init__(self, working_dir: Union[None, PathLike] = None) -> None:
    """Set up a new instance and its per-instance state.

    :param working_dir:
        Git directory we should work in. If ``None``, we always work in the current
        directory as returned by :func:`os.getcwd`.
        This is meant to be the working tree directory if available, or the
        ``.git`` directory in case of bare repositories.
    """
    super().__init__()

    # Where commands run; the given path is passed through expand_path().
    self._working_dir = expand_path(working_dir)

    # Options placed before the subcommand: the first set is reset after each
    # call (see __call__ / _call_process), the second set is kept.
    self._git_options: Union[List[str], Tuple[str, ...]] = ()
    self._persistent_git_options: List[str] = []

    # Additional environment variables handed to every git invocation.
    self._environment: Dict[str, str] = {}

    # Cache for version_info, together with the refresh token it was taken under.
    self._version_info: Union[Tuple[int, ...], None] = None
    self._version_info_token: object = None

    # Slots for the long-running ``cat-file`` commands, created on first use.
    self.cat_file_header: Union[None, TBD] = None
    self.cat_file_all: Union[None, TBD] = None

992 

def __getattribute__(self, name: str) -> Any:
    """Look up `name` normally, warning first when ``USE_SHELL`` is being read."""
    if name != "USE_SHELL":
        return super().__getattribute__(name)

    # Reading USE_SHELL triggers the warning helper before the value is returned.
    _warn_use_shell(extra_danger=False)
    return super().__getattribute__(name)

997 

def __getattr__(self, name: str) -> Any:
    """Turn unknown public attribute names into git subcommand callables.

    This is what makes ``git.some_command(...)`` work as if the command were a
    method of this object.

    :return:
        Callable object that forwards its arguments, preceded by `name`, to
        :meth:`_call_process`.
    """
    if not name.startswith("_"):
        return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)

    # Names with a leading underscore are never treated as git commands; they go
    # through the regular attribute lookup instead.
    return super().__getattribute__(name)

1008 

def set_persistent_git_options(self, **kwargs: Any) -> None:
    """Store command line options for the git executable that apply to all
    subsequent subcommand calls.

    :param kwargs:
        A dict of keyword arguments.
        These arguments are converted as in :meth:`_call_process`, but are passed
        to the git command rather than the subcommand. They replace any options
        stored by an earlier call.
    """
    persistent_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._persistent_git_options = persistent_options

1020 

@property
def working_dir(self) -> Union[None, PathLike]:
    """:return: The directory this instance runs git in, as stored at construction"""
    return self._working_dir

1025 

@property
def version_info(self) -> Tuple[int, ...]:
    """
    :return: Tuple with integers representing the major, minor and additional
        version numbers as parsed from :manpage:`git-version(1)`. At most four
        dot-separated fields are looked at, and parsing stops at the first field
        that is not purely numeric.

    This value is generated on demand and is cached until the next refresh.
    """
    # The refresh token is shared, while the cache lives on the instance. Read the
    # token once so a concurrent refresh cannot change it between check and store.
    token = self._refresh_token

    if self._version_info_token is token:
        # Cached after the most recent refresh: still valid.
        assert self._version_info is not None, "Bug: corrupted token-check state"
        return self._version_info

    # Ask git for its version; the version string is the third space-separated word.
    raw_output = self._call_process("version")
    version_text = raw_output.split(" ")[2]

    numeric_prefix = []
    for field in version_text.split(".")[:4]:
        if not field.isdigit():
            break
        numeric_prefix.append(int(field))
    self._version_info = tuple(numeric_prefix)

    # Remember which refresh this result belongs to.
    self._version_info_token = token
    return self._version_info

1053 

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[True],
) -> "AutoInterrupt": ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[False] = False,
    stdout_as_string: Literal[True],
) -> Union[str, Tuple[int, str, str]]: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[False] = False,
    stdout_as_string: Literal[False] = False,
) -> Union[bytes, Tuple[int, bytes, str]]: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    with_extended_output: Literal[False],
    as_process: Literal[False],
    stdout_as_string: Literal[True],
) -> str: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    with_extended_output: Literal[False],
    as_process: Literal[False],
    stdout_as_string: Literal[False],
) -> bytes: ...

def execute(
    self,
    command: Union[str, Sequence[Any]],
    istream: Union[None, BinaryIO] = None,
    with_extended_output: bool = False,
    with_exceptions: bool = True,
    as_process: bool = False,
    output_stream: Union[None, BinaryIO] = None,
    stdout_as_string: bool = True,
    kill_after_timeout: Union[None, float] = None,
    with_stdout: bool = True,
    universal_newlines: bool = False,
    shell: Union[None, bool] = None,
    env: Union[None, Mapping[str, str]] = None,
    max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
    strip_newline_in_stdout: bool = True,
    **subprocess_kwargs: Any,
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
    R"""Handle executing the command, and consume and return the returned
    information (stdout).

    :param command:
        The command argument list to execute.
        It should be a sequence of program arguments, or a string. The
        program to execute is the first item in the args sequence or string.

    :param istream:
        Standard input filehandle passed to :class:`subprocess.Popen`.

    :param with_extended_output:
        Whether to return a (status, stdout, stderr) tuple.

    :param with_exceptions:
        Whether to raise an exception when git returns a non-zero status.

    :param as_process:
        Whether to return the created process instance directly from which
        streams can be read on demand. This will render `with_extended_output`
        and `with_exceptions` ineffective - the caller will have to deal with
        the details. It is important to note that the process will be placed
        into an :class:`AutoInterrupt` wrapper that will interrupt the process
        once it goes out of scope. If you use the command in iterators, you
        should pass the whole process instance instead of a single stream.

    :param output_stream:
        If set to a file-like object, data produced by the git command will be
        copied to the given stream instead of being returned as a string.
        This feature only has any effect if `as_process` is ``False``.

    :param stdout_as_string:
        If ``False``, the command's standard output will be bytes. Otherwise, it
        will be decoded into a string using the default encoding (usually UTF-8).
        The latter can fail, if the output contains binary data.

    :param kill_after_timeout:
        Specifies a timeout in seconds for the git command, after which the process
        should be killed. This will have no effect if `as_process` is set to
        ``True``. It is set to ``None`` by default and will let the process run
        until the timeout is explicitly specified. Uses of this feature should be
        carefully considered, due to the following limitations:

        1. This feature is not supported at all on Windows.
        2. Effectiveness may vary by operating system. ``ps --ppid`` is used to
           enumerate child processes, which is available on most GNU/Linux systems
           but not most others. If ``ps`` cannot be started at all, only the git
           process itself is killed.
        3. Deeper descendants do not receive signals, though they may sometimes
           terminate as a consequence of their parent processes being killed.
        4. `kill_after_timeout` uses ``SIGKILL``, which can have negative side
           effects on a repository. For example, stale locks in case of
           :manpage:`git-gc(1)` could render the repository incapable of accepting
           changes until the lock is manually removed.

    :param with_stdout:
        If ``True``, default ``True``, we open stdout on the created process.
        If ``False``, the command's stdout is discarded and the returned stdout
        value is empty.

    :param universal_newlines:
        If ``True``, pipes will be opened as text, and lines are split at all known
        line endings.

    :param shell:
        Whether to invoke commands through a shell
        (see :class:`Popen(..., shell=True) <subprocess.Popen>`).
        If this is not ``None``, it overrides :attr:`USE_SHELL`.

        Passing ``shell=True`` to this or any other GitPython function should be
        avoided, as it is unsafe under most circumstances. This is because it is
        typically not feasible to fully consider and account for the effect of shell
        expansions, especially when passing ``shell=True`` to other methods that
        forward it to :meth:`Git.execute`. Passing ``shell=True`` is also no longer
        needed (nor useful) to work around any known operating system specific
        issues.

    :param env:
        A dictionary of environment variables to be passed to
        :class:`subprocess.Popen`. These are applied last, on top of the process
        environment and the variables set via :meth:`update_environment`.

    :param max_chunk_size:
        Maximum number of bytes in one chunk of data passed to the `output_stream`
        in one invocation of its ``write()`` method. If the given number is not
        positive then the default value is used.

    :param strip_newline_in_stdout:
        Whether to strip the trailing ``\n`` of the command stdout.

    :param subprocess_kwargs:
        Keyword arguments to be passed to :class:`subprocess.Popen`. Please note
        that some of the valid kwargs are already set by this method; the ones you
        specify may not be the same ones.

    :return:
        * str(output), if `with_extended_output` is ``False`` (Default)
        * tuple(int(status), str(stdout), str(stderr)),
          if `with_extended_output` is ``True``

        If `output_stream` is given, stdout is copied into it, and the stdout value
        returned in the forms above is only what can still be read from the process
        after that copy (presumably nothing once the copy has drained the pipe).

        If `as_process` is ``True``, the :class:`AutoInterrupt` wrapper around the
        started process is returned instead.

        Note that git is executed with ``LANGUAGE`` and ``LC_ALL`` set to ``"C"``
        (unless overridden through the environment arguments) to ensure consistent
        output regardless of system language.

    :raise git.exc.GitCommandError:
        If the command exits with a non-zero status and `with_exceptions` is
        ``True``, or if `kill_after_timeout` is given on Windows.

    :raise git.exc.GitCommandNotFound:
        If the process could not be started because the executable was not found.

    :note:
        If you add additional keyword arguments to the signature of this method, you
        must update the ``execute_kwargs`` variable housed in this module.
    """
    # Remove password for the command if present.
    redacted_command = remove_password_if_present(command)
    if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process):
        _logger.info(" ".join(redacted_command))

    # Allow the user to have the command executed in their working dir.
    try:
        cwd = self._working_dir or os.getcwd()  # type: Union[None, str]
        if not os.access(str(cwd), os.X_OK):
            cwd = None
    except FileNotFoundError:
        cwd = None

    # Start the process.
    inline_env = env
    env = os.environ.copy()
    # Attempt to force all output to plain ASCII English, which is what some parsing
    # code may expect.
    # According to https://askubuntu.com/a/311796, we are setting LANGUAGE as well
    # just to be sure.
    env["LANGUAGE"] = "C"
    env["LC_ALL"] = "C"
    env.update(self._environment)
    if inline_env is not None:
        env.update(inline_env)

    if sys.platform == "win32":
        if kill_after_timeout is not None:
            raise GitCommandError(
                redacted_command,
                '"kill_after_timeout" feature is not supported on Windows.',
            )
        cmd_not_found_exception = OSError
    else:
        cmd_not_found_exception = FileNotFoundError
    # END handle

    # DEVNULL is always available from subprocess, so no manual fallback that opens
    # os.devnull (and would leak the file object) is needed.
    stdout_sink = PIPE if with_stdout else DEVNULL
    if shell is None:
        # Get the value of USE_SHELL with no deprecation warning. Do this without
        # warnings.catch_warnings, to avoid a race condition with application code
        # configuring warnings. The value could be looked up in type(self).__dict__
        # or Git.__dict__, but those can break under some circumstances. This works
        # the same as self.USE_SHELL in more situations; see Git.__getattribute__.
        shell = super().__getattribute__("USE_SHELL")
    _logger.debug(
        "Popen(%s, cwd=%s, stdin=%s, shell=%s, universal_newlines=%s)",
        redacted_command,
        cwd,
        "<valid stream>" if istream else "None",
        shell,
        universal_newlines,
    )
    try:
        proc = safer_popen(
            command,
            env=env,
            cwd=cwd,
            bufsize=-1,
            stdin=(istream or DEVNULL),
            stderr=PIPE,
            stdout=stdout_sink,
            shell=shell,
            universal_newlines=universal_newlines,
            encoding=defenc if universal_newlines else None,
            **subprocess_kwargs,
        )
    except cmd_not_found_exception as err:
        raise GitCommandNotFound(redacted_command, err) from err
    else:
        # Replace with a typeguard for Popen[bytes]?
        proc.stdout = cast(BinaryIO, proc.stdout)
        proc.stderr = cast(BinaryIO, proc.stderr)

    if as_process:
        return self.AutoInterrupt(proc, command)

    if sys.platform != "win32" and kill_after_timeout is not None:
        # Help mypy figure out this is not None even when used inside communicate().
        timeout = kill_after_timeout

        def kill_process(pid: int) -> None:
            """Callback to kill a process.

            This callback implementation would be ineffective and unsafe on Windows.
            """
            child_pids = []
            try:
                # The context manager closes the pipe and waits for ``ps``, so the
                # helper process is neither leaked nor left behind as a zombie.
                with Popen(["ps", "--ppid", str(pid)], stdout=PIPE) as ps_proc:
                    if ps_proc.stdout is not None:
                        for line in ps_proc.stdout:
                            fields = line.split()
                            if fields and fields[0].isdigit():
                                child_pids.append(int(fields[0]))
            except OSError:
                # ``ps`` could not be started. Enumerating children is best-effort;
                # the timed-out process itself must still be killed below.
                child_pids = []
            try:
                os.kill(pid, signal.SIGKILL)
                for child_pid in child_pids:
                    try:
                        os.kill(child_pid, signal.SIGKILL)
                    except OSError:
                        pass
                # Tell the main routine that the process was killed.
                kill_check.set()
            except OSError:
                # It is possible that the process gets completed in the duration
                # after timeout happens and before we try to kill the process.
                pass
            return

        def communicate() -> Tuple[AnyStr, AnyStr]:
            watchdog.start()
            out, err = proc.communicate()
            watchdog.cancel()
            if kill_check.is_set():
                err = 'Timeout: the command "%s" did not complete in %d secs.' % (
                    " ".join(redacted_command),
                    timeout,
                )
                if not universal_newlines:
                    err = err.encode(defenc)
            return out, err

        # END helpers

        kill_check = threading.Event()
        watchdog = threading.Timer(timeout, kill_process, args=(proc.pid,))
    else:
        communicate = proc.communicate

    # Wait for the process to return.
    status = 0
    stdout_value: Union[str, bytes] = b""
    stderr_value: Union[str, bytes] = b""
    newline = "\n" if universal_newlines else b"\n"
    try:
        if output_stream is None:
            stdout_value, stderr_value = communicate()
            if stdout_value is None:
                # stdout was not piped (with_stdout=False), so communicate() has
                # nothing to hand back. Treat that as empty output.
                stdout_value = "" if universal_newlines else b""
            # Strip trailing "\n".
            if stdout_value.endswith(newline) and strip_newline_in_stdout:  # type: ignore[arg-type]
                stdout_value = stdout_value[:-1]
            if stderr_value.endswith(newline):  # type: ignore[arg-type]
                stderr_value = stderr_value[:-1]

            status = proc.returncode
        else:
            max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
            stream_copy(proc.stdout, output_stream, max_chunk_size)
            stdout_value = proc.stdout.read()
            stderr_value = proc.stderr.read()
            # Strip trailing "\n".
            if stderr_value.endswith(newline):  # type: ignore[arg-type]
                stderr_value = stderr_value[:-1]
            status = proc.wait()
        # END stdout handling
    finally:
        # proc.stdout is None when stdout was sent to DEVNULL.
        if proc.stdout is not None:
            proc.stdout.close()
        proc.stderr.close()

    if self.GIT_PYTHON_TRACE == "full":
        cmdstr = " ".join(redacted_command)

        def as_text(stdout_value: Union[bytes, str]) -> str:
            # A real conditional, so that empty output is logged as empty rather
            # than being mistaken for "output went to a stream".
            return "<OUTPUT_STREAM>" if output_stream is not None else safe_decode(stdout_value)

        # END as_text

        if stderr_value:
            _logger.info(
                "%s -> %d; stdout: '%s'; stderr: '%s'",
                cmdstr,
                status,
                as_text(stdout_value),
                safe_decode(stderr_value),
            )
        elif stdout_value:
            _logger.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
        else:
            _logger.info("%s -> %d", cmdstr, status)
    # END handle debug printing

    if with_exceptions and status != 0:
        raise GitCommandError(redacted_command, status, stderr_value, stdout_value)

    if isinstance(stdout_value, bytes) and stdout_as_string:  # Could also be output_stream.
        stdout_value = safe_decode(stdout_value)

    # Allow access to the command's status code.
    if with_extended_output:
        return (status, stdout_value, safe_decode(stderr_value))
    else:
        return stdout_value

1420 

def environment(self) -> Dict[str, str]:
    """:return: The mapping of extra environment variables kept on this instance
    (the stored dict itself, not a copy)"""
    return self._environment

1423 

def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
    """Set environment variables for future git invocations. Return all changed
    values in a format that can be passed back into this function to revert the
    changes.

    Examples::

        old_env = self.update_environment(PWD='/tmp')
        self.update_environment(**old_env)

    :param kwargs:
        Environment variables to use for git processes. A value of ``None``
        removes the variable if it is currently set.

    :return:
        Dict that maps environment variables to their old values. Variables that
        were not set before map to ``None``.
    """
    previous = {}
    current = self._environment
    for key, value in kwargs.items():
        if value is None:
            # A None value means "unset"; only record it if there was something to remove.
            if key in current:
                previous[key] = current.pop(key)
            continue
        # A real value: remember what was there before (None if nothing), then store it.
        previous[key] = current.get(key)
        current[key] = value
    return previous

1451 

@contextlib.contextmanager
def custom_environment(self, **kwargs: Any) -> Iterator[None]:
    """A context manager that applies :meth:`update_environment` on entry and
    restores the replaced values on exit, even if the body raises.

    Examples::

        with self.custom_environment(GIT_SSH='/bin/ssh_wrapper'):
            repo.remotes.origin.fetch()

    :param kwargs:
        See :meth:`update_environment`.
    """
    saved_values = self.update_environment(**kwargs)
    try:
        yield
    finally:
        # Feeding the returned mapping back in reverts the changes.
        self.update_environment(**saved_values)

1470 

def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
    """Transform a single Python-style keyword argument into git command line
    option(s).

    :param name:
        The option name. A one-character name becomes a short option (``-n``), any
        other name becomes a long option (``--max-count``) after being passed
        through ``dashify``.

    :param value:
        ``True`` produces the bare flag. ``False`` and ``None`` produce nothing.
        Any other value, including falsy ones such as ``0`` or ``""``, is formatted
        with ``%s`` and attached to the option.

    :param split_single_char_options:
        For short options with a value, whether to emit the value as a separate
        argument (``["-n", "5"]``) instead of joined to the flag (``["-n5"]``).
        Long options always use the ``--name=value`` form.

    :return:
        A list of zero, one or two command line arguments.
    """
    # Only the singletons False and None mean "omit". These are identity checks on
    # purpose: an equality test would also drop 0 and 0.0 (0 == False), which made
    # short options such as ``n=0`` vanish while ``max_count=0`` was kept.
    if value is False or value is None:
        return []

    if len(name) == 1:
        flag = "-%s" % name
        if value is True:
            return [flag]
        if split_single_char_options:
            return [flag, "%s" % value]
        return ["%s%s" % (flag, value)]

    flag = "--%s" % dashify(name)
    if value is True:
        return [flag]
    return ["%s=%s" % (flag, value)]

1486 

def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
    """Transform Python-style kwargs into git command line options.

    Each keyword is converted with :meth:`transform_kwarg`. A list or tuple value
    repeats the option once per element.

    :return:
        The resulting options, in keyword order.
    """
    options: List[str] = []
    for key, raw_value in kwargs.items():
        # Treat a scalar as a one-element sequence so both cases share one loop.
        candidates = raw_value if isinstance(raw_value, (list, tuple)) else (raw_value,)
        for single_value in candidates:
            options.extend(self.transform_kwarg(key, single_value, split_single_char_options))
    return options

1497 

@classmethod
def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
    """Flatten arbitrarily nested lists/tuples of arguments into a flat list of
    strings.

    Anything that is not a list or tuple is converted with :class:`str` and
    returned as a single-element list.
    """
    if not isinstance(arg_list, (list, tuple)):
        return [str(arg_list)]

    flattened: List[str] = []
    for item in arg_list:
        flattened += cls._unpack_args(item)
    return flattened

1508 

def __call__(self, **kwargs: Any) -> "Git":
    """Specify command line options to the git executable for a subcommand call.

    :param kwargs:
        A dict of keyword arguments.
        These arguments are converted as in :meth:`_call_process`, but will be
        passed to the git command rather than the subcommand.

    :return:
        This same instance, so that a subcommand call can be chained directly.

    Examples::

        git(work_tree='/tmp').difftool()
    """
    one_shot_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._git_options = one_shot_options
    return self

1523 

@overload
def _call_process(
    self, method: str, *args: None, **kwargs: None
) -> str: ...  # If no args were given, execute the call with all defaults.

@overload
def _call_process(
    self,
    method: str,
    istream: int,
    as_process: Literal[True],
    *args: Any,
    **kwargs: Any,
) -> "Git.AutoInterrupt": ...

@overload
def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ...

def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
    """Build the command line for the given git subcommand and run it through
    :meth:`execute`.

    :param method:
        The command. Contained ``_`` characters will be converted to hyphens, such
        as in ``ls_files`` to call ``ls-files``.

    :param args:
        The list of arguments. If ``None`` is included, it will be pruned.
        This allows your commands to call git more conveniently, as ``None`` is
        realized as non-existent. Nested lists and tuples are flattened.

    :param kwargs:
        Contains key-values for the following:

        - The :meth:`execute()` kwds, as listed in ``execute_kwargs``.
        - "Command options" to be converted by :meth:`transform_kwargs`.
        - The ``insert_kwargs_after`` key, whose value must match one of
          ``*args``. The command options are then placed right after that
          argument instead of before all arguments.

    Examples::

        git.rev_list('master', max_count=10, header=True)

    turns into::

        git rev-list --max-count=10 --header master

    :return:
        Same as :meth:`execute`. If no args are given, used :meth:`execute`'s
        default (especially ``as_process = False``, ``stdout_as_string = True``) and
        return :class:`str`.

    :raise ValueError:
        If ``insert_kwargs_after`` names an argument that is not among the
        (flattened) positional arguments.
    """
    # Separate what belongs to execute() from what becomes git options before
    # anything is transformed. Otherwise the execute() keywords would end up on
    # the command line.
    exec_kwargs = {}
    opts_kwargs = {}
    for key, value in kwargs.items():
        bucket = exec_kwargs if key in execute_kwargs else opts_kwargs
        bucket[key] = value

    insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None)

    # Prepare the argument list.
    opt_args = self.transform_kwargs(**opts_kwargs)
    ext_args = self._unpack_args([a for a in args if a is not None])

    if insert_after_this_arg is None:
        args_list = opt_args + ext_args
    else:
        try:
            index = ext_args.index(insert_after_this_arg)
        except ValueError as err:
            raise ValueError(
                "Couldn't find argument '%s' in args %s to insert cmd options after"
                % (insert_after_this_arg, str(ext_args))
            ) from err
        # END handle error
        split_at = index + 1
        args_list = ext_args[:split_at] + opt_args + ext_args[split_at:]
    # END handle opts_kwargs

    # Executable first, then the persistent git options, then the one-shot ones.
    call = [
        self.GIT_PYTHON_GIT_EXECUTABLE,
        *self._persistent_git_options,
        *self._git_options,
    ]
    # The one-shot options apply to this call only; reset to avoid side effects.
    self._git_options = ()

    call.append(dashify(method))
    call.extend(args_list)

    return self.execute(call, **exec_kwargs)

1621 

def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]:
    """
    :param header_line:
        A line of the form::

            <hex_sha> type_string size_as_int

    :return:
        (hex_sha, type_string, size_as_int)

    :raise ValueError:
        If the line is empty, does not consist of exactly three
        whitespace-separated fields, or its first field is not 40 characters long.
    """
    tokens = header_line.split()

    if not tokens:
        # Nothing came back at all.
        err_msg = (
            f"SHA is empty, possible dubious ownership in the repository "
            f"""at {self._working_dir}.\n If this is unintended run:\n\n """
            f""" "git config --global --add safe.directory {self._working_dir}" """
        )
        raise ValueError(err_msg)

    if len(tokens) != 3:
        # Something came back, but not a header.
        raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip()))

    hex_sha, type_string, size_field = tokens
    if len(hex_sha) != 40:
        raise ValueError("Failed to parse header: %r" % header_line)
    return (hex_sha, type_string, int(size_field))

1652 

def _prepare_ref(self, ref: AnyStr) -> bytes:
    """Convert a ref into the newline-terminated bytes that are written to a
    command's stdin, where the newline separates one ref from the next.

    :param ref:
        The ref as text, as ASCII bytes, or as any other object whose :class:`str`
        form is the ref.

    :return:
        The ref, ending in exactly the newline it already had or one appended
        here, encoded with the default encoding.
    """
    if isinstance(ref, bytes):
        # Assume 40 bytes hexsha - bin-to-ascii for some reason returns bytes, not text.
        refstr: str = ref.decode("ascii")
    elif isinstance(ref, str):
        refstr = ref
    else:
        refstr = str(ref)  # Could be ref-object.

    terminated = refstr if refstr.endswith("\n") else refstr + "\n"
    return terminated.encode(defenc)

1666 

def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt":
    """Return the command stored in the attribute `attr_name`, starting and storing
    it first if the attribute is still ``None``.

    :param attr_name:
        Name of the instance attribute that caches the command.

    :param cmd_name:
        The git subcommand to start, as accepted by :meth:`_call_process`.

    :param args:
        Positional arguments for the subcommand.

    :param kwargs:
        Keyword arguments for :meth:`_call_process`. They are applied on top of the
        defaults ``istream=PIPE`` and ``as_process=True``.
    """
    cached = getattr(self, attr_name)
    if cached is not None:
        return cached

    call_options = {"istream": PIPE, "as_process": True, **kwargs}
    spawned = self._call_process(cmd_name, *args, **call_options)
    setattr(self, attr_name, spawned)
    return cast("Git.AutoInterrupt", spawned)

1679 

def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]:
    """Send `ref` to the running command `cmd` and parse the header line it
    answers with.

    :return:
        (hexsha, type_string, size_as_int), as produced by
        :meth:`_parse_object_header`.

    :raise ValueError:
        If `cmd` lacks a usable stdin or stdout stream.
    """
    if not (cmd.stdin and cmd.stdout):
        raise ValueError("cmd stdin was empty")

    cmd.stdin.write(self._prepare_ref(ref))
    # Flush so the command sees the request before we wait for its reply.
    cmd.stdin.flush()
    return self._parse_object_header(cmd.stdout.readline())

1687 

def get_object_header(self, ref: str) -> Tuple[str, str, int]:
    """Quickly look up the type and size of the object behind the given ref.

    :note:
        The underlying ``cat-file --batch-check`` command is started on the first
        call only and kept in ``cat_file_header``; later calls reuse it, so the
        cost of command invocation is paid once.

    :return:
        (hexsha, type_string, size_as_int)
    """
    batch_check_cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True)
    header = self.__get_object_header(batch_check_cmd, ref)
    return header

1701 

def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
    """Similar to :meth:`get_object_header`, but returns object data as well.

    The data is obtained by reading ``size`` bytes from the stream returned by
    :meth:`stream_object_data`.

    :return:
        (hexsha, type_string, size_as_int, data_string)

    :note:
        Not threadsafe.
    """
    hexsha, typename, size, content_stream = self.stream_object_data(ref)
    payload = content_stream.read(size)
    # Drop our reference to the stream right away (presumably so it is finalized
    # before the next request is sent - TODO confirm against the stream class).
    del content_stream
    return (hexsha, typename, size, payload)

1715 

def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
    """Similar to :meth:`get_object_data`, but returns the data as a stream.

    The underlying ``cat-file --batch`` command is started on first use and kept in
    ``cat_file_all``.

    :return:
        (hexsha, type_string, size_as_int, stream)

    :note:
        This method is not threadsafe. You need one independent :class:`Git`
        instance per thread to be safe!
    """
    batch_cmd = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
    hexsha, typename, size = self.__get_object_header(batch_cmd, ref)

    # Fall back to an empty in-memory stream if the command has no stdout.
    source = batch_cmd.stdout
    if source is None:
        source = io.BytesIO()

    content = self.CatFileContentStream(size, source)
    return (hexsha, typename, size, content)

1730 

def clear_cache(self) -> "Git":
    """Clear all kinds of internal caches to release resources.

    Currently this finalizes the persistent commands held in ``cat_file_all`` and
    ``cat_file_header`` by calling their ``__del__`` and then empties both slots.

    :return:
        self
    """
    persistent_cmds = (self.cat_file_all, self.cat_file_header)
    for running in filter(None, persistent_cmds):
        running.__del__()

    self.cat_file_all = self.cat_file_header = None
    return self