Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/util.py: 55%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

557 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6import sys 

7 

8__all__ = [ 

9 "stream_copy", 

10 "join_path", 

11 "to_native_path_linux", 

12 "join_path_native", 

13 "Stats", 

14 "IndexFileSHA1Writer", 

15 "IterableObj", 

16 "IterableList", 

17 "BlockingLockFile", 

18 "LockFile", 

19 "Actor", 

20 "get_user_id", 

21 "assure_directory_exists", 

22 "RemoteProgress", 

23 "CallableRemoteProgress", 

24 "rmtree", 

25 "unbare_repo", 

26 "HIDE_WINDOWS_KNOWN_ERRORS", 

27] 

28 

29if sys.platform == "win32": 

30 __all__.append("to_native_path_windows") 

31 

32from abc import abstractmethod 

33import contextlib 

34from functools import wraps 

35import getpass 

36import logging 

37import os 

38import os.path as osp 

39import pathlib 

40import platform 

41import re 

42import shutil 

43import stat 

44import subprocess 

45import time 

46from urllib.parse import urlsplit, urlunsplit 

47import warnings 

48 

49# NOTE: Unused imports can be improved now that CI testing has fully resumed. Some of 

50# these be used indirectly through other GitPython modules, which avoids having to write 

51# gitdb all the time in their imports. They are not in __all__, at least currently, 

52# because they could be removed or changed at any time, and so should not be considered 

53# conceptually public to code outside GitPython. Linters of course do not like it. 

54from gitdb.util import ( 

55 LazyMixin, # noqa: F401 

56 LockedFD, # noqa: F401 

57 bin_to_hex, # noqa: F401 

58 file_contents_ro, # noqa: F401 

59 file_contents_ro_filepath, # noqa: F401 

60 hex_to_bin, # noqa: F401 

61 make_sha, 

62 to_bin_sha, # noqa: F401 

63 to_hex_sha, # noqa: F401 

64) 

65 

66# typing --------------------------------------------------------- 

67 

68from typing import ( 

69 Any, 

70 AnyStr, 

71 BinaryIO, 

72 Callable, 

73 Dict, 

74 Generator, 

75 IO, 

76 Iterator, 

77 List, 

78 Optional, 

79 Pattern, 

80 Sequence, 

81 Tuple, 

82 TYPE_CHECKING, 

83 TypeVar, 

84 Union, 

85 cast, 

86 overload, 

87) 

88 

89if TYPE_CHECKING: 

90 from git.cmd import Git 

91 from git.config import GitConfigParser, SectionConstraint 

92 from git.remote import Remote 

93 from git.repo.base import Repo 

94 

95from git.types import ( 

96 Files_TD, 

97 Has_id_attribute, 

98 HSH_TD, 

99 Literal, 

100 PathLike, 

101 Protocol, 

102 SupportsIndex, 

103 Total_TD, 

104 runtime_checkable, 

105) 

106 

107# --------------------------------------------------------------------- 

108 

109T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) 

110# So IterableList[Head] is subtype of IterableList[IterableObj]. 

111 

112_logger = logging.getLogger(__name__) 

113 

114 

115def _read_env_flag(name: str, default: bool) -> bool: 

116 """Read a boolean flag from an environment variable. 

117 

118 :return: 

119 The flag, or the `default` value if absent or ambiguous. 

120 """ 

121 try: 

122 value = os.environ[name] 

123 except KeyError: 

124 return default 

125 

126 _logger.warning( 

127 "The %s environment variable is deprecated. Its effect has never been documented and changes without warning.", 

128 name, 

129 ) 

130 

131 adjusted_value = value.strip().lower() 

132 

133 if adjusted_value in {"", "0", "false", "no"}: 

134 return False 

135 if adjusted_value in {"1", "true", "yes"}: 

136 return True 

137 _logger.warning("%s has unrecognized value %r, treating as %r.", name, value, default) 

138 return default 

139 

140 

141def _read_win_env_flag(name: str, default: bool) -> bool: 

142 """Read a boolean flag from an environment variable on Windows. 

143 

144 :return: 

145 On Windows, the flag, or the `default` value if absent or ambiguous. 

146 On all other operating systems, ``False``. 

147 

148 :note: 

149 This only accesses the environment on Windows. 

150 """ 

151 return sys.platform == "win32" and _read_env_flag(name, default) 

152 

153 

154#: We need an easy way to see if Appveyor TCs start failing, 

155#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, 

156#: till then, we wish to hide them. 

157HIDE_WINDOWS_KNOWN_ERRORS = _read_win_env_flag("HIDE_WINDOWS_KNOWN_ERRORS", True) 

158HIDE_WINDOWS_FREEZE_ERRORS = _read_win_env_flag("HIDE_WINDOWS_FREEZE_ERRORS", True) 

159 

160# { Utility Methods 

161 

162T = TypeVar("T") 

163 

164 

165def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: 

166 """Methods with this decorator raise :exc:`~git.exc.InvalidGitRepositoryError` if 

167 they encounter a bare repository.""" 

168 

169 from .exc import InvalidGitRepositoryError 

170 

171 @wraps(func) 

172 def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: 

173 if self.repo.bare: 

174 raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) 

175 # END bare method 

176 return func(self, *args, **kwargs) 

177 

178 # END wrapper 

179 

180 return wrapper 

181 

182 

183@contextlib.contextmanager 

184def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: 

185 """Context manager to temporarily change directory. 

186 

187 This is similar to :func:`contextlib.chdir` introduced in Python 3.11, but the 

188 context manager object returned by a single call to this function is not reentrant. 

189 """ 

