Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/cmd.py: 61%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

622 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6from __future__ import annotations 

7 

8__all__ = ["GitMeta", "Git"] 

9 

10import contextlib 

11import io 

12import itertools 

13import logging 

14import os 

15import re 

16import signal 

17import subprocess 

18from subprocess import DEVNULL, PIPE, Popen 

19import sys 

20from textwrap import dedent 

21import threading 

22import warnings 

23 

24from git.compat import defenc, force_bytes, safe_decode 

25from git.exc import ( 

26 CommandError, 

27 GitCommandError, 

28 GitCommandNotFound, 

29 UnsafeOptionError, 

30 UnsafeProtocolError, 

31) 

32from git.util import ( 

33 cygpath, 

34 expand_path, 

35 is_cygwin_git, 

36 patch_env, 

37 remove_password_if_present, 

38 stream_copy, 

39) 

40 

41# typing --------------------------------------------------------------------------- 

42 

43from typing import ( 

44 Any, 

45 AnyStr, 

46 BinaryIO, 

47 Callable, 

48 Dict, 

49 IO, 

50 Iterator, 

51 List, 

52 Mapping, 

53 Optional, 

54 Sequence, 

55 TYPE_CHECKING, 

56 TextIO, 

57 Tuple, 

58 Union, 

59 cast, 

60 overload, 

61) 

62 

63if sys.version_info >= (3, 10): 

64 from typing import TypeAlias 

65else: 

66 from typing_extensions import TypeAlias 

67 

68from git.types import Literal, PathLike, TBD 

69 

70if TYPE_CHECKING: 

71 from git.diff import DiffIndex 

72 from git.repo.base import Repo 

73 

74# --------------------------------------------------------------------------------- 

75 

# Set of keyword-argument names that are handled specially when a git command is
# run. NOTE(review): presumably these mirror the keyword parameters of
# Git.execute, which is not visible here - confirm before adding or removing
# entries.
execute_kwargs = {
    "istream",
    "with_extended_output",
    "with_exceptions",
    "as_process",
    "output_stream",
    "stdout_as_string",
    "kill_after_timeout",
    "with_stdout",
    "universal_newlines",
    "shell",
    "env",
    "max_chunk_size",
    "strip_newline_in_stdout",
}

91 

92_logger = logging.getLogger(__name__) 

93 

94 

95# ============================================================================== 

96## @name Utilities 

97# ------------------------------------------------------------------------------ 

98# Documentation 

99## @{ 

100 

101 

def handle_process_output(
    process: Union["Git.AutoInterrupt", Popen],
    stdout_handler: Union[
        None,
        Callable[[AnyStr], None],
        Callable[[List[AnyStr]], None],
        Callable[[bytes, "Repo", "DiffIndex"], None],
    ],
    stderr_handler: Union[None, Callable[[AnyStr], None], Callable[[List[AnyStr]], None]],
    finalizer: Union[None, Callable[[Union[Popen, "Git.AutoInterrupt"]], None]] = None,
    decode_streams: bool = True,
    kill_after_timeout: Union[None, float] = None,
) -> None:
    R"""Register for notifications to learn that process output is ready to read, and
    dispatch lines to the respective line handlers.

    One daemon thread is started per available stream (stdout and/or stderr). Each
    thread iterates its stream line by line, passes every line to the matching
    handler, and closes the stream when done. This function returns once the
    finalizer returns.

    :param process:
        :class:`subprocess.Popen` instance, or a wrapper exposing the real process as
        its ``proc`` attribute.

    :param stdout_handler:
        f(stdout_line_string), or ``None``.

    :param stderr_handler:
        f(stderr_line_string), or ``None``.

    :param finalizer:
        f(proc) - wait for proc to finish.

    :param decode_streams:
        Assume stdout/stderr streams are binary and decode them before pushing their
        contents to handlers.

        This defaults to ``True``. Set it to ``False`` if:

        - ``universal_newlines == True``, as then streams are in text mode, or
        - decoding must happen later, such as for :class:`~git.diff.Diff`\s.

    :param kill_after_timeout:
        :class:`float` or ``None``, Default = ``None``

        To specify a timeout in seconds for the git command, after which the process
        should be killed. The timeout is applied to each pump thread's ``join()``.

    :raise RuntimeError:
        If a pump thread is still running after the timeout and `process` is not a
        ``Git.AutoInterrupt`` (only that wrapper is terminated automatically).
    """

    # Use 2 "pump" threads and wait for both to finish.
    def pump_stream(
        cmdline: List[str],
        name: str,
        stream: Union[BinaryIO, TextIO],
        is_decode: bool,
        handler: Union[None, Callable[[Union[bytes, str]], None]],
    ) -> None:
        try:
            for line in stream:
                if handler:
                    if is_decode:
                        assert isinstance(line, bytes)
                        line_str = line.decode(defenc)
                        handler(line_str)
                    else:
                        handler(line)

        except Exception as ex:
            _logger.error(f"Pumping {name!r} of cmd({remove_password_if_present(cmdline)}) failed due to: {ex!r}")
            if "I/O operation on closed file" not in str(ex):
                # Only reraise if the error was not due to the stream closing.
                raise CommandError([f"<{name}-pump>"] + remove_password_if_present(cmdline), ex) from ex
        finally:
            stream.close()

    if hasattr(process, "proc"):
        # Wrapper object: the real process (possibly None) lives in ``proc``.
        process = cast("Git.AutoInterrupt", process)
        cmdline: str | Tuple[str, ...] | List[str] = getattr(process.proc, "args", "")
        p_stdout = process.proc.stdout if process.proc else None
        p_stderr = process.proc.stderr if process.proc else None
    else:
        process = cast(Popen, process)  # type: ignore[redundant-cast]
        cmdline = getattr(process, "args", "")
        p_stdout = process.stdout
        p_stderr = process.stderr

    # The command line is only used for error reporting; normalize it to a sequence.
    if not isinstance(cmdline, (tuple, list)):
        cmdline = cmdline.split()

    pumps: List[Tuple[str, IO, Callable[..., None] | None]] = []
    if p_stdout:
        pumps.append(("stdout", p_stdout, stdout_handler))
    if p_stderr:
        pumps.append(("stderr", p_stderr, stderr_handler))

    threads: List[threading.Thread] = []

    for name, stream, handler in pumps:
        t = threading.Thread(target=pump_stream, args=(cmdline, name, stream, decode_streams, handler))
        t.daemon = True
        t.start()
        threads.append(t)

    # FIXME: Why join? Will block if stdin needs feeding...
    for t in threads:
        t.join(timeout=kill_after_timeout)
        if t.is_alive():
            if isinstance(process, Git.AutoInterrupt):
                process._terminate()
            else:  # Don't want to deal with the other case.
                raise RuntimeError(
                    "Thread join() timed out in cmd.handle_process_output()."
                    f" kill_after_timeout={kill_after_timeout} seconds"
                )
            if stderr_handler:
                timeout_msg = (
                    f"error: process killed because it timed out. kill_after_timeout={kill_after_timeout} seconds"
                )
                # Pipes opened in binary mode are io.BufferedIOBase / io.RawIOBase
                # instances. They are never instances of typing.BinaryIO, so that
                # class cannot be used to recognize them with isinstance().
                stderr_is_binary = isinstance(p_stderr, (io.RawIOBase, io.BufferedIOBase))
                if not decode_streams and stderr_is_binary:
                    # The handler has been fed raw bytes lines, so give it bytes here too.
                    stderr_handler(timeout_msg.encode())  # type: ignore[arg-type]
                else:
                    stderr_handler(timeout_msg)  # type: ignore[arg-type]

    if finalizer:
        finalizer(process)

227 

228 

safer_popen: Callable[..., Popen]