190 old_dir = os.getcwd() 

191 os.chdir(new_dir) 

192 try: 

193 yield new_dir 

194 finally: 

195 os.chdir(old_dir) 

196 

197 

198@contextlib.contextmanager 

199def patch_env(name: str, value: str) -> Generator[None, None, None]: 

200 """Context manager to temporarily patch an environment variable.""" 

201 old_value = os.getenv(name) 

202 os.environ[name] = value 

203 try: 

204 yield 

205 finally: 

206 if old_value is None: 

207 del os.environ[name] 

208 else: 

209 os.environ[name] = old_value 

210 

211 

212def rmtree(path: PathLike) -> None: 

213 """Remove the given directory tree recursively. 

214 

215 :note: 

216 We use :func:`shutil.rmtree` but adjust its behaviour to see whether files that 

217 couldn't be deleted are read-only. Windows will not remove them in that case. 

218 """ 

219 

220 def handler(function: Callable, path: PathLike, _excinfo: Any) -> None: 

221 """Callback for :func:`shutil.rmtree`. 

222 

223 This works as either a ``onexc`` or ``onerror`` style callback. 

224 """ 

225 # Is the error an access error? 

226 os.chmod(path, stat.S_IWUSR) 

227 

228 try: 

229 function(path) 

230 except PermissionError as ex: 

231 if HIDE_WINDOWS_KNOWN_ERRORS: 

232 from unittest import SkipTest 

233 

234 raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex 

235 raise 

236 

237 if sys.platform != "win32": 

238 shutil.rmtree(path) 

239 elif sys.version_info >= (3, 12): 

240 shutil.rmtree(path, onexc=handler) 

241 else: 

242 shutil.rmtree(path, onerror=handler) 

243 

244 

245def rmfile(path: PathLike) -> None: 

246 """Ensure file deleted also on *Windows* where read-only files need special 

247 treatment.""" 

248 if osp.isfile(path): 

249 if sys.platform == "win32": 

250 os.chmod(path, 0o777) 

251 os.remove(path) 

252 

253 

254def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: 

255 """Copy all data from the `source` stream into the `destination` stream in chunks 

256 of size `chunk_size`. 

257 

258 :return: 

259 Number of bytes written 

260 """ 

261 br = 0 

262 while True: 

263 chunk = source.read(chunk_size) 

264 destination.write(chunk) 

265 br += len(chunk) 

266 if len(chunk) < chunk_size: 

267 break 

268 # END reading output stream 

269 return br 

270 

271 

272def join_path(a: PathLike, *p: PathLike) -> PathLike: 

273 R"""Join path tokens together similar to osp.join, but always use ``/`` instead of 

274 possibly ``\`` on Windows.""" 

275 path = str(a) 

276 for b in p: 

277 b = str(b) 

278 if not b: 

279 continue 

280 if b.startswith("/"): 

281 path += b[1:] 

282 elif path == "" or path.endswith("/"): 

283 path += b 

284 else: 

285 path += "/" + b 

286 # END for each path token to add 

287 return path 

288 

289 

290if sys.platform == "win32": 

291 

292 def to_native_path_windows(path: PathLike) -> PathLike: 

293 path = str(path) 

294 return path.replace("/", "\\") 

295 

296 def to_native_path_linux(path: PathLike) -> str: 

297 path = str(path) 

298 return path.replace("\\", "/") 

299 

300 to_native_path = to_native_path_windows 

301else: 

302 # No need for any work on Linux. 

303 def to_native_path_linux(path: PathLike) -> str: 

304 return str(path) 

305 

306 to_native_path = to_native_path_linux 

307 

308 

309def join_path_native(a: PathLike, *p: PathLike) -> PathLike: 

310 R"""Like :func:`join_path`, but makes sure an OS native path is returned. 

311 

312 This is only needed to play it safe on Windows and to ensure nice paths that only 

313 use ``\``. 

314 """ 

315 return to_native_path(join_path(a, *p)) 

316 

317 

318def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: 

319 """Make sure that the directory pointed to by path exists. 

320 

321 :param is_file: 

322 If ``True``, `path` is assumed to be a file and handled correctly. 

323 Otherwise it must be a directory. 

324 

325 :return: 

326 ``True`` if the directory was created, ``False`` if it already existed. 

327 """ 

328 if is_file: 

329 path = osp.dirname(path) 

330 # END handle file 

331 if not osp.isdir(path): 

332 os.makedirs(path, exist_ok=True) 

333 return True 

334 return False 

335 

336 

337def _get_exe_extensions() -> Sequence[str]: 

338 PATHEXT = os.environ.get("PATHEXT", None) 

339 if PATHEXT: 

340 return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) 

341 elif sys.platform == "win32": 

342 return (".BAT", ".COM", ".EXE") 

343 else: 

344 return () 

345 

346 

347def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: 

348 """Perform a path search to assist :func:`is_cygwin_git`. 

349 

350 This is not robust for general use. It is an implementation detail of 

351 :func:`is_cygwin_git`. When a search following all shell rules is needed, 

352 :func:`shutil.which` can be used instead. 

353 

354 :note: 

355 Neither this function nor :func:`shutil.which` will predict the effect of an 

356 executable search on a native Windows system due to a :class:`subprocess.Popen` 

357 call without ``shell=True``, because shell and non-shell executable search on 

358 Windows differ considerably. 

359 """ 

360 # From: http://stackoverflow.com/a/377028/548792 

361 winprog_exts = _get_exe_extensions() 