if sys.platform == "win32":

    def _safer_popen_windows(
        command: Union[str, Sequence[Any]],
        *,
        shell: bool = False,
        env: Optional[Mapping[str, str]] = None,
        **kwargs: Any,
    ) -> Popen:
        """Start a subprocess on Windows without searching the current directory.

        A plain :class:`subprocess.Popen` call on Windows may pick up an executable
        such as ``git.exe`` from the current working directory. When that directory is
        the working tree of an untrusted repository, this is an untrusted search path
        condition. The same applies when a shell is used, because the shell may search
        the subprocess's CWD, which GitPython usually sets to a repository working
        tree. This wrapper covers both situations.

        :param command:
            The command to run, as accepted by :class:`subprocess.Popen`.

        :param shell:
            Whether to run the command through a shell.

        :param env:
            Environment for the subprocess, or ``None``. It is never mutated; a copy
            is modified when a shell is used.

        :param kwargs:
            Further keyword arguments forwarded to :class:`subprocess.Popen`.

        :return:
            The started :class:`subprocess.Popen` object.

        :note:
            This works by setting the :envvar:`NoDefaultCurrentDirectoryInExePath`
            environment variable while the subprocess is created. Windows-specific
            process creation flags are passed as well, but they are unrelated to the
            path search.

        :note:
            Patching :attr:`os.environ` is racy: another thread that mutates the
            environment at the same time may see unpredictable results. See the
            discussion in:
            https://github.com/gitpython-developers/GitPython/pull/1650
        """
        # When a shell is used, the shell is the direct child and performs the search,
        # so the variable has to be present in the child's own environment. Work on a
        # copy, because the caller's mapping may be immutable or reused. (The value
        # "1" is arbitrary; any value works.)
        if shell:
            shell_env = dict(env) if env is not None else {}
            shell_env["NoDefaultCurrentDirectoryInExePath"] = "1"
            env = shell_env

        # Without a shell, this process performs the search in CreateProcessW, so the
        # variable must be set in our own environment. With a shell this is only
        # needed on Python versions lacking the fix for
        # https://github.com/python/cpython/issues/101283 (where an unset ComSpec makes
        # the search for the shell itself unsafe), but doing it unconditionally is
        # simpler and covers that case too.
        with patch_env("NoDefaultCurrentDirectoryInExePath", "1"):
            return Popen(
                command,
                shell=shell,
                env=env,
                # CREATE_NEW_PROCESS_GROUP is needed for some ways of killing the
                # process afterwards:
                # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
                # https://docs.python.org/3/library/subprocess.html#subprocess.CREATE_NEW_PROCESS_GROUP
                creationflags=subprocess.CREATE_NO_WINDOW | subprocess.CREATE_NEW_PROCESS_GROUP,
                **kwargs,
            )

    safer_popen = _safer_popen_windows
else:
    # No extra precautions are applied on other platforms.
    safer_popen = Popen

297 

298 

def dashify(string: str) -> str:
    """Return `string` with every underscore replaced by a dash."""
    return "-".join(string.split("_"))

301 

302 

def slots_to_dict(self: "Git", exclude: Sequence[str] = ()) -> Dict[str, Any]:
    """Collect the values of the object's ``__slots__`` attributes into a dict.

    :param self:
        Object whose ``__slots__`` names are read with :func:`getattr`.

    :param exclude:
        Slot names to leave out of the result.

    :return:
        Mapping of each non-excluded slot name to its current value.
    """
    state: Dict[str, Any] = {}
    for slot_name in self.__slots__:
        if slot_name in exclude:
            continue
        state[slot_name] = getattr(self, slot_name)
    return state

305 

306 

def dict_to_slots_and__excluded_are_none(self: object, d: Mapping[str, Any], excluded: Sequence[str] = ()) -> None:
    """Restore attributes on an object from a mapping, then blank the excluded ones.

    :param self:
        Object on which the attributes are set.

    :param d:
        Attribute names mapped to the values to assign.

    :param excluded:
        Attribute names that are set to ``None`` afterwards, even if `d` also
        contains them.
    """
    # Values from ``d`` are applied first, so an excluded name always ends up None.
    assignments = itertools.chain(d.items(), zip(excluded, itertools.repeat(None)))
    for attr_name, attr_value in assignments:
        setattr(self, attr_name, attr_value)

312 

313 

314## -- End Utilities -- @} 

315 

316 

class _AutoInterrupt:
    """Process wrapper that terminates the wrapped process on finalization.

    This kills/interrupts the stored process instance once this instance goes out of
    scope. It is used to prevent processes piling up in case iterators stop reading.

    All attributes not defined here are looked up on the contained process object.

    The wait method is overridden to perform automatic status code checking and possibly
    raise.
    """

    __slots__ = ("proc", "args", "status")

    # If this is non-zero it will override any status code during _terminate, used
    # to prevent race conditions in testing.
    _status_code_if_terminate: int = 0

    def __init__(self, proc: Union[None, subprocess.Popen], args: Any) -> None:
        """Wrap `proc`, remembering the `args` it was started with for error reports."""
        self.proc = proc
        self.args = args
        self.status: Union[int, None] = None

    def _terminate(self) -> None:
        """Terminate the underlying process.

        Closes the process's pipes, then records its exit status in :attr:`status`.
        If the process is still running it is terminated and waited for first. Does
        nothing if there is no process (anymore).
        """
        if self.proc is None:
            return

        # Detach first, so that repeated calls (e.g. from __del__) are no-ops.
        proc = self.proc
        self.proc = None
        if proc.stdin:
            proc.stdin.close()
        if proc.stdout:
            proc.stdout.close()
        if proc.stderr:
            proc.stderr.close()
        # Did the process finish already so we have a return code?
        try:
            if proc.poll() is not None:
                self.status = self._status_code_if_terminate or proc.poll()
                return
        except OSError as ex:
            _logger.info("Ignored error after process had died: %r", ex)

        # It can be that nothing really exists anymore...
        if os is None or getattr(os, "kill", None) is None:
            return

        # Try to kill it.
        try:
            proc.terminate()
            status = proc.wait()  # Ensure the process goes away.

            self.status = self._status_code_if_terminate or status
        except (OSError, AttributeError) as ex:
            # On interpreter shutdown (notably on Windows), parts of the stdlib used by
            # subprocess can already be torn down (e.g. `subprocess._winapi` becomes None),
            # which can cause AttributeError during terminate(). In that case, we prefer
            # to silently ignore to avoid noisy "Exception ignored in: __del__" messages.
            _logger.info("Ignored error while terminating process: %r", ex)
        # END exception handling

    def __del__(self) -> None:
        self._terminate()

    def __getattr__(self, attr: str) -> Any:
        # Only reached for names not found normally; delegate to the wrapped process.
        return getattr(self.proc, attr)

    # TODO: Bad choice to mimic `proc.wait()` but with different args.
    def wait(self, stderr: Union[None, str, bytes] = b"") -> int:
        """Wait for the process and return its status code.

        :param stderr:
            Previously read value of stderr, in case stderr is already closed.
            ``None`` is treated like empty bytes.

        :return:
            The status code, which is always 0 when this method returns normally.

        :warn:
            May deadlock if output or error pipes are used and not handled separately.

        :raise git.exc.GitCommandError:
            If the return status is not 0.
        """
        # Convert only real data: None must become empty bytes here, otherwise the
        # concatenation with freshly read stderr output below could fail.
        if stderr is None:
            stderr_b = b""
        else:
            stderr_b = force_bytes(data=stderr, encoding="utf-8")

        status: Union[int, None]
        if self.proc is not None:
            status = self.proc.wait()
            p_stderr = self.proc.stderr
        else:  # Assume the underlying proc was killed earlier or never existed.
            status = self.status
            p_stderr = None

        def read_all_from_possibly_closed_stream(stream: Union[IO[bytes], None]) -> bytes:
            if stream:
                try:
                    return stderr_b + force_bytes(stream.read())
                except (OSError, ValueError):
                    # Reading failed (e.g. the stream was already closed); fall back
                    # to what the caller had read before.
                    return stderr_b or b""
            else:
                return stderr_b or b""

        # END status handling

        if status != 0:
            errstr = read_all_from_possibly_closed_stream(p_stderr)
            _logger.debug("AutoInterrupt wait stderr: %r", errstr)
            raise GitCommandError(remove_password_if_present(self.args), status, errstr)
        return status


# Present the class under the name by which it is reachable as an attribute of Git.
_AutoInterrupt.__name__ = "AutoInterrupt"
_AutoInterrupt.__qualname__ = "Git.AutoInterrupt"

429 

430 

class _CatFileContentStream:
    """Sized, read-only view onto one object's content inside a shared stream.

    The view hands out at most ``size`` bytes from the underlying stream and then
    behaves like an exhausted stream. Once the last content byte has been handed out,
    one more byte (the terminating newline) is consumed so the underlying stream can
    be used for whatever comes next.

    If the view is discarded before all content was read, the remainder is read and
    thrown away for the same reason.
    """

    __slots__ = ("_stream", "_nbr", "_size")

    def __init__(self, size: int, stream: IO[bytes]) -> None:
        self._stream = stream
        self._size = size
        self._nbr = 0  # Number of bytes read.

        # An empty object still has its terminating newline; consume it right away.
        if size == 0:
            stream.read(1)

    def _clamp(self, size: int) -> int:
        """Limit a requested read size to the content bytes that remain."""
        remaining = self._size - self._nbr
        return min(remaining, size) if size > -1 else remaining

    def _account(self, data: bytes) -> bytes:
        """Count `data` as read, consuming the final newline on depletion."""
        self._nbr += len(data)
        if self._nbr == self._size:
            self._stream.read(1)  # final newline
        return data

    def read(self, size: int = -1) -> bytes:
        """Read up to `size` bytes of content, or all that remains if negative."""
        if self._nbr == self._size:
            return b""
        return self._account(self._stream.read(self._clamp(size)))

    def readline(self, size: int = -1) -> bytes:
        """Read one line of content, limited to `size` bytes if non-negative."""
        if self._nbr == self._size:
            return b""
        return self._account(self._stream.readline(self._clamp(size)))

    def readlines(self, size: int = -1) -> List[bytes]:
        """Read the remaining lines.

        If `size` is non-negative, reading stops after the line that makes the total
        length exceed `size`.
        """
        # All clamping and bookkeeping is left to readline(); only the size hint is
        # tracked here.
        collected: List[bytes] = []
        consumed = 0
        line = self.readline()
        while line:
            collected.append(line)
            if size > -1:
                consumed += len(line)
                if consumed > size:
                    break
            line = self.readline()
        return collected

    # skipcq: PYL-E0301
    def __iter__(self) -> "Git.CatFileContentStream":
        return self

    def __next__(self) -> bytes:
        line = self.readline()
        if line:
            return line
        raise StopIteration

    next = __next__

    def __del__(self) -> None:
        unread = self._size - self._nbr
        if unread:
            # Seeking is impossible within a stream, so read and discard the rest,
            # including the terminating newline.
            self._stream.read(unread + 1)


# Present the class under the name by which it is reachable as an attribute of Git.
_CatFileContentStream.__name__ = "CatFileContentStream"
_CatFileContentStream.__qualname__ = "Git.CatFileContentStream"

542 

543 

# Warning text for merely reading Git.USE_SHELL or setting it to a falsy value.
_USE_SHELL_DEFAULT_MESSAGE = (
    "Git.USE_SHELL is deprecated, because only its default value of False is safe. "
    "It will be removed in a future release."
)

# Warning text for setting Git.USE_SHELL to a truthy value.
_USE_SHELL_DANGER_MESSAGE = (
    "Setting Git.USE_SHELL to True is unsafe and insecure, as the effect of special "
    "shell syntax cannot usually be accounted for. This can result in a command "
    "injection vulnerability and arbitrary code execution. Git.USE_SHELL is deprecated "
    "and will be removed in a future release."
)


def _warn_use_shell(*, extra_danger: bool) -> None:
    """Issue the :class:`DeprecationWarning` for ``Git.USE_SHELL``.

    :param extra_danger:
        If true, warn with the stronger message about the security impact; otherwise
        warn with the plain deprecation message.
    """
    if extra_danger:
        message = _USE_SHELL_DANGER_MESSAGE
    else:
        message = _USE_SHELL_DEFAULT_MESSAGE
    # stacklevel=3 attributes the warning two frames above this function, i.e. to
    # the caller of whatever function called this helper.
    warnings.warn(message, DeprecationWarning, stacklevel=3)

563 

564 

class _GitMeta(type):
    """Metaclass for :class:`Git`.

    It intercepts reads and writes of the ``USE_SHELL`` class attribute in order to
    issue a :class:`DeprecationWarning`; every other attribute behaves as usual.
    """

    def __getattribute(cls, name: str) -> Any:
        """Look up a class attribute, warning first if it is ``USE_SHELL``."""
        reads_use_shell = name == "USE_SHELL"
        if reads_use_shell:
            _warn_use_shell(extra_danger=False)
        return super().__getattribute__(name)

    def __setattr(cls, name: str, value: Any) -> Any:
        """Set a class attribute, warning first if it is ``USE_SHELL``.

        The stronger warning is used when the new value is truthy.
        """
        writes_use_shell = name == "USE_SHELL"
        if writes_use_shell:
            _warn_use_shell(extra_danger=value)
        super().__setattr__(name, value)

    if not TYPE_CHECKING:
        # The methods above carry non-special (name-mangled, hence the leading "__")
        # names, so that their bodies are type-checked while static checkers do not
        # treat the class as having dynamic attributes, which would hide misspelled
        # attribute names. They are bound to the special names only at runtime.
        __getattribute__ = __getattribute
        __setattr__ = __setattr

588 

589 

GitMeta = _GitMeta
"""Alias of :class:`Git`'s metaclass, whether it is :class:`type` or a custom metaclass.

Whether the :class:`Git` class has the default :class:`type` as its metaclass or uses a
custom metaclass is not documented and may change at any time. This statically checkable
metaclass alias is equivalent at runtime to ``type(Git)``. This should almost never be
used. Code that benefits from it is likely to remain brittle even if it is used.

In view of the :class:`Git` class's intended use and :class:`Git` objects' dynamic
callable attributes representing git subcommands, it rarely makes sense to inherit from
:class:`Git` at all. Using :class:`Git` in multiple inheritance can be especially tricky
to do correctly. Attempting uses of :class:`Git` where its metaclass is relevant, such
as when a sibling class has an unrelated metaclass and a shared lower bound metaclass
might have to be introduced to solve a metaclass conflict, is not recommended.

:note:
    The correct static type of the :class:`Git` class itself, and any subclasses, is
    ``Type[Git]``. (This can be written as ``type[Git]`` in Python 3.9 and later.)

    :class:`GitMeta` should never be used in any annotation where ``Type[Git]`` is
    intended or otherwise possible to use. This alias is truly only for very rare and
    inherently precarious situations where it is necessary to deal with the metaclass
    explicitly.
"""

614 

615 

616class Git(metaclass=_GitMeta): 

617 """The Git class manages communication with the Git binary. 

618 

619 It provides a convenient interface to calling the Git binary, such as in:: 

620 

621 g = Git( git_dir ) 

622 g.init() # calls 'git init' program 

623 rval = g.ls_files() # calls 'git ls-files' program 

624 

625 Debugging: 

626 

627 * Set the :envvar:`GIT_PYTHON_TRACE` environment variable to print each invocation 

628 of the command to stdout. 

629 * Set its value to ``full`` to see details about the returned values. 

630 """ 

631 

632 __slots__ = ( 

633 "_working_dir", 

634 "cat_file_all", 

635 "cat_file_header", 

636 "_version_info", 

637 "_version_info_token", 

638 "_git_options", 

639 "_persistent_git_options", 

640 "_environment", 

641 ) 

642 

643 _excluded_ = ( 

644 "cat_file_all", 

645 "cat_file_header", 

646 "_version_info", 

647 "_version_info_token", 

648 ) 

649 

650 re_unsafe_protocol = re.compile(r"(.+)::.+") 

651 

652 def __getstate__(self) -> Dict[str, Any]: 

653 return slots_to_dict(self, exclude=self._excluded_) 

654 

655 def __setstate__(self, d: Dict[str, Any]) -> None: 

656 dict_to_slots_and__excluded_are_none(self, d, excluded=self._excluded_) 

657 

658 # CONFIGURATION 

659 

660 git_exec_name = "git" 

661 """Default git command that should work on Linux, Windows, and other systems.""" 

662 

663 GIT_PYTHON_TRACE = os.environ.get("GIT_PYTHON_TRACE", False) 

664 """Enables debugging of GitPython's git commands.""" 

665 

666 USE_SHELL: bool = False 