362 

363 def is_exec(fpath: str) -> bool: 

364 return ( 

365 osp.isfile(fpath) 

366 and os.access(fpath, os.X_OK) 

367 and ( 

368 sys.platform != "win32" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts) 

369 ) 

370 ) 

371 

372 progs = [] 

373 if not path: 

374 path = os.environ["PATH"] 

375 for folder in str(path).split(os.pathsep): 

376 folder = folder.strip('"') 

377 if folder: 

378 exe_path = osp.join(folder, program) 

379 for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: 

380 if is_exec(f): 

381 progs.append(f) 

382 return progs 

383 

384 

385def _cygexpath(drive: Optional[str], path: str) -> str: 

386 if osp.isabs(path) and not drive: 

387 # Invoked from `cygpath()` directly with `D:Apps\123`? 

388 # It's an error, leave it alone just slashes) 

389 p = path # convert to str if AnyPath given 

390 else: 

391 p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) 

392 if osp.isabs(p): 

393 if drive: 

394 # Confusing, maybe a remote system should expand vars. 

395 p = path 

396 else: 

397 p = cygpath(p) 

398 elif drive: 

399 p = "/proc/cygdrive/%s/%s" % (drive.lower(), p) 

400 p_str = str(p) # ensure it is a str and not AnyPath 

401 return p_str.replace("\\", "/") 

402 

403 

404_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( 

405 # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx 

406 # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths 

407 ( 

408 re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), 

409 (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), 

410 False, 

411 ), 

412 (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), 

413 (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), 

414 (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), 

415 (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing 

416) 

417 

418 

419def cygpath(path: str) -> str: 

420 """Use :meth:`git.cmd.Git.polish_url` instead, that works on any environment.""" 

421 path = str(path) # Ensure is str and not AnyPath. 

422 # Fix to use Paths when 3.5 dropped. Or to be just str if only for URLs? 

423 if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")): 

424 for regex, parser, recurse in _cygpath_parsers: 

425 match = regex.match(path) 

426 if match: 

427 path = parser(*match.groups()) 

428 if recurse: 

429 path = cygpath(path) 

430 break 

431 else: 

432 path = _cygexpath(None, path) 

433 

434 return path 

435 

436 

437_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?") 

438 

439 

440def decygpath(path: PathLike) -> str: 

441 path = str(path) 

442 m = _decygpath_regex.match(path) 

443 if m: 

444 drive, rest_path = m.groups() 

445 path = "%s:%s" % (drive.upper(), rest_path or "") 

446 

447 return path.replace("/", "\\") 

448 

449 

450#: Store boolean flags denoting if a specific Git executable 

451#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). 

452_is_cygwin_cache: Dict[str, Optional[bool]] = {} 

453 

454 

455def _is_cygwin_git(git_executable: str) -> bool: 

456 is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] 

457 if is_cygwin is None: 

458 is_cygwin = False 

459 try: 

460 git_dir = osp.dirname(git_executable) 

461 if not git_dir: 

462 res = py_where(git_executable) 

463 git_dir = osp.dirname(res[0]) if res else "" 

464 

465 # Just a name given, not a real path. 

466 uname_cmd = osp.join(git_dir, "uname") 

467 process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) 

468 uname_out, _ = process.communicate() 

469 # retcode = process.poll() 

470 is_cygwin = "CYGWIN" in uname_out 

471 except Exception as ex: 

472 _logger.debug("Failed checking if running in CYGWIN due to: %r", ex) 

473 _is_cygwin_cache[git_executable] = is_cygwin 

474 

475 return is_cygwin 

476 

477 

478@overload 

479def is_cygwin_git(git_executable: None) -> Literal[False]: ... 

480 

481 

482@overload 

483def is_cygwin_git(git_executable: PathLike) -> bool: ... 

484 

485 

486def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: 

487 if sys.platform == "win32": # TODO: See if we can use `sys.platform != "cygwin"`. 

488 return False 

489 elif git_executable is None: 

490 return False 

491 else: 

492 return _is_cygwin_git(str(git_executable)) 

493 

494 

495def get_user_id() -> str: 

496 """:return: String identifying the currently active system user as ``name@node``""" 

497 return "%s@%s" % (getpass.getuser(), platform.node()) 

498 

499 

500def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: 

501 """Wait for the process (clone, fetch, pull or push) and handle its errors 

502 accordingly.""" 

503 # TODO: No close proc-streams?? 

504 proc.wait(**kwargs) 

505 

506 

507@overload 

508def expand_path(p: None, expand_vars: bool = ...) -> None: ... 

509 

510 

511@overload 

512def expand_path(p: PathLike, expand_vars: bool = ...) -> str: 

513 # TODO: Support for Python 3.5 has been dropped, so these overloads can be improved. 

514 ... 

515 

516 

517def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: 

518 if isinstance(p, pathlib.Path): 

519 return p.resolve() 

520 try: 

521 p = osp.expanduser(p) # type: ignore[arg-type] 

522 if expand_vars: 

523 p = osp.expandvars(p) 

524 return osp.normpath(osp.abspath(p)) 

525 except Exception: 

526 return None 

527 

528 

529def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: 

530 """Parse any command line argument and if one of the elements is an URL with a 

531 username and/or password, replace them by stars (in-place). 

532 

533 If nothing is found, this just returns the command line as-is. 

534 

535 This should be used for every log line that print a command line, as well as 

536 exception messages. 

537 """ 

538 new_cmdline = [] 

539 for index, to_parse in enumerate(cmdline): 

540 new_cmdline.append(to_parse) 