667 """Deprecated. If set to ``True``, a shell will be used when executing git commands. 

668 

669 Code that uses ``USE_SHELL = True`` or that passes ``shell=True`` to any GitPython 

670 functions should be updated to use the default value of ``False`` instead. ``True`` 

671 is unsafe unless the effect of syntax treated specially by the shell is fully 

672 considered and accounted for, which is not possible under most circumstances. As 

673 detailed below, it is also no longer needed, even where it had been in the past. 

674 

675 It is in many if not most cases a command injection vulnerability for an application 

676 to set :attr:`USE_SHELL` to ``True``. Any attacker who can cause a specially crafted 

677 fragment of text to make its way into any part of any argument to any git command 

678 (including paths, branch names, etc.) can cause the shell to read and write 

679 arbitrary files and execute arbitrary commands. Innocent input may also accidentally 

680 contain special shell syntax, leading to inadvertent malfunctions. 

681 

682 In addition, how a value of ``True`` interacts with some aspects of GitPython's 

683 operation is not precisely specified and may change without warning, even before 

684 GitPython 4.0.0 when :attr:`USE_SHELL` may be removed. This includes: 

685 

686 * Whether or how GitPython automatically customizes the shell environment. 

687 

688 * Whether, outside of Windows (where :class:`subprocess.Popen` supports lists of 

689 separate arguments even when ``shell=True``), this can be used with any GitPython 

690 functionality other than direct calls to the :meth:`execute` method. 

691 

692 * Whether any GitPython feature that runs git commands ever attempts to partially 

693 sanitize data a shell may treat specially. Currently this is not done. 

694 

695 Prior to GitPython 2.0.8, this had a narrow purpose in suppressing console windows 

696 in graphical Windows applications. In 2.0.8 and higher, it provides no benefit, as 

697 GitPython solves that problem more robustly and safely by using the 

698 ``CREATE_NO_WINDOW`` process creation flag on Windows. 

699 

700 Because Windows path search differs subtly based on whether a shell is used, in rare 

701 cases changing this from ``True`` to ``False`` may keep an unusual git "executable", 

702 such as a batch file, from being found. To fix this, set the command name or full 

703 path in the :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable or pass the 

704 full path to :func:`git.refresh` (or invoke the script using a ``.exe`` shim). 

705 

706 Further reading: 

707 

708 * :meth:`Git.execute` (on the ``shell`` parameter). 

709 * https://github.com/gitpython-developers/GitPython/commit/0d9390866f9ce42870d3116094cd49e0019a970a 

710 * https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags 

711 * https://github.com/python/cpython/issues/91558#issuecomment-1100942950 

712 * https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-createprocessw 

713 """ 

714 

715 _git_exec_env_var = "GIT_PYTHON_GIT_EXECUTABLE" 

716 _refresh_env_var = "GIT_PYTHON_REFRESH" 

717 

718 GIT_PYTHON_GIT_EXECUTABLE = None 

719 """Provide the full path to the git executable. Otherwise it assumes git is in the 

720 executable search path. 

721 

722 :note: 

723 The git executable is actually found during the refresh step in the top level 

724 ``__init__``. It can also be changed by explicitly calling :func:`git.refresh`. 

725 """ 

726 

727 _refresh_token = object() # Since None would match an initial _version_info_token. 

728 

729 @classmethod 

730 def refresh(cls, path: Union[None, PathLike] = None) -> bool: 

731 """Update information about the git executable :class:`Git` objects will use. 

732 

733 Called by the :func:`git.refresh` function in the top level ``__init__``. 

734 

735 :param path: 

736 Optional path to the git executable. If not absolute, it is resolved 

737 immediately, relative to the current directory. (See note below.) 

738 

739 :note: 

740 The top-level :func:`git.refresh` should be preferred because it calls this 

741 method and may also update other state accordingly. 

742 

743 :note: 

744 There are three different ways to specify the command that refreshing causes 

745 to be used for git: 

746 

747 1. Pass no `path` argument and do not set the 

748 :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable. The command 

749 name ``git`` is used. It is looked up in a path search by the system, in 

750 each command run (roughly similar to how git is found when running 

751 ``git`` commands manually). This is usually the desired behavior. 

752 

753 2. Pass no `path` argument but set the :envvar:`GIT_PYTHON_GIT_EXECUTABLE` 

754 environment variable. The command given as the value of that variable is 

755 used. This may be a simple command or an arbitrary path. It is looked up 

756 in each command run. Setting :envvar:`GIT_PYTHON_GIT_EXECUTABLE` to 

757 ``git`` has the same effect as not setting it. 

758 

759 3. Pass a `path` argument. This path, if not absolute, is immediately 

760 resolved, relative to the current directory. This resolution occurs at 

761 the time of the refresh. When git commands are run, they are run using 

762 that previously resolved path. If a `path` argument is passed, the 

763 :envvar:`GIT_PYTHON_GIT_EXECUTABLE` environment variable is not 

764 consulted. 

765 

766 :note: 

767 Refreshing always sets the :attr:`Git.GIT_PYTHON_GIT_EXECUTABLE` class 

768 attribute, which can be read on the :class:`Git` class or any of its 

769 instances to check what command is used to run git. This attribute should 

770 not be confused with the related :envvar:`GIT_PYTHON_GIT_EXECUTABLE` 

771 environment variable. The class attribute is set no matter how refreshing is 

772 performed. 

773 """ 

774 # Discern which path to refresh with. 

775 if path is not None: 

776 new_git = os.path.expanduser(path) 

777 new_git = os.path.abspath(new_git) 

778 else: 

779 new_git = os.environ.get(cls._git_exec_env_var, cls.git_exec_name) 

780 

781 # Keep track of the old and new git executable path. 

782 old_git = cls.GIT_PYTHON_GIT_EXECUTABLE 

783 old_refresh_token = cls._refresh_token 

784 cls.GIT_PYTHON_GIT_EXECUTABLE = new_git 

785 cls._refresh_token = object() 

786 

787 # Test if the new git executable path is valid. A GitCommandNotFound error is 

788 # raised by us. A PermissionError is raised if the git executable cannot be 

789 # executed for whatever reason. 

790 has_git = False 

791 try: 

792 cls().version() 

793 has_git = True 

794 except (GitCommandNotFound, PermissionError): 

795 pass 

796 

797 # Warn or raise exception if test failed. 

798 if not has_git: 

799 err = ( 

800 dedent( 

801 """\ 

802 Bad git executable. 

803 The git executable must be specified in one of the following ways: 

804 - be included in your $PATH 

805 - be set via $%s 

806 - explicitly set via git.refresh(<full-path-to-git-executable>) 

807 """ 

808 ) 

809 % cls._git_exec_env_var 

810 ) 

811 

812 # Revert to whatever the old_git was. 

813 cls.GIT_PYTHON_GIT_EXECUTABLE = old_git 

814 cls._refresh_token = old_refresh_token 

815 

816 if old_git is None: 

817 # On the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is None) we only 

818 # are quiet, warn, or error depending on the GIT_PYTHON_REFRESH value. 

819 

820 # Determine what the user wants to happen during the initial refresh. We 

821 # expect GIT_PYTHON_REFRESH to either be unset or be one of the 

822 # following values: 

823 # 

824 # 0|q|quiet|s|silence|silent|n|none 

825 # 1|w|warn|warning|l|log 

826 # 2|r|raise|e|error|exception 

827 

828 mode = os.environ.get(cls._refresh_env_var, "raise").lower() 

829 

830 quiet = ["quiet", "q", "silence", "s", "silent", "none", "n", "0"] 

831 warn = ["warn", "w", "warning", "log", "l", "1"] 

832 error = ["error", "e", "exception", "raise", "r", "2"] 

833 

834 if mode in quiet: 

835 pass 

836 elif mode in warn or mode in error: 

837 err = dedent( 

838 """\ 

839 %s 

840 All git commands will error until this is rectified. 

841 

842 This initial message can be silenced or aggravated in the future by setting the 

843 $%s environment variable. Use one of the following values: 

844 - %s: for no message or exception 

845 - %s: for a warning message (logging level CRITICAL, displayed by default) 

846 - %s: for a raised exception 

847 

848 Example: 

849 export %s=%s 

850 """ 

851 ) % ( 

852 err, 

853 cls._refresh_env_var, 

854 "|".join(quiet), 

855 "|".join(warn), 

856 "|".join(error), 

857 cls._refresh_env_var, 

858 quiet[0], 

859 ) 

860 

861 if mode in warn: 

862 _logger.critical(err) 

863 else: 

864 raise ImportError(err) 

865 else: 

866 err = dedent( 

867 """\ 

868 %s environment variable has been set but it has been set with an invalid value. 

869 

870 Use only the following values: 

871 - %s: for no message or exception 

872 - %s: for a warning message (logging level CRITICAL, displayed by default) 

873 - %s: for a raised exception 

874 """ 

875 ) % ( 

876 cls._refresh_env_var, 

877 "|".join(quiet), 

878 "|".join(warn), 

879 "|".join(error), 

880 ) 

881 raise ImportError(err) 

882 

883 # We get here if this was the initial refresh and the refresh mode was 

884 # not error. Go ahead and set the GIT_PYTHON_GIT_EXECUTABLE such that we 

885 # discern the difference between the first refresh at import time 

886 # and subsequent calls to git.refresh or this refresh method. 

887 cls.GIT_PYTHON_GIT_EXECUTABLE = cls.git_exec_name 

888 else: 

889 # After the first refresh (when GIT_PYTHON_GIT_EXECUTABLE is no longer 

890 # None) we raise an exception. 

891 raise GitCommandNotFound(new_git, err) 

892 

893 return has_git 

894 

@classmethod
def is_cygwin(cls) -> bool:
    """Check the configured git executable with :func:`~git.util.is_cygwin_git`.

    :return:
        Whatever :func:`~git.util.is_cygwin_git` reports for
        :attr:`GIT_PYTHON_GIT_EXECUTABLE`.
    """
    git_executable = cls.GIT_PYTHON_GIT_EXECUTABLE
    return is_cygwin_git(git_executable)