541 try: 

542 url = urlsplit(to_parse) 

543 # Remove password from the URL if present. 

544 if url.password is None and url.username is None: 

545 continue 

546 

547 if url.password is not None: 

548 url = url._replace(netloc=url.netloc.replace(url.password, "*****")) 

549 if url.username is not None: 

550 url = url._replace(netloc=url.netloc.replace(url.username, "*****")) 

551 new_cmdline[index] = urlunsplit(url) 

552 except ValueError: 

553 # This is not a valid URL. 

554 continue 

555 return new_cmdline 

556 

557 

558# } END utilities 

559 

560# { Classes 

561 

562 

563class RemoteProgress: 

564 """Handler providing an interface to parse progress information emitted by 

565 :manpage:`git-push(1)` and :manpage:`git-fetch(1)` and to dispatch callbacks 

566 allowing subclasses to react to the progress.""" 

567 

568 _num_op_codes: int = 9 

569 ( 

570 BEGIN, 

571 END, 

572 COUNTING, 

573 COMPRESSING, 

574 WRITING, 

575 RECEIVING, 

576 RESOLVING, 

577 FINDING_SOURCES, 

578 CHECKING_OUT, 

579 ) = [1 << x for x in range(_num_op_codes)] 

580 STAGE_MASK = BEGIN | END 

581 OP_MASK = ~STAGE_MASK 

582 

583 DONE_TOKEN = "done." 

584 TOKEN_SEPARATOR = ", " 

585 

586 __slots__ = ( 

587 "_cur_line", 

588 "_seen_ops", 

589 "error_lines", # Lines that started with 'error:' or 'fatal:'. 

590 "other_lines", # Lines not denoting progress (i.e.g. push-infos). 

591 ) 

592 re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") 

593 re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") 

594 

595 def __init__(self) -> None: 

596 self._seen_ops: List[int] = [] 

597 self._cur_line: Optional[str] = None 

598 self.error_lines: List[str] = [] 

599 self.other_lines: List[str] = [] 

600 

601 def _parse_progress_line(self, line: AnyStr) -> None: 

602 """Parse progress information from the given line as retrieved by 

603 :manpage:`git-push(1)` or :manpage:`git-fetch(1)`. 

604 

605 - Lines that do not contain progress info are stored in :attr:`other_lines`. 

606 - Lines that seem to contain an error (i.e. start with ``error:`` or ``fatal:``) 

607 are stored in :attr:`error_lines`. 

608 """ 

609 # handle 

610 # Counting objects: 4, done. 

611 # Compressing objects: 50% (1/2) 

612 # Compressing objects: 100% (2/2) 

613 # Compressing objects: 100% (2/2), done. 

614 if isinstance(line, bytes): # mypy argues about ternary assignment. 

615 line_str = line.decode("utf-8") 

616 else: 

617 line_str = line 

618 self._cur_line = line_str 

619 

620 if self._cur_line.startswith(("error:", "fatal:")): 

621 self.error_lines.append(self._cur_line) 

622 return 

623 

624 cur_count, max_count = None, None 

625 match = self.re_op_relative.match(line_str) 

626 if match is None: 

627 match = self.re_op_absolute.match(line_str) 

628 

629 if not match: 

630 self.line_dropped(line_str) 

631 self.other_lines.append(line_str) 

632 return 

633 # END could not get match 

634 

635 op_code = 0 

636 _remote, op_name, _percent, cur_count, max_count, message = match.groups() 

637 

638 # Get operation ID. 

639 if op_name == "Counting objects": 

640 op_code |= self.COUNTING 

641 elif op_name == "Compressing objects": 

642 op_code |= self.COMPRESSING 

643 elif op_name == "Writing objects": 

644 op_code |= self.WRITING 

645 elif op_name == "Receiving objects": 

646 op_code |= self.RECEIVING 

647 elif op_name == "Resolving deltas": 

648 op_code |= self.RESOLVING 

649 elif op_name == "Finding sources": 

650 op_code |= self.FINDING_SOURCES 

651 elif op_name == "Checking out files": 

652 op_code |= self.CHECKING_OUT 

653 else: 

654 # Note: On Windows it can happen that partial lines are sent. 

655 # Hence we get something like "CompreReceiving objects", which is 

656 # a blend of "Compressing objects" and "Receiving objects". 

657 # This can't really be prevented, so we drop the line verbosely 

658 # to make sure we get informed in case the process spits out new 

659 # commands at some point. 

660 self.line_dropped(line_str) 

661 # Note: Don't add this line to the other lines, as we have to silently 

662 # drop it. 

663 return 

664 # END handle op code 

665 

666 # Figure out stage. 

667 if op_code not in self._seen_ops: 

668 self._seen_ops.append(op_code) 

669 op_code |= self.BEGIN 

670 # END begin opcode 

671 

672 if message is None: 

673 message = "" 

674 # END message handling 

675 

676 message = message.strip() 

677 if message.endswith(self.DONE_TOKEN): 

678 op_code |= self.END 

679 message = message[: -len(self.DONE_TOKEN)] 

680 # END end message handling 

681 message = message.strip(self.TOKEN_SEPARATOR) 

682 

683 self.update( 

684 op_code, 

685 cur_count and float(cur_count), 

686 max_count and float(max_count), 

687 message, 

688 ) 

689 

690 def new_message_handler(self) -> Callable[[str], None]: 

691 """ 

692 :return: 

693 A progress handler suitable for :func:`~git.cmd.handle_process_output`, 

694 passing lines on to this progress handler in a suitable format. 

695 """ 

696 

697 def handler(line: AnyStr) -> None: 

698 return self._parse_progress_line(line.rstrip()) 

699 

700 # END handler 

701 

702 return handler 

703 

704 def line_dropped(self, line: str) -> None: 

705 """Called whenever a line could not be understood and was therefore dropped.""" 

706 pass 

707 

708 def update( 

709 self, 

710 op_code: int, 

711 cur_count: Union[str, float], 

712 max_count: Union[str, float, None] = None, 

713 message: str = "", 

714 ) -> None: 

715 """Called whenever the progress changes. 

716 

717 :param op_code: 

718 Integer allowing to be compared against Operation IDs and stage IDs. 

719 

720 Stage IDs are :const:`BEGIN` and :const:`END`. :const:`BEGIN` will only be 

721 set once for each Operation ID as well as :const:`END`. It may be that 

722 :const:`BEGIN` and :const:`END` are set at once in case only one progress 

723 message was emitted due to the speed of the operation. Between 

724 :const:`BEGIN` and :const:`END`, none of these flags will be set. 

725 

726 Operation IDs are all held within the :const:`OP_MASK`. Only one Operation 

727 ID will be active per call. 

728 

729 :param cur_count: 

730 Current absolute count of items. 

731 

732 :param max_count: 

733 The maximum count of items we expect. It may be ``None`` in case there is no 

734 maximum number of items or if it is (yet) unknown. 

735 

736 :param message: 

737 In case of the :const:`WRITING` operation, it contains the amount of bytes 

738 transferred. It may possibly be used for other purposes as well. 

739 

740 :note: 

741 You may read the contents of the current line in 

742 :attr:`self._cur_line <_cur_line>`. 

743 """ 

744 pass 

745 

746 

747class CallableRemoteProgress(RemoteProgress): 

748 """A :class:`RemoteProgress` implementation forwarding updates to any callable. 

749 

750 :note: 

751 Like direct instances of :class:`RemoteProgress`, instances of this 

752 :class:`CallableRemoteProgress` class are not themselves directly callable. 

753 Rather, instances of this class wrap a callable and forward to it. This should 

754 therefore not be confused with :class:`git.types.CallableProgress`. 

755 """ 

756 

757 __slots__ = ("_callable",) 

758 

759 def __init__(self, fn: Callable) -> None: 

760 self._callable = fn 

761 super().__init__() 

762 

763 def update(self, *args: Any, **kwargs: Any) -> None: 

764 self._callable(*args, **kwargs) 

765 

766 

767class Actor: 

768 """Actors hold information about a person acting on the repository. They can be 

769 committers and authors or anything with a name and an email as mentioned in the git 

770 log entries.""" 

771 

772 # PRECOMPILED REGEX 

773 name_only_regex = re.compile(r"<(.*)>") 

774 name_email_regex = re.compile(r"(.*) <(.*?)>") 

775 

776 # ENVIRONMENT VARIABLES 

777 # These are read when creating new commits. 

778 env_author_name = "GIT_AUTHOR_NAME" 

779 env_author_email = "GIT_AUTHOR_EMAIL" 

780 env_committer_name = "GIT_COMMITTER_NAME" 

781 env_committer_email = "GIT_COMMITTER_EMAIL" 

782 

783 # CONFIGURATION KEYS 

784 conf_name = "name" 

785 conf_email = "email" 

786 

787 __slots__ = ("name", "email") 

788 

789 def __init__(self, name: Optional[str], email: Optional[str]) -> None: 

790 self.name = name 

791 self.email = email 

792 

793 def __eq__(self, other: Any) -> bool: 

794 return self.name == other.name and self.email == other.email 

795 

796 def __ne__(self, other: Any) -> bool: 

797 return not (self == other) 

798 

799 def __hash__(self) -> int: 

800 return hash((self.name, self.email)) 

801 

802 def __str__(self) -> str: 

803 return self.name if self.name else "" 

804 

805 def __repr__(self) -> str: 

806 return '<git.Actor "%s <%s>">' % (self.name, self.email) 

807 

808 @classmethod 

809 def _from_string(cls, string: str) -> "Actor": 

810 """Create an :class:`Actor` from a string. 

811 

812 :param string: 

813 The string, which is expected to be in regular git format:: 

814 

815 John Doe <jdoe@example.com> 

816 

817 :return: 

818 :class:`Actor` 

819 """ 

820 m = cls.name_email_regex.search(string) 

821 if m: 

822 name, email = m.groups() 

823 return Actor(name, email) 

824 else: 

825 m = cls.name_only_regex.search(string) 

826 if m: 

827 return Actor(m.group(1), None) 

828 # Assume the best and use the whole string as name. 

829 return Actor(string, None) 

830 # END special case name 

831 # END handle name/email matching 

832 

833 @classmethod 

834 def _main_actor( 

835 cls, 

836 env_name: str, 

837 env_email: str, 

838 config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, 

839 ) -> "Actor": 

840 actor = Actor("", "") 

841 user_id = None # We use this to avoid multiple calls to getpass.getuser(). 

842 

843 def default_email() -> str: 

844 nonlocal user_id 

845 if not user_id: 

846 user_id = get_user_id() 

847 return user_id 

848 

849 def default_name() -> str: 

850 return default_email().split("@")[0] 

851 

852 for attr, evar, cvar, default in ( 

853 ("name", env_name, cls.conf_name, default_name), 

854 ("email", env_email, cls.conf_email, default_email), 

855 ): 

856 try: 

857 val = os.environ[evar] 