898 

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Literal[False] = ...) -> str: ...

@overload
@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> str: ...

@classmethod
def polish_url(cls, url: str, is_cygwin: Union[None, bool] = None) -> PathLike:
    """Normalize a URL or path so it can be written to a git config file.

    Config files written on Windows may contain backslashes, which git would
    escape again. This undoes the escaping and switches to forward slashes.

    :param url:
        The URL or path to clean up.

    :param is_cygwin:
        Whether to convert via :func:`~git.util.cygpath`. If ``None``, the answer
        is taken from :meth:`is_cygwin`.

    :return:
        The converted URL. Outside of Cygwin, environment variables and a leading
        ``~`` are expanded and all backslashes become forward slashes.
    """
    use_cygwin = cls.is_cygwin() if is_cygwin is None else is_cygwin
    if use_cygwin:
        return cygpath(url)

    expanded = os.path.expandvars(url)
    if expanded.startswith("~"):
        expanded = os.path.expanduser(expanded)
    # Collapse doubled (escaped) backslashes first, then convert what remains.
    unescaped = expanded.replace("\\\\", "\\")
    return unescaped.replace("\\", "/")

926 

@classmethod
def check_unsafe_protocols(cls, url: str) -> None:
    """Reject URLs that use a git "remote helper" transport.

    Besides the usual protocols (http, git, ssh), Git accepts remote helpers
    written as ``<transport>::<address>``. One of them (``ext::``) can run any
    arbitrary command, so URLs matching :attr:`re_unsafe_protocol` are refused.

    See:

    - https://git-scm.com/docs/gitremote-helpers
    - https://git-scm.com/docs/git-remote-ext

    :param url:
        The URL to inspect.

    :raise git.exc.UnsafeProtocolError:
        If `url` matches :attr:`re_unsafe_protocol`.
    """
    found = cls.re_unsafe_protocol.match(url)
    if not found:
        return
    raise UnsafeProtocolError(
        f"The `{found.group(1)}::` protocol looks suspicious, use `allow_unsafe_protocols=True` to allow it."
    )

946 

@classmethod
def _canonicalize_option_name(cls, option: str) -> str:
    """Reduce an option to the bare name compared in unsafe-option checks.

    Leading dashes are stripped, anything from the first ``=`` on is dropped,
    only the first whitespace-separated token is kept, and the result is passed
    through ``dashify``.

    Examples:
        ``"--upload-pack=/tmp/helper"`` -> ``"upload-pack"``
        ``"upload_pack"`` -> ``"upload-pack"``
        ``"--config core.filemode=false"`` -> ``"config"``

    :return:
        The canonical option name, or ``""`` if nothing is left after stripping.
    """
    before_equals, _, _ = option.lstrip("-").partition("=")
    pieces = before_equals.split(None, 1)
    return dashify(pieces[0]) if pieces else ""

961 

@classmethod
def check_unsafe_options(cls, options: List[str], unsafe_options: List[str]) -> None:
    """Refuse options that are on the given unsafe list.

    Some options accepted by ``git <command>`` can be used to execute arbitrary
    commands. These are blocked by default.

    :param options:
        The options about to be passed to git. Each can be of the form ``foo``,
        ``--foo``, ``--foo bar``, or ``--foo=bar``.

    :param unsafe_options:
        The options considered unsafe. Both lists are compared by the name that
        :meth:`_canonicalize_option_name` returns.

    :raise git.exc.UnsafeOptionError:
        On the first entry of `options` whose canonical name is unsafe.
    """
    canonical = cls._canonicalize_option_name

    # Map canonical name -> spelling from unsafe_options, used in the error message.
    blocked = {}
    for unsafe in unsafe_options:
        blocked[canonical(unsafe)] = unsafe

    for candidate in options:
        hit = blocked.get(canonical(candidate))
        if hit is None:
            continue
        raise UnsafeOptionError(f"{hit} is not allowed, use `allow_unsafe_options=True` to allow it.")

975 

976 AutoInterrupt: TypeAlias = _AutoInterrupt 

977 

978 CatFileContentStream: TypeAlias = _CatFileContentStream 

979 

def __init__(self, working_dir: Union[None, PathLike] = None) -> None:
    """Set up a new instance with empty option lists and empty caches.

    :param working_dir:
        Git directory we should work in. If ``None``, we always work in the current
        directory as returned by :func:`os.getcwd`.
        This is meant to be the working tree directory if available, or the
        ``.git`` directory in case of bare repositories.
    """
    super().__init__()

    # Where commands run, as processed by expand_path().
    self._working_dir = expand_path(working_dir)

    # Options for the git executable itself: persistent ones and per-call ones.
    self._persistent_git_options: List[str] = []
    self._git_options: Union[List[str], Tuple[str, ...]] = ()

    # Extra environment variables to pass to git commands.
    self._environment: Dict[str, str] = {}

    # Cached version, together with the refresh token it was obtained under.
    self._version_info: Union[Tuple[int, ...], None] = None
    self._version_info_token: object = None

    # Slots for the persistent commands, filled on first use.
    self.cat_file_header: Union[None, TBD] = None
    self.cat_file_all: Union[None, TBD] = None

1004 

def __getattribute__(self, name: str) -> Any:
    """Look up an attribute normally, calling ``_warn_use_shell`` first for ``USE_SHELL``."""
    accessing_use_shell = name == "USE_SHELL"
    if accessing_use_shell:
        _warn_use_shell(extra_danger=False)
    return super().__getattribute__(name)

1009 

def __getattr__(self, name: str) -> Any:
    """Turn unknown public attribute names into git subcommand calls.

    This is a convenience so a command can be invoked as if it was a method.

    :param name:
        The attribute that was not found by normal lookup. Names starting with
        ``_`` are passed back to the default lookup instead of becoming commands.

    :return:
        Callable object that will call :meth:`_call_process` with `name` followed
        by your arguments.
    """
    if not name.startswith("_"):
        return lambda *args, **kwargs: self._call_process(name, *args, **kwargs)
    return super().__getattribute__(name)

1020 

def set_persistent_git_options(self, **kwargs: Any) -> None:
    """Replace the options given to the git executable on subsequent subcommand
    calls.

    :param kwargs:
        A dict of keyword arguments.
        These are converted by :meth:`transform_kwargs` (with single-character
        options split from their values) and are passed to the git command rather
        than the subcommand.
    """
    persistent_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._persistent_git_options = persistent_options

1032 

@property
def working_dir(self) -> Union[None, PathLike]:
    """:return: The directory stored at construction time, in which git is run"""
    current_dir = self._working_dir
    return current_dir

1037 

@property
def version_info(self) -> Tuple[int, ...]:
    """
    :return: Tuple with integers representing the major, minor and additional
        version numbers as parsed from :manpage:`git-version(1)`. Up to four fields
        are used, and parsing stops at the first field that is not all digits.

    This value is generated on demand and is cached until the next refresh.
    """
    # Refreshing is global, but the cache lives on the instance. Copy the token
    # once so a concurrent refresh cannot change it under us.
    token = self._refresh_token

    if self._version_info_token is not token:
        # Stale or never computed: run "git version" and parse its third word.
        raw_output = self._call_process("version")
        fields = raw_output.split(" ")[2].split(".")[:4]
        numeric_prefix = itertools.takewhile(str.isdigit, fields)
        self._version_info = tuple(int(field) for field in numeric_prefix)

        # This value will be considered valid until the next refresh.
        self._version_info_token = token
        return self._version_info

    assert self._version_info is not None, "Bug: corrupted token-check state"
    return self._version_info

1065 

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[True],
) -> "AutoInterrupt": ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[False] = False,
    stdout_as_string: Literal[True],
) -> Union[str, Tuple[int, str, str]]: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    as_process: Literal[False] = False,
    stdout_as_string: Literal[False] = False,
) -> Union[bytes, Tuple[int, bytes, str]]: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    with_extended_output: Literal[False],
    as_process: Literal[False],
    stdout_as_string: Literal[True],
) -> str: ...

@overload
def execute(
    self,
    command: Union[str, Sequence[Any]],
    *,
    with_extended_output: Literal[False],
    as_process: Literal[False],
    stdout_as_string: Literal[False],
) -> bytes: ...