858 setattr(actor, attr, val) 

859 except KeyError: 

860 if config_reader is not None: 

861 try: 

862 val = config_reader.get("user", cvar) 

863 except Exception: 

864 val = default() 

865 setattr(actor, attr, val) 

866 # END config-reader handling 

867 if not getattr(actor, attr): 

868 setattr(actor, attr, default()) 

869 # END handle name 

870 # END for each item to retrieve 

871 return actor 

872 

873 @classmethod 

874 def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": 

875 """ 

876 :return: 

877 :class:`Actor` instance corresponding to the configured committer. It 

878 behaves similar to the git implementation, such that the environment will 

879 override configuration values of `config_reader`. If no value is set at all, 

880 it will be generated. 

881 

882 :param config_reader: 

883 ConfigReader to use to retrieve the values from in case they are not set in 

884 the environment. 

885 """ 

886 return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) 

887 

888 @classmethod 

889 def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": 

890 """Same as :meth:`committer`, but defines the main author. It may be specified 

891 in the environment, but defaults to the committer.""" 

892 return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) 

893 

894 

895class Stats: 

896 """Represents stat information as presented by git at the end of a merge. It is 

897 created from the output of a diff operation. 

898 

899 Example:: 

900 

901 c = Commit( sha1 ) 

902 s = c.stats 

903 s.total # full-stat-dict 

904 s.files # dict( filepath : stat-dict ) 

905 

906 ``stat-dict`` 

907 

908 A dictionary with the following keys and values:: 

909 

910 deletions = number of deleted lines as int 

911 insertions = number of inserted lines as int 

912 lines = total number of lines changed as int, or deletions + insertions 

913 change_type = type of change as str, A|C|D|M|R|T|U|X|B 

914 

915 ``full-stat-dict`` 

916 

917 In addition to the items in the stat-dict, it features additional information:: 

918 

919 files = number of changed files as int 

920 """ 

921 

922 __slots__ = ("total", "files") 

923 

924 def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]) -> None: 

925 self.total = total 

926 self.files = files 

927 

928 @classmethod 

929 def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": 

930 """Create a :class:`Stats` object from output retrieved by 

931 :manpage:`git-diff(1)`. 

932 

933 :return: 

934 :class:`git.Stats` 

935 """ 

936 

937 hsh: HSH_TD = { 

938 "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, 

939 "files": {}, 

940 } 

941 for line in text.splitlines(): 

942 (change_type, raw_insertions, raw_deletions, filename) = line.split("\t") 

943 insertions = raw_insertions != "-" and int(raw_insertions) or 0 

944 deletions = raw_deletions != "-" and int(raw_deletions) or 0 

945 hsh["total"]["insertions"] += insertions 

946 hsh["total"]["deletions"] += deletions 

947 hsh["total"]["lines"] += insertions + deletions 

948 hsh["total"]["files"] += 1 

949 files_dict: Files_TD = { 

950 "insertions": insertions, 

951 "deletions": deletions, 

952 "lines": insertions + deletions, 

953 "change_type": change_type, 

954 } 

955 hsh["files"][filename.strip()] = files_dict 

956 return Stats(hsh["total"], hsh["files"]) 

957 

958 

959class IndexFileSHA1Writer: 

960 """Wrapper around a file-like object that remembers the SHA1 of the data written to 

961 it. It will write a sha when the stream is closed or if asked for explicitly using 

962 :meth:`write_sha`. 

963 

964 Only useful to the index file. 

965 

966 :note: 

967 Based on the dulwich project. 

968 """ 

969 

970 __slots__ = ("f", "sha1") 

971 

972 def __init__(self, f: IO) -> None: 

973 self.f = f 

974 self.sha1 = make_sha(b"") 

975 

976 def write(self, data: AnyStr) -> int: 

977 self.sha1.update(data) 

978 return self.f.write(data) 

979 

980 def write_sha(self) -> bytes: 

981 sha = self.sha1.digest() 

982 self.f.write(sha) 

983 return sha 

984 

985 def close(self) -> bytes: 

986 sha = self.write_sha() 

987 self.f.close() 

988 return sha 

989 

990 def tell(self) -> int: 

991 return self.f.tell() 

992 

993 

994class LockFile: 

995 """Provides methods to obtain, check for, and release a file based lock which 

996 should be used to handle concurrent access to the same file. 

997 

998 As we are a utility class to be derived from, we only use protected methods. 

999 

1000 Locks will automatically be released on destruction. 

1001 """ 

1002 

1003 __slots__ = ("_file_path", "_owns_lock") 

1004 

1005 def __init__(self, file_path: PathLike) -> None: 

1006 self._file_path = file_path 

1007 self._owns_lock = False 

1008 

1009 def __del__(self) -> None: 

1010 self._release_lock() 

1011 

1012 def _lock_file_path(self) -> str: 

1013 """:return: Path to lockfile""" 

1014 return "%s.lock" % (self._file_path) 

1015 

1016 def _has_lock(self) -> bool: 

1017 """ 

1018 :return: 

1019 True if we have a lock and if the lockfile still exists 

1020 

1021 :raise AssertionError: 

1022 If our lock-file does not exist. 

1023 """ 

1024 return self._owns_lock 

1025 

1026 def _obtain_lock_or_raise(self) -> None: 

1027 """Create a lock file as flag for other instances, mark our instance as 

1028 lock-holder. 

1029 

1030 :raise IOError: 

1031 If a lock was already present or a lock file could not be written. 

1032 """ 

1033 if self._has_lock(): 

1034 return 