def execute(
    self,
    command: Union[str, Sequence[Any]],
    istream: Union[None, BinaryIO] = None,
    with_extended_output: bool = False,
    with_exceptions: bool = True,
    as_process: bool = False,
    output_stream: Union[None, BinaryIO] = None,
    stdout_as_string: bool = True,
    kill_after_timeout: Union[None, float] = None,
    with_stdout: bool = True,
    universal_newlines: bool = False,
    shell: Union[None, bool] = None,
    env: Union[None, Mapping[str, str]] = None,
    max_chunk_size: int = io.DEFAULT_BUFFER_SIZE,
    strip_newline_in_stdout: bool = True,
    **subprocess_kwargs: Any,
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], AutoInterrupt]:
    R"""Handle executing the command, and consume and return the returned
    information (stdout).

    :param command:
        The command argument list to execute.
        It should be a sequence of program arguments, or a string. The
        program to execute is the first item in the args sequence or string.

    :param istream:
        Standard input filehandle passed to :class:`subprocess.Popen`. If it is
        not given, stdin is connected to the null device.

    :param with_extended_output:
        Whether to return a (status, stdout, stderr) tuple.

    :param with_exceptions:
        Whether to raise an exception when git returns a non-zero status.

    :param as_process:
        Whether to return the created process instance directly from which
        streams can be read on demand. This will render `with_extended_output`
        and `with_exceptions` ineffective - the caller will have to deal with
        the details. It is important to note that the process will be placed
        into an :class:`AutoInterrupt` wrapper that will interrupt the process
        once it goes out of scope. If you use the command in iterators, you
        should pass the whole process instance instead of a single stream.

    :param output_stream:
        If set to a file-like object, data produced by the git command will be
        copied to the given stream instead of being returned as a string.
        This feature only has any effect if `as_process` is ``False``.

    :param stdout_as_string:
        If ``False``, the command's standard output will be bytes. Otherwise, it
        will be decoded into a string using the default encoding (usually UTF-8).
        The latter can fail, if the output contains binary data.

    :param kill_after_timeout:
        Specifies a timeout in seconds for the git command, after which the process
        should be killed. This will have no effect if `as_process` is set to
        ``True``. It is ``None`` by default, which lets the process run without a
        time limit. Uses of this feature should be carefully considered, due to
        the following limitations:

        1. This feature is not supported at all on Windows; passing a value there
           raises :exc:`~git.exc.GitCommandError`.
        2. Effectiveness may vary by operating system. ``ps --ppid`` is used to
           enumerate child processes, which is available on most GNU/Linux systems
           but not most others.
        3. Deeper descendants do not receive signals, though they may sometimes
           terminate as a consequence of their parent processes being killed.
        4. `kill_after_timeout` uses ``SIGKILL``, which can have negative side
           effects on a repository. For example, stale locks in case of
           :manpage:`git-gc(1)` could render the repository incapable of accepting
           changes until the lock is manually removed.

    :param with_stdout:
        If ``True``, default ``True``, we open stdout on the created process.
        Otherwise stdout is sent to the null device.

    :param universal_newlines:
        If ``True``, pipes will be opened as text, and lines are split at all known
        line endings.

    :param shell:
        Whether to invoke commands through a shell
        (see :class:`Popen(..., shell=True) <subprocess.Popen>`).
        If this is not ``None``, it overrides :attr:`USE_SHELL`.

        Passing ``shell=True`` to this or any other GitPython function should be
        avoided, as it is unsafe under most circumstances. This is because it is
        typically not feasible to fully consider and account for the effect of shell
        expansions, especially when passing ``shell=True`` to other methods that
        forward it to :meth:`Git.execute`. Passing ``shell=True`` is also no longer
        needed (nor useful) to work around any known operating system specific
        issues.

    :param env:
        A dictionary of environment variables to be passed to
        :class:`subprocess.Popen`. These are applied on top of a copy of
        :data:`os.environ` and of the variables set on this instance.

    :param max_chunk_size:
        Maximum number of bytes in one chunk of data passed to the `output_stream`
        in one invocation of its ``write()`` method. If the given number is not
        positive then the default value is used.

    :param strip_newline_in_stdout:
        Whether to strip the trailing ``\n`` of the command stdout.

    :param subprocess_kwargs:
        Keyword arguments to be passed to :class:`subprocess.Popen`. Please note
        that some of the valid kwargs are already set by this method; the ones you
        specify may not be the same ones.

    :return:
        * The process wrapped in :class:`AutoInterrupt`, if `as_process` is
          ``True``
        * str(output), if `with_extended_output` is ``False`` (Default)
        * tuple(int(status), str(stdout), str(stderr)),
          if `with_extended_output` is ``True``

        If `output_stream` is given, stdout is copied to it and the stdout value
        in the result is only what is still unread afterwards (presumably empty).

        Note that git is executed with ``LANGUAGE=C`` and ``LC_ALL=C`` to ensure
        consistent output regardless of system language.

    :raise git.exc.GitCommandError:
        If the status is non-zero and `with_exceptions` is ``True``, or if
        `kill_after_timeout` is used on Windows.

    :raise git.exc.GitCommandNotFound:
        If the process could not be started because the command was not found.

    :note:
        If you add additional keyword arguments to the signature of this method, you
        must update the ``execute_kwargs`` variable housed in this module.
    """
    # Remove password for the command if present.
    redacted_command = remove_password_if_present(command)
    if self.GIT_PYTHON_TRACE and (self.GIT_PYTHON_TRACE != "full" or as_process):
        _logger.info(" ".join(redacted_command))

    # Allow the user to have the command executed in their working dir.
    try:
        cwd = self._working_dir or os.getcwd()  # type: Union[None, str]
        if not os.access(str(cwd), os.X_OK):
            cwd = None
    except FileNotFoundError:
        cwd = None

    # Start the process.
    inline_env = env
    env = os.environ.copy()
    # Attempt to force all output to plain ASCII English, which is what some parsing
    # code may expect.
    # According to https://askubuntu.com/a/311796, we are setting LANGUAGE as well
    # just to be sure.
    env["LANGUAGE"] = "C"
    env["LC_ALL"] = "C"
    env.update(self._environment)
    if inline_env is not None:
        env.update(inline_env)

    if sys.platform == "win32":
        if kill_after_timeout is not None:
            raise GitCommandError(
                redacted_command,
                '"kill_after_timeout" feature is not supported on Windows.',
            )
        cmd_not_found_exception = OSError
    else:
        cmd_not_found_exception = FileNotFoundError
    # END handle

    # DEVNULL always exists on supported Pythons, so no file needs to be opened
    # (and possibly leaked) as a fallback sink.
    stdout_sink = PIPE if with_stdout else DEVNULL
    if shell is None:
        # Get the value of USE_SHELL with no deprecation warning. Do this without
        # warnings.catch_warnings, to avoid a race condition with application code
        # configuring warnings. The value could be looked up in type(self).__dict__
        # or Git.__dict__, but those can break under some circumstances. This works
        # the same as self.USE_SHELL in more situations; see Git.__getattribute__.
        shell = super().__getattribute__("USE_SHELL")
    _logger.debug(
        "Popen(%s, cwd=%s, stdin=%s, shell=%s, universal_newlines=%s)",
        redacted_command,
        cwd,
        "<valid stream>" if istream else "None",
        shell,
        universal_newlines,
    )
    try:
        proc = safer_popen(
            command,
            env=env,
            cwd=cwd,
            bufsize=-1,
            stdin=(istream or DEVNULL),
            stderr=PIPE,
            stdout=stdout_sink,
            shell=shell,
            universal_newlines=universal_newlines,
            encoding=defenc if universal_newlines else None,
            **subprocess_kwargs,
        )
    except cmd_not_found_exception as err:
        raise GitCommandNotFound(redacted_command, err) from err
    else:
        # Replace with a typeguard for Popen[bytes]?
        proc.stdout = cast(BinaryIO, proc.stdout)
        proc.stderr = cast(BinaryIO, proc.stderr)

    if as_process:
        return self.AutoInterrupt(proc, command)

    if sys.platform != "win32" and kill_after_timeout is not None:
        # Help mypy figure out this is not None even when used inside communicate().
        timeout = kill_after_timeout

        def kill_process(pid: int) -> None:
            """Callback to kill a process.

            This callback implementation would be ineffective and unsafe on Windows.
            """
            child_pids = []
            # The context manager closes the pipe and reaps ``ps`` once its output
            # has been read, so the helper process is not left behind.
            with Popen(["ps", "--ppid", str(pid)], stdout=PIPE) as p:
                if p.stdout is not None:
                    for line in p.stdout:
                        if len(line.split()) > 0:
                            local_pid = (line.split())[0]
                            if local_pid.isdigit():
                                child_pids.append(int(local_pid))
            try:
                os.kill(pid, signal.SIGKILL)
                for child_pid in child_pids:
                    try:
                        os.kill(child_pid, signal.SIGKILL)
                    except OSError:
                        pass
                # Tell the main routine that the process was killed.
                kill_check.set()
            except OSError:
                # It is possible that the process gets completed in the duration
                # after timeout happens and before we try to kill the process.
                pass
            return

        def communicate() -> Tuple[AnyStr, AnyStr]:
            watchdog.start()
            out, err = proc.communicate()
            watchdog.cancel()
            if kill_check.is_set():
                err = 'Timeout: the command "%s" did not complete in %d secs.' % (
                    " ".join(redacted_command),
                    timeout,
                )
                if not universal_newlines:
                    err = err.encode(defenc)
            return out, err

        # END helpers

        kill_check = threading.Event()
        watchdog = threading.Timer(timeout, kill_process, args=(proc.pid,))
    else:
        communicate = proc.communicate

    # Wait for the process to return.
    status = 0
    stdout_value: Union[str, bytes] = b""
    stderr_value: Union[str, bytes] = b""
    newline = "\n" if universal_newlines else b"\n"
    try:
        if output_stream is None:
            stdout_value, stderr_value = communicate()
            # Strip trailing "\n".
            if stdout_value is not None and stdout_value.endswith(newline) and strip_newline_in_stdout:  # type: ignore[arg-type]
                stdout_value = stdout_value[:-1]
            if stderr_value is not None and stderr_value.endswith(newline):  # type: ignore[arg-type]
                stderr_value = stderr_value[:-1]

            status = proc.returncode
        else:
            max_chunk_size = max_chunk_size if max_chunk_size and max_chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
            if proc.stdout is not None:
                stream_copy(proc.stdout, output_stream, max_chunk_size)
                stdout_value = proc.stdout.read()
            if proc.stderr is not None:
                stderr_value = proc.stderr.read()
            # Strip trailing "\n".
            if stderr_value is not None and stderr_value.endswith(newline):  # type: ignore[arg-type]
                stderr_value = stderr_value[:-1]
            status = proc.wait()
        # END stdout handling
    finally:
        if proc.stdout is not None:
            proc.stdout.close()
        if proc.stderr is not None:
            proc.stderr.close()

    if self.GIT_PYTHON_TRACE == "full":
        cmdstr = " ".join(redacted_command)

        def as_text(stdout_value: Union[bytes, str]) -> str:
            # A real conditional, not ``a and b or c``: an empty stdout must be
            # logged as empty rather than mislabeled as an output stream.
            if output_stream:
                return "<OUTPUT_STREAM>"
            return safe_decode(stdout_value)

        # END as_text

        if stderr_value:
            _logger.info(
                "%s -> %d; stdout: '%s'; stderr: '%s'",
                cmdstr,
                status,
                as_text(stdout_value),
                safe_decode(stderr_value),
            )
        elif stdout_value:
            _logger.info("%s -> %d; stdout: '%s'", cmdstr, status, as_text(stdout_value))
        else:
            _logger.info("%s -> %d", cmdstr, status)
    # END handle debug printing

    if with_exceptions and status != 0:
        raise GitCommandError(redacted_command, status, stderr_value, stdout_value)

    if isinstance(stdout_value, bytes) and stdout_as_string:  # Could also be output_stream.
        stdout_value = safe_decode(stdout_value)

    # Allow access to the command's status code.
    if with_extended_output:
        return (status, stdout_value, safe_decode(stderr_value))
    else:
        return stdout_value