1035 lock_file = self._lock_file_path() 

1036 if osp.isfile(lock_file): 

1037 raise IOError( 

1038 "Lock for file %r did already exist, delete %r in case the lock is illegal" 

1039 % (self._file_path, lock_file) 

1040 ) 

1041 

1042 try: 

1043 with open(lock_file, mode="w"): 

1044 pass 

1045 except OSError as e: 

1046 raise IOError(str(e)) from e 

1047 

1048 self._owns_lock = True 

1049 

1050 def _obtain_lock(self) -> None: 

1051 """The default implementation will raise if a lock cannot be obtained. 

1052 

1053 Subclasses may override this method to provide a different implementation. 

1054 """ 

1055 return self._obtain_lock_or_raise() 

1056 

1057 def _release_lock(self) -> None: 

1058 """Release our lock if we have one.""" 

1059 if not self._has_lock(): 

1060 return 

1061 

1062 # If someone removed our file beforehand, lets just flag this issue instead of 

1063 # failing, to make it more usable. 

1064 lfp = self._lock_file_path() 

1065 try: 

1066 rmfile(lfp) 

1067 except OSError: 

1068 pass 

1069 self._owns_lock = False 

1070 

1071 

1072class BlockingLockFile(LockFile): 

1073 """The lock file will block until a lock could be obtained, or fail after a 

1074 specified timeout. 

1075 

1076 :note: 

1077 If the directory containing the lock was removed, an exception will be raised 

1078 during the blocking period, preventing hangs as the lock can never be obtained. 

1079 """ 

1080 

1081 __slots__ = ("_check_interval", "_max_block_time") 

1082 

1083 def __init__( 

1084 self, 

1085 file_path: PathLike, 

1086 check_interval_s: float = 0.3, 

1087 max_block_time_s: int = sys.maxsize, 

1088 ) -> None: 

1089 """Configure the instance. 

1090 

1091 :param check_interval_s: 

1092 Period of time to sleep until the lock is checked the next time. 

1093 By default, it waits a nearly unlimited time. 

1094 

1095 :param max_block_time_s: 

1096 Maximum amount of seconds we may lock. 

1097 """ 

1098 super().__init__(file_path) 

1099 self._check_interval = check_interval_s 

1100 self._max_block_time = max_block_time_s 

1101 

1102 def _obtain_lock(self) -> None: 

1103 """This method blocks until it obtained the lock, or raises :exc:`IOError` if it 

1104 ran out of time or if the parent directory was not available anymore. 

1105 

1106 If this method returns, you are guaranteed to own the lock. 

1107 """ 

1108 starttime = time.time() 

1109 maxtime = starttime + float(self._max_block_time) 

1110 while True: 

1111 try: 

1112 super()._obtain_lock() 

1113 except IOError as e: 

1114 # synity check: if the directory leading to the lockfile is not 

1115 # readable anymore, raise an exception 

1116 curtime = time.time() 

1117 if not osp.isdir(osp.dirname(self._lock_file_path())): 

1118 msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( 

1119 self._lock_file_path(), 

1120 curtime - starttime, 

1121 ) 

1122 raise IOError(msg) from e 

1123 # END handle missing directory 

1124 

1125 if curtime >= maxtime: 

1126 msg = "Waited %g seconds for lock at %r" % ( 

1127 maxtime - starttime, 

1128 self._lock_file_path(), 

1129 ) 

1130 raise IOError(msg) from e 

1131 # END abort if we wait too long 

1132 time.sleep(self._check_interval) 

1133 else: 

1134 break 

1135 # END endless loop 

1136 

1137 

1138class IterableList(List[T_IterableObj]): 

1139 """List of iterable objects allowing to query an object by id or by named index:: 

1140 

1141 heads = repo.heads 

1142 heads.master 

1143 heads['master'] 

1144 heads[0] 

1145 

1146 Iterable parent objects: 

1147 

1148 * :class:`Commit <git.objects.Commit>` 

1149 * :class:`Submodule <git.objects.submodule.base.Submodule>` 

1150 * :class:`Reference <git.refs.reference.Reference>` 

1151 * :class:`FetchInfo <git.remote.FetchInfo>` 

1152 * :class:`PushInfo <git.remote.PushInfo>` 

1153 

1154 Iterable via inheritance: 

1155 

1156 * :class:`Head <git.refs.head.Head>` 

1157 * :class:`TagReference <git.refs.tag.TagReference>` 

1158 * :class:`RemoteReference <git.refs.remote.RemoteReference>` 

1159 

1160 This requires an ``id_attribute`` name to be set which will be queried from its 

1161 contained items to have a means for comparison. 

1162 

1163 A prefix can be specified which is to be used in case the id returned by the items 

1164 always contains a prefix that does not matter to the user, so it can be left out. 

1165 """ 

1166 

1167 __slots__ = ("_id_attr", "_prefix") 

1168 

1169 def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[T_IterableObj]": 

1170 return super().__new__(cls) 

1171 

1172 def __init__(self, id_attr: str, prefix: str = "") -> None: 

1173 self._id_attr = id_attr 

1174 self._prefix = prefix 

1175 

1176 def __contains__(self, attr: object) -> bool: 

1177 # First try identity match for performance. 

1178 try: 

1179 rval = list.__contains__(self, attr) 

1180 if rval: 

1181 return rval 

1182 except (AttributeError, TypeError): 

1183 pass 

1184 # END handle match 

1185 

1186 # Otherwise make a full name search. 

1187 try: 

1188 getattr(self, cast(str, attr)) # Use cast to silence mypy. 

1189 return True 

1190 except (AttributeError, TypeError): 

1191 return False 

1192 # END handle membership 

1193 

1194 def __getattr__(self, attr: str) -> T_IterableObj: 

1195 attr = self._prefix + attr 

1196 for item in self: 

1197 if getattr(item, self._id_attr) == attr: 

1198 return item 

1199 # END for each item 

1200 return list.__getattribute__(self, attr) 

1201 

1202 def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore[override] 

1203 assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str" 

1204 

1205 if isinstance(index, int): 

1206 return list.__getitem__(self, index) 

1207 elif isinstance(index, slice): 

1208 raise ValueError("Index should be an int or str") 

1209 else: 

1210 try: 

1211 return getattr(self, index) 

1212 except AttributeError as e: 

1213 raise IndexError("No item found with id %r" % (self._prefix + index)) from e 

1214 # END handle getattr 

1215 

1216 def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: 

1217 assert isinstance(index, (int, str)), "Index of IterableList should be an int or str" 

1218 

1219 delindex = cast(int, index) 

1220 if not isinstance(index, int): 

1221 delindex = -1 

1222 name = self._prefix + index 

1223 for i, item in enumerate(self): 

1224 if getattr(item, self._id_attr) == name: 

1225 delindex = i 

1226 break 

1227 # END search index 

1228 # END for each item 

1229 if delindex == -1: 

1230 raise IndexError("Item with name %s not found" % name) 

1231 # END handle error 

1232 # END get index to delete 

1233 list.__delitem__(self, delindex) 

1234 

1235 

1236@runtime_checkable 

1237class IterableObj(Protocol): 

1238 """Defines an interface for iterable items, so there is a uniform way to retrieve 

1239 and iterate items within the git repository. 

1240 

1241 Subclasses: 

1242 

1243 * :class:`Submodule <git.objects.submodule.base.Submodule>` 

1244 * :class:`Commit <git.objects.Commit>` 

1245 * :class:`Reference <git.refs.reference.Reference>` 

1246 * :class:`PushInfo <git.remote.PushInfo>` 

1247 * :class:`FetchInfo <git.remote.FetchInfo>` 

1248 * :class:`Remote <git.remote.Remote>` 

1249 """ 

1250 

1251 __slots__ = () 

1252 

1253 _id_attribute_: str 

1254 

1255 @classmethod 

1256 @abstractmethod 

1257 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: 

1258 # Return-typed to be compatible with subtypes e.g. Remote. 

1259 """Find (all) items of this type. 

1260 

1261 Subclasses can specify `args` and `kwargs` differently, and may use them for 

1262 filtering. However, when the method is called with no additional positional or 

1263 keyword arguments, subclasses are obliged to to yield all items. 

1264 

1265 :return: 

1266 Iterator yielding Items 

1267 """ 

1268 raise NotImplementedError("To be implemented by Subclass") 

1269 

1270 @classmethod 

1271 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: 

1272 """Find (all) items of this type and collect them into a list. 

1273 

1274 For more information about the arguments, see :meth:`iter_items`. 

1275 

1276 :note: 

1277 Favor the :meth:`iter_items` method as it will avoid eagerly collecting all 

1278 items. When there are many items, that can slow performance and increase 

1279 memory usage. 

1280 

1281 :return: 

1282 list(Item,...) list of item instances 

1283 """ 

1284 out_list: IterableList = IterableList(cls._id_attribute_) 

1285 out_list.extend(cls.iter_items(repo, *args, **kwargs)) 

1286 return out_list 

1287 

1288 

1289class IterableClassWatcher(type): 

1290 """Metaclass that issues :exc:`DeprecationWarning` when :class:`git.util.Iterable` 

1291 is subclassed.""" 

1292 

1293 def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: 

1294 for base in bases: 

1295 if type(base) is IterableClassWatcher: 

1296 warnings.warn( 

1297 f"GitPython Iterable subclassed by {name}." 

1298 " Iterable is deprecated due to naming clash since v3.1.18" 

1299 " and will be removed in 4.0.0." 

1300 " Use IterableObj instead.", 

1301 DeprecationWarning, 

1302 stacklevel=2, 

1303 ) 

1304 

1305 

1306class Iterable(metaclass=IterableClassWatcher): 

1307 """Deprecated, use :class:`IterableObj` instead. 

1308 

1309 Defines an interface for iterable items, so there is a uniform way to retrieve 

1310 and iterate items within the git repository. 

1311 """ 

1312 

1313 __slots__ = () 

1314 

1315 _id_attribute_ = "attribute that most suitably identifies your instance" 

1316 

1317 @classmethod 

1318 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: 

1319 """Deprecated, use :class:`IterableObj` instead. 

1320 

1321 Find (all) items of this type. 

1322 

1323 See :meth:`IterableObj.iter_items` for details on usage. 

1324 

1325 :return: 

1326 Iterator yielding Items 

1327 """ 

1328 raise NotImplementedError("To be implemented by Subclass") 

1329 

1330 @classmethod 

1331 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: 

1332 """Deprecated, use :class:`IterableObj` instead. 

1333 

1334 Find (all) items of this type and collect them into a list. 

1335 

1336 See :meth:`IterableObj.list_items` for details on usage. 

1337 

1338 :return: 

1339 list(Item,...) list of item instances 

1340 """ 

1341 out_list: Any = IterableList(cls._id_attribute_) 

1342 out_list.extend(cls.iter_items(repo, *args, **kwargs)) 

1343 return out_list 

1344 

1345 

1346# } END classes