1436 

def environment(self) -> Dict[str, str]:
    """:return: The mapping of extra environment variables kept on this instance
    (the stored object itself, not a copy)"""
    extra_env = self._environment
    return extra_env

1439 

def update_environment(self, **kwargs: Any) -> Dict[str, Union[str, None]]:
    """Set or unset environment variables for future git invocations, and report
    what was changed so that the change can be undone.

    Examples::

        old_env = self.update_environment(PWD='/tmp')
        self.update_environment(**old_env)

    :param kwargs:
        Environment variables to use for git processes. A value of ``None``
        removes the variable instead of setting it.

    :return:
        Dict that maps each changed variable to its old value, with ``None``
        standing for "was not set". Passing it back to this method reverts the
        changes.
    """
    previous: Dict[str, Union[str, None]] = {}
    env = self._environment
    for name, new_value in kwargs.items():
        if new_value is None:
            # Unset the variable, but only record it if it was actually there.
            if name in env:
                previous[name] = env.pop(name)
            continue
        # Set the variable, remembering what it held before (None if absent).
        previous[name] = env.get(name)
        env[name] = new_value
    return previous

1467 

@contextlib.contextmanager
def custom_environment(self, **kwargs: Any) -> Iterator[None]:
    """Apply :meth:`update_environment` for the duration of a ``with`` block, then
    undo it again, even if the block raises.

    Examples::

        with self.custom_environment(GIT_SSH='/bin/ssh_wrapper'):
            repo.remotes.origin.fetch()

    :param kwargs:
        See :meth:`update_environment`.
    """
    saved = self.update_environment(**kwargs)
    try:
        yield
    finally:
        # Feeding the old values back in restores the previous state.
        self.update_environment(**saved)

1486 

def transform_kwarg(self, name: str, value: Any, split_single_char_options: bool) -> List[str]:
    """Convert one keyword argument into git command line option strings.

    :param name:
        The option name. A one-character name becomes a short option (``-n``),
        anything longer becomes a dashified long option (``--max-count``).

    :param value:
        ``True`` yields the bare flag and ``False`` or ``None`` yields nothing.
        Any other value, including ``0``, is formatted as the option's value.

    :param split_single_char_options:
        For short options with a value, whether the value is a separate argument
        (``["-n", "5"]``) or glued to the flag (``["-n5"]``).

    :return:
        The list of command line arguments for this option, possibly empty.
    """
    # Identity checks on purpose: with ``==`` or ``in``, 0 equals False and a
    # legitimate value such as ``n=0`` would be dropped.
    if value is False or value is None:
        return []

    if len(name) == 1:
        if value is True:
            return ["-%s" % name]
        if split_single_char_options:
            return ["-%s" % name, "%s" % value]
        return ["-%s%s" % (name, value)]

    if value is True:
        return ["--%s" % dashify(name)]
    return ["--%s=%s" % (dashify(name), value)]

1502 

def transform_kwargs(self, split_single_char_options: bool = True, **kwargs: Any) -> List[str]:
    """Transform Python-style kwargs into git command line options.

    :param split_single_char_options:
        Passed on to :meth:`transform_kwarg` for every option.

    :param kwargs:
        The options. A list or tuple value repeats the option once per element.

    :return:
        The concatenated results of :meth:`transform_kwarg`, in keyword order.
    """
    cli_args: List[str] = []
    for key, raw in kwargs.items():
        # Treat a scalar as a one-element sequence so both cases share one loop.
        values = raw if isinstance(raw, (list, tuple)) else (raw,)
        for item in values:
            cli_args.extend(self.transform_kwarg(key, item, split_single_char_options))
    return cli_args

1513 

@classmethod
def _unpack_args(cls, arg_list: Sequence[str]) -> List[str]:
    """Flatten arbitrarily nested lists and tuples into a flat list of strings.

    :param arg_list:
        A list or tuple whose items may themselves be lists or tuples. Anything
        else is treated as a single argument.

    :return:
        Every non-list, non-tuple item converted with :class:`str`, in order.
    """
    if not isinstance(arg_list, (list, tuple)):
        return [str(arg_list)]

    flat: List[str] = []
    for item in arg_list:
        flat += cls._unpack_args(item)
    return flat

1524 

def __call__(self, **kwargs: Any) -> "Git":
    """Specify command line options to the git executable for a subcommand call.

    :param kwargs:
        A dict of keyword arguments.
        These are converted by :meth:`transform_kwargs` (with single-character
        options split from their values) and are passed to the git command rather
        than the subcommand.

    :return:
        self, so that the subcommand can be chained onto the call.

    Examples::

        git(work_tree='/tmp').difftool()
    """
    one_shot_options = self.transform_kwargs(split_single_char_options=True, **kwargs)
    self._git_options = one_shot_options
    return self

1539 

@overload
def _call_process(
    self, method: str, *args: None, **kwargs: None
) -> str: ...  # If no args were given, execute the call with all defaults.

@overload
def _call_process(
    self,
    method: str,
    istream: int,
    as_process: Literal[True],
    *args: Any,
    **kwargs: Any,
) -> "Git.AutoInterrupt": ...

@overload
def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]: ...

def _call_process(
    self, method: str, *args: Any, **kwargs: Any
) -> Union[str, bytes, Tuple[int, Union[str, bytes], str], "Git.AutoInterrupt"]:
    """Build the command line for a git subcommand and run it via :meth:`execute`.

    :param method:
        The command. Contained ``_`` characters will be converted to hyphens, such
        as in ``ls_files`` to call ``ls-files``.

    :param args:
        The list of arguments. If ``None`` is included, it will be pruned.
        This allows your commands to call git more conveniently, as ``None`` is
        realized as non-existent. Nested lists and tuples are flattened.

    :param kwargs:
        Contains key-values for the following:

        - The :meth:`execute()` kwds, as listed in ``execute_kwargs``.
        - "Command options" to be converted by :meth:`transform_kwargs`.
        - The ``insert_kwargs_after`` key which its value must match one of
          ``*args``. The command options are then placed right after that
          argument instead of before all arguments.

        Examples::

            git.rev_list('master', max_count=10, header=True)

        turns into::

            git rev-list --max-count=10 --header master

    :return:
        Same as :meth:`execute`. If no args are given, used :meth:`execute`'s
        default (especially ``as_process = False``, ``stdout_as_string = True``) and
        return :class:`str`.

    :raise ValueError:
        If ``insert_kwargs_after`` names an argument that is not in `args`.
    """
    # Separate what execute() understands from what becomes command options.
    # Otherwise execute()'s own keywords would end up on the command line.
    exec_kwargs: Dict[str, Any] = {}
    opts_kwargs: Dict[str, Any] = {}
    for key, value in kwargs.items():
        target = exec_kwargs if key in execute_kwargs else opts_kwargs
        target[key] = value

    insert_after_this_arg = opts_kwargs.pop("insert_kwargs_after", None)

    # Prepare the argument list.
    opt_args = self.transform_kwargs(**opts_kwargs)
    ext_args = self._unpack_args([a for a in args if a is not None])

    if insert_after_this_arg is not None:
        try:
            index = ext_args.index(insert_after_this_arg)
        except ValueError as err:
            raise ValueError(
                "Couldn't find argument '%s' in args %s to insert cmd options after"
                % (insert_after_this_arg, str(ext_args))
            ) from err
        split_at = index + 1
        args_list = ext_args[:split_at] + opt_args + ext_args[split_at:]
    else:
        args_list = opt_args + ext_args

    # Executable first, then persistent git options, then the one-shot ones.
    call = [self.GIT_PYTHON_GIT_EXECUTABLE]
    call += self._persistent_git_options
    call += self._git_options
    # One-shot options apply to a single call only; reset to avoid side effects.
    self._git_options = ()

    call.append(dashify(method))
    call += args_list

    return self.execute(call, **exec_kwargs)

1637 

def _parse_object_header(self, header_line: str) -> Tuple[str, str, int]:
    """
    :param header_line:
        A line of the form::

            <hex_sha> type_string size_as_int

    :return:
        (hex_sha, type_string, size_as_int)

    :raise ValueError:
        If the line is empty, does not consist of exactly three fields (which
        indicates an error due to incorrect input sha), or its first field is not
        40 characters long.
    """
    tokens = header_line.split()

    if not tokens:
        err_msg = (
            f"SHA is empty, possible dubious ownership in the repository "
            f"""at {self._working_dir}.\n If this is unintended run:\n\n """
            f""" "git config --global --add safe.directory {self._working_dir}" """
        )
        raise ValueError(err_msg)

    if len(tokens) != 3:
        raise ValueError("SHA %s could not be resolved, git returned: %r" % (tokens[0], header_line.strip()))

    hexsha, type_string, size = tokens
    if len(hexsha) != 40:
        raise ValueError("Failed to parse header: %r" % header_line)
    return (hexsha, type_string, int(size))

1668 

def _prepare_ref(self, ref: AnyStr) -> bytes:
    """Encode a ref as a newline-terminated bytes line for a command's stdin.

    :param ref:
        The ref as text, as ASCII bytes (assumed to be a 40 byte hexsha), or as
        any other object, which is converted with :class:`str`.

    :return:
        The ref, ending in exactly the newline it had or one added newline,
        encoded with the default encoding.
    """
    if isinstance(ref, bytes):
        # bin-to-ascii for some reason returns bytes, not text.
        text: str = ref.decode("ascii")
    elif isinstance(ref, str):
        text = ref
    else:
        text = str(ref)  # Could be ref-object.

    # The newline is required for the command to separate refs on stdin.
    terminated = text if text.endswith("\n") else text + "\n"
    return terminated.encode(defenc)

1682 

def _get_persistent_cmd(self, attr_name: str, cmd_name: str, *args: Any, **kwargs: Any) -> "Git.AutoInterrupt":
    """Return the long-running command cached in an attribute, starting it if needed.

    :param attr_name:
        Name of the attribute on this instance that caches the command.

    :param cmd_name:
        The git subcommand to start when nothing is cached yet.

    :param args:
        Positional arguments for :meth:`_call_process`.

    :param kwargs:
        Keyword arguments for :meth:`_call_process`. They are applied on top of
        the defaults ``istream=PIPE`` and ``as_process=True``.

    :return:
        The cached command, or the newly started one after caching it.
    """
    cached = getattr(self, attr_name)
    if cached is not None:
        return cached

    options = {"istream": PIPE, "as_process": True, **kwargs}
    started = self._call_process(cmd_name, *args, **options)
    setattr(self, attr_name, started)
    return cast("Git.AutoInterrupt", started)

1695 

def __get_object_header(self, cmd: "Git.AutoInterrupt", ref: AnyStr) -> Tuple[str, str, int]:
    """Send one ref to a running command and parse the header line it answers with.

    :param cmd:
        The running command; both its stdin and stdout must be available.

    :param ref:
        The ref to look up, prepared with :meth:`_prepare_ref`.

    :return:
        The result of :meth:`_parse_object_header` for the line read back.

    :raise ValueError:
        If the command's stdin or stdout is missing.
    """
    if not (cmd.stdin and cmd.stdout):
        raise ValueError("cmd stdin was empty")

    cmd.stdin.write(self._prepare_ref(ref))
    cmd.stdin.flush()
    return self._parse_object_header(cmd.stdout.readline())

1703 

def get_object_header(self, ref: str) -> Tuple[str, str, int]:
    """Quickly look up the type and size of the object behind the given ref.

    :note:
        The method will only suffer from the costs of command invocation once and
        reuses the command in subsequent calls.

    :return:
        (hexsha, type_string, size_as_int)
    """
    # The ``cat_file`` command started with ``batch_check=True`` is kept in
    # ``cat_file_header`` between calls.
    header_cmd = self._get_persistent_cmd("cat_file_header", "cat_file", batch_check=True)
    return self.__get_object_header(header_cmd, ref)

1717 

def get_object_data(self, ref: str) -> Tuple[str, str, int, bytes]:
    """Similar to :meth:`get_object_header`, but returns object data as well.

    The data is read in full from the stream that :meth:`stream_object_data`
    provides.

    :return:
        (hexsha, type_string, size_as_int, data_string)

    :note:
        Not threadsafe.
    """
    hexsha, typename, size, content_stream = self.stream_object_data(ref)
    payload = content_stream.read(size)
    # Drop our reference to the stream before returning (presumably so that it
    # can be finalized right away).
    del content_stream
    return (hexsha, typename, size, payload)

1731 

def stream_object_data(self, ref: str) -> Tuple[str, str, int, "Git.CatFileContentStream"]:
    """Similar to :meth:`get_object_data`, but returns the data as a stream.

    :return:
        (hexsha, type_string, size_as_int, stream)

    :note:
        This method is not threadsafe. You need one independent :class:`Git`
        instance per thread to be safe!
    """
    # The ``cat_file`` command started with ``batch=True`` is kept in
    # ``cat_file_all`` between calls.
    batch_cmd = self._get_persistent_cmd("cat_file_all", "cat_file", batch=True)
    hexsha, typename, size = self.__get_object_header(batch_cmd, ref)

    # Fall back to an empty in-memory stream if the command has no stdout.
    source = batch_cmd.stdout
    if source is None:
        source = io.BytesIO()
    return (hexsha, typename, size, self.CatFileContentStream(size, source))

1746 

def clear_cache(self) -> "Git":
    """Clear all kinds of internal caches to release resources.

    Currently persistent commands will be interrupted: each cached command's
    ``__del__`` is called explicitly and both cache slots are reset to ``None``.

    :return:
        self
    """
    for cached_cmd in (self.cat_file_all, self.cat_file_header):
        if not cached_cmd:
            continue
        cached_cmd.__del__()

    self.cat_file_all = self.cat_file_header = None
    return self