Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/util.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

563 statements  

1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors 

2# 

3# This module is part of GitPython and is released under the 

4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ 

5 

6import sys 

7 

8__all__ = [ 

9 "stream_copy", 

10 "join_path", 

11 "to_native_path_linux", 

12 "join_path_native", 

13 "Stats", 

14 "IndexFileSHA1Writer", 

15 "IterableObj", 

16 "IterableList", 

17 "BlockingLockFile", 

18 "LockFile", 

19 "Actor", 

20 "get_user_id", 

21 "assure_directory_exists", 

22 "RemoteProgress", 

23 "CallableRemoteProgress", 

24 "rmtree", 

25 "unbare_repo", 

26 "HIDE_WINDOWS_KNOWN_ERRORS", 

27] 

28 

29if sys.platform == "win32": 

30 __all__.append("to_native_path_windows") 

31 

32from abc import abstractmethod 

33import contextlib 

34from functools import wraps 

35import getpass 

36import logging 

37import os 

38import os.path as osp 

39import pathlib 

40import platform 

41import re 

42import shutil 

43import stat 

44import subprocess 

45import time 

46from urllib.parse import urlsplit, urlunsplit 

47import warnings 

48 

49# NOTE: Unused imports can be improved now that CI testing has fully resumed. Some of 

50# these be used indirectly through other GitPython modules, which avoids having to write 

51# gitdb all the time in their imports. They are not in __all__, at least currently, 

52# because they could be removed or changed at any time, and so should not be considered 

53# conceptually public to code outside GitPython. Linters of course do not like it. 

54from gitdb.util import ( 

55 LazyMixin, # noqa: F401 

56 LockedFD, # noqa: F401 

57 bin_to_hex, # noqa: F401 

58 file_contents_ro, # noqa: F401 

59 file_contents_ro_filepath, # noqa: F401 

60 hex_to_bin, # noqa: F401 

61 make_sha, 

62 to_bin_sha, # noqa: F401 

63 to_hex_sha, # noqa: F401 

64) 

65 

66# typing --------------------------------------------------------- 

67 

68from typing import ( 

69 Any, 

70 AnyStr, 

71 BinaryIO, 

72 Callable, 

73 Dict, 

74 Generator, 

75 IO, 

76 Iterator, 

77 List, 

78 Optional, 

79 Pattern, 

80 Sequence, 

81 Tuple, 

82 TYPE_CHECKING, 

83 TypeVar, 

84 Union, 

85 cast, 

86 overload, 

87) 

88 

89if TYPE_CHECKING: 

90 from git.cmd import Git 

91 from git.config import GitConfigParser, SectionConstraint 

92 from git.remote import Remote 

93 from git.repo.base import Repo 

94 

95from git.types import ( 

96 Files_TD, 

97 Has_id_attribute, 

98 HSH_TD, 

99 Literal, 

100 PathLike, 

101 Protocol, 

102 SupportsIndex, 

103 Total_TD, 

104 runtime_checkable, 

105) 

106 

107# --------------------------------------------------------------------- 

108 

109T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True) 

110# So IterableList[Head] is subtype of IterableList[IterableObj]. 

111 

112_logger = logging.getLogger(__name__) 

113 

114 

115def _read_env_flag(name: str, default: bool) -> bool: 

116 """Read a boolean flag from an environment variable. 

117 

118 :return: 

119 The flag, or the `default` value if absent or ambiguous. 

120 """ 

121 try: 

122 value = os.environ[name] 

123 except KeyError: 

124 return default 

125 

126 _logger.warning( 

127 "The %s environment variable is deprecated. Its effect has never been documented and changes without warning.", 

128 name, 

129 ) 

130 

131 adjusted_value = value.strip().lower() 

132 

133 if adjusted_value in {"", "0", "false", "no"}: 

134 return False 

135 if adjusted_value in {"1", "true", "yes"}: 

136 return True 

137 _logger.warning("%s has unrecognized value %r, treating as %r.", name, value, default) 

138 return default 

139 

140 

141def _read_win_env_flag(name: str, default: bool) -> bool: 

142 """Read a boolean flag from an environment variable on Windows. 

143 

144 :return: 

145 On Windows, the flag, or the `default` value if absent or ambiguous. 

146 On all other operating systems, ``False``. 

147 

148 :note: 

149 This only accesses the environment on Windows. 

150 """ 

151 return sys.platform == "win32" and _read_env_flag(name, default) 

152 

153 

154#: We need an easy way to see if Appveyor TCs start failing, 

155#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy, 

156#: till then, we wish to hide them. 

157HIDE_WINDOWS_KNOWN_ERRORS = _read_win_env_flag("HIDE_WINDOWS_KNOWN_ERRORS", True) 

158HIDE_WINDOWS_FREEZE_ERRORS = _read_win_env_flag("HIDE_WINDOWS_FREEZE_ERRORS", True) 

159 

160# { Utility Methods 

161 

162T = TypeVar("T") 

163 

164 

165def unbare_repo(func: Callable[..., T]) -> Callable[..., T]: 

166 """Methods with this decorator raise :exc:`~git.exc.InvalidGitRepositoryError` if 

167 they encounter a bare repository.""" 

168 

169 from .exc import InvalidGitRepositoryError 

170 

171 @wraps(func) 

172 def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T: 

173 if self.repo.bare: 

174 raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__) 

175 # END bare method 

176 return func(self, *args, **kwargs) 

177 

178 # END wrapper 

179 

180 return wrapper 

181 

182 

183@contextlib.contextmanager 

184def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]: 

185 """Context manager to temporarily change directory. 

186 

187 This is similar to :func:`contextlib.chdir` introduced in Python 3.11, but the 

188 context manager object returned by a single call to this function is not reentrant. 

189 """ 

190 old_dir = os.getcwd() 

191 os.chdir(new_dir) 

192 try: 

193 yield new_dir 

194 finally: 

195 os.chdir(old_dir) 

196 

197 

198@contextlib.contextmanager 

199def patch_env(name: str, value: str) -> Generator[None, None, None]: 

200 """Context manager to temporarily patch an environment variable.""" 

201 old_value = os.getenv(name) 

202 os.environ[name] = value 

203 try: 

204 yield 

205 finally: 

206 if old_value is None: 

207 del os.environ[name] 

208 else: 

209 os.environ[name] = old_value 

210 

211 

212def rmtree(path: PathLike) -> None: 

213 """Remove the given directory tree recursively. 

214 

215 :note: 

216 We use :func:`shutil.rmtree` but adjust its behaviour to see whether files that 

217 couldn't be deleted are read-only. Windows will not remove them in that case. 

218 """ 

219 

220 def handler(function: Callable, path: PathLike, _excinfo: Any) -> None: 

221 """Callback for :func:`shutil.rmtree`. 

222 

223 This works as either a ``onexc`` or ``onerror`` style callback. 

224 """ 

225 # Is the error an access error? 

226 os.chmod(path, stat.S_IWUSR) 

227 

228 try: 

229 function(path) 

230 except PermissionError as ex: 

231 if HIDE_WINDOWS_KNOWN_ERRORS: 

232 from unittest import SkipTest 

233 

234 raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex 

235 raise 

236 

237 if sys.platform != "win32": 

238 shutil.rmtree(path) 

239 elif sys.version_info >= (3, 12): 

240 shutil.rmtree(path, onexc=handler) 

241 else: 

242 shutil.rmtree(path, onerror=handler) 

243 

244 

245def rmfile(path: PathLike) -> None: 

246 """Ensure file deleted also on *Windows* where read-only files need special 

247 treatment.""" 

248 if osp.isfile(path): 

249 if sys.platform == "win32": 

250 os.chmod(path, 0o777) 

251 os.remove(path) 

252 

253 

254def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int: 

255 """Copy all data from the `source` stream into the `destination` stream in chunks 

256 of size `chunk_size`. 

257 

258 :return: 

259 Number of bytes written 

260 """ 

261 br = 0 

262 while True: 

263 chunk = source.read(chunk_size) 

264 destination.write(chunk) 

265 br += len(chunk) 

266 if len(chunk) < chunk_size: 

267 break 

268 # END reading output stream 

269 return br 

270 

271 

272def join_path(a: PathLike, *p: PathLike) -> PathLike: 

273 R"""Join path tokens together similar to osp.join, but always use ``/`` instead of 

274 possibly ``\`` on Windows.""" 

275 path = str(a) 

276 for b in p: 

277 b = str(b) 

278 if not b: 

279 continue 

280 if b.startswith("/"): 

281 path += b[1:] 

282 elif path == "" or path.endswith("/"): 

283 path += b 

284 else: 

285 path += "/" + b 

286 # END for each path token to add 

287 return path 

288 

289 

290if sys.platform == "win32": 

291 

292 def to_native_path_windows(path: PathLike) -> PathLike: 

293 path = str(path) 

294 return path.replace("/", "\\") 

295 

296 def to_native_path_linux(path: PathLike) -> str: 

297 path = str(path) 

298 return path.replace("\\", "/") 

299 

300 to_native_path = to_native_path_windows 

301else: 

302 # No need for any work on Linux. 

303 def to_native_path_linux(path: PathLike) -> str: 

304 return str(path) 

305 

306 to_native_path = to_native_path_linux 

307 

308 

309def join_path_native(a: PathLike, *p: PathLike) -> PathLike: 

310 R"""Like :func:`join_path`, but makes sure an OS native path is returned. 

311 

312 This is only needed to play it safe on Windows and to ensure nice paths that only 

313 use ``\``. 

314 """ 

315 return to_native_path(join_path(a, *p)) 

316 

317 

318def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool: 

319 """Make sure that the directory pointed to by path exists. 

320 

321 :param is_file: 

322 If ``True``, `path` is assumed to be a file and handled correctly. 

323 Otherwise it must be a directory. 

324 

325 :return: 

326 ``True`` if the directory was created, ``False`` if it already existed. 

327 """ 

328 if is_file: 

329 path = osp.dirname(path) 

330 # END handle file 

331 if not osp.isdir(path): 

332 os.makedirs(path, exist_ok=True) 

333 return True 

334 return False 

335 

336 

337def _get_exe_extensions() -> Sequence[str]: 

338 PATHEXT = os.environ.get("PATHEXT", None) 

339 if PATHEXT: 

340 return tuple(p.upper() for p in PATHEXT.split(os.pathsep)) 

341 elif sys.platform == "win32": 

342 return (".BAT", ".COM", ".EXE") 

343 else: 

344 return () 

345 

346 

347def py_where(program: str, path: Optional[PathLike] = None) -> List[str]: 

348 """Perform a path search to assist :func:`is_cygwin_git`. 

349 

350 This is not robust for general use. It is an implementation detail of 

351 :func:`is_cygwin_git`. When a search following all shell rules is needed, 

352 :func:`shutil.which` can be used instead. 

353 

354 :note: 

355 Neither this function nor :func:`shutil.which` will predict the effect of an 

356 executable search on a native Windows system due to a :class:`subprocess.Popen` 

357 call without ``shell=True``, because shell and non-shell executable search on 

358 Windows differ considerably. 

359 """ 

360 # From: http://stackoverflow.com/a/377028/548792 

361 winprog_exts = _get_exe_extensions() 

362 

363 def is_exec(fpath: str) -> bool: 

364 return ( 

365 osp.isfile(fpath) 

366 and os.access(fpath, os.X_OK) 

367 and ( 

368 sys.platform != "win32" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts) 

369 ) 

370 ) 

371 

372 progs = [] 

373 if not path: 

374 path = os.environ["PATH"] 

375 for folder in str(path).split(os.pathsep): 

376 folder = folder.strip('"') 

377 if folder: 

378 exe_path = osp.join(folder, program) 

379 for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]: 

380 if is_exec(f): 

381 progs.append(f) 

382 return progs 

383 

384 

385def _cygexpath(drive: Optional[str], path: str) -> str: 

386 if osp.isabs(path) and not drive: 

387 # Invoked from `cygpath()` directly with `D:Apps\123`? 

388 # It's an error, leave it alone just slashes) 

389 p = path # convert to str if AnyPath given 

390 else: 

391 p = path and osp.normpath(osp.expandvars(osp.expanduser(path))) 

392 if osp.isabs(p): 

393 if drive: 

394 # Confusing, maybe a remote system should expand vars. 

395 p = path 

396 else: 

397 p = cygpath(p) 

398 elif drive: 

399 p = "/proc/cygdrive/%s/%s" % (drive.lower(), p) 

400 p_str = str(p) # ensure it is a str and not AnyPath 

401 return p_str.replace("\\", "/") 

402 

403 

404_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = ( 

405 # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx 

406 # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths 

407 ( 

408 re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"), 

409 (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))), 

410 False, 

411 ), 

412 (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False), 

413 (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False), 

414 (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True), 

415 (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing 

416) 

417 

418 

419def cygpath(path: str) -> str: 

420 """Use :meth:`git.cmd.Git.polish_url` instead, that works on any environment.""" 

421 path = str(path) # Ensure is str and not AnyPath. 

422 # Fix to use Paths when 3.5 dropped. Or to be just str if only for URLs? 

423 if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")): 

424 for regex, parser, recurse in _cygpath_parsers: 

425 match = regex.match(path) 

426 if match: 

427 path = parser(*match.groups()) 

428 if recurse: 

429 path = cygpath(path) 

430 break 

431 else: 

432 path = _cygexpath(None, path) 

433 

434 return path 

435 

436 

437_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?") 

438 

439 

440def decygpath(path: PathLike) -> str: 

441 path = str(path) 

442 m = _decygpath_regex.match(path) 

443 if m: 

444 drive, rest_path = m.groups() 

445 path = "%s:%s" % (drive.upper(), rest_path or "") 

446 

447 return path.replace("/", "\\") 

448 

449 

450#: Store boolean flags denoting if a specific Git executable 

451#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2). 

452_is_cygwin_cache: Dict[str, Optional[bool]] = {} 

453 

454 

455def _is_cygwin_git(git_executable: str) -> bool: 

456 is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool] 

457 if is_cygwin is None: 

458 is_cygwin = False 

459 try: 

460 git_dir = osp.dirname(git_executable) 

461 if not git_dir: 

462 res = py_where(git_executable) 

463 git_dir = osp.dirname(res[0]) if res else "" 

464 

465 # Just a name given, not a real path. 

466 uname_cmd = osp.join(git_dir, "uname") 

467 

468 if not (pathlib.Path(uname_cmd).is_file() and os.access(uname_cmd, os.X_OK)): 

469 _logger.debug(f"Failed checking if running in CYGWIN: {uname_cmd} is not an executable") 

470 _is_cygwin_cache[git_executable] = is_cygwin 

471 return is_cygwin 

472 

473 process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True) 

474 uname_out, _ = process.communicate() 

475 # retcode = process.poll() 

476 is_cygwin = "CYGWIN" in uname_out 

477 except Exception as ex: 

478 _logger.debug("Failed checking if running in CYGWIN due to: %r", ex) 

479 _is_cygwin_cache[git_executable] = is_cygwin 

480 

481 return is_cygwin 

482 

483 

484@overload 

485def is_cygwin_git(git_executable: None) -> Literal[False]: ... 

486 

487 

488@overload 

489def is_cygwin_git(git_executable: PathLike) -> bool: ... 

490 

491 

492def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool: 

493 # TODO: when py3.7 support is dropped, use the new interpolation f"{variable=}" 

494 _logger.debug(f"sys.platform={sys.platform!r}, git_executable={git_executable!r}") 

495 if sys.platform != "cygwin": 

496 return False 

497 elif git_executable is None: 

498 return False 

499 else: 

500 return _is_cygwin_git(str(git_executable)) 

501 

502 

503def get_user_id() -> str: 

504 """:return: String identifying the currently active system user as ``name@node``""" 

505 return "%s@%s" % (getpass.getuser(), platform.node()) 

506 

507 

508def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None: 

509 """Wait for the process (clone, fetch, pull or push) and handle its errors 

510 accordingly.""" 

511 # TODO: No close proc-streams?? 

512 proc.wait(**kwargs) 

513 

514 

515@overload 

516def expand_path(p: None, expand_vars: bool = ...) -> None: ... 

517 

518 

519@overload 

520def expand_path(p: PathLike, expand_vars: bool = ...) -> str: 

521 # TODO: Support for Python 3.5 has been dropped, so these overloads can be improved. 

522 ... 

523 

524 

525def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]: 

526 if isinstance(p, pathlib.Path): 

527 return p.resolve() 

528 try: 

529 p = osp.expanduser(p) # type: ignore[arg-type] 

530 if expand_vars: 

531 p = osp.expandvars(p) 

532 return osp.normpath(osp.abspath(p)) 

533 except Exception: 

534 return None 

535 

536 

537def remove_password_if_present(cmdline: Sequence[str]) -> List[str]: 

538 """Parse any command line argument and if one of the elements is an URL with a 

539 username and/or password, replace them by stars (in-place). 

540 

541 If nothing is found, this just returns the command line as-is. 

542 

543 This should be used for every log line that print a command line, as well as 

544 exception messages. 

545 """ 

546 new_cmdline = [] 

547 for index, to_parse in enumerate(cmdline): 

548 new_cmdline.append(to_parse) 

549 try: 

550 url = urlsplit(to_parse) 

551 # Remove password from the URL if present. 

552 if url.password is None and url.username is None: 

553 continue 

554 

555 if url.password is not None: 

556 url = url._replace(netloc=url.netloc.replace(url.password, "*****")) 

557 if url.username is not None: 

558 url = url._replace(netloc=url.netloc.replace(url.username, "*****")) 

559 new_cmdline[index] = urlunsplit(url) 

560 except ValueError: 

561 # This is not a valid URL. 

562 continue 

563 return new_cmdline 

564 

565 

566# } END utilities 

567 

568# { Classes 

569 

570 

571class RemoteProgress: 

572 """Handler providing an interface to parse progress information emitted by 

573 :manpage:`git-push(1)` and :manpage:`git-fetch(1)` and to dispatch callbacks 

574 allowing subclasses to react to the progress.""" 

575 

576 _num_op_codes: int = 9 

577 ( 

578 BEGIN, 

579 END, 

580 COUNTING, 

581 COMPRESSING, 

582 WRITING, 

583 RECEIVING, 

584 RESOLVING, 

585 FINDING_SOURCES, 

586 CHECKING_OUT, 

587 ) = [1 << x for x in range(_num_op_codes)] 

588 STAGE_MASK = BEGIN | END 

589 OP_MASK = ~STAGE_MASK 

590 

591 DONE_TOKEN = "done." 

592 TOKEN_SEPARATOR = ", " 

593 

594 __slots__ = ( 

595 "_cur_line", 

596 "_seen_ops", 

597 "error_lines", # Lines that started with 'error:' or 'fatal:'. 

598 "other_lines", # Lines not denoting progress (i.e.g. push-infos). 

599 ) 

600 re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)") 

601 re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)") 

602 

603 def __init__(self) -> None: 

604 self._seen_ops: List[int] = [] 

605 self._cur_line: Optional[str] = None 

606 self.error_lines: List[str] = [] 

607 self.other_lines: List[str] = [] 

608 

609 def _parse_progress_line(self, line: AnyStr) -> None: 

610 """Parse progress information from the given line as retrieved by 

611 :manpage:`git-push(1)` or :manpage:`git-fetch(1)`. 

612 

613 - Lines that do not contain progress info are stored in :attr:`other_lines`. 

614 - Lines that seem to contain an error (i.e. start with ``error:`` or ``fatal:``) 

615 are stored in :attr:`error_lines`. 

616 """ 

617 # handle 

618 # Counting objects: 4, done. 

619 # Compressing objects: 50% (1/2) 

620 # Compressing objects: 100% (2/2) 

621 # Compressing objects: 100% (2/2), done. 

622 if isinstance(line, bytes): # mypy argues about ternary assignment. 

623 line_str = line.decode("utf-8") 

624 else: 

625 line_str = line 

626 self._cur_line = line_str 

627 

628 if self._cur_line.startswith(("error:", "fatal:")): 

629 self.error_lines.append(self._cur_line) 

630 return 

631 

632 cur_count, max_count = None, None 

633 match = self.re_op_relative.match(line_str) 

634 if match is None: 

635 match = self.re_op_absolute.match(line_str) 

636 

637 if not match: 

638 self.line_dropped(line_str) 

639 self.other_lines.append(line_str) 

640 return 

641 # END could not get match 

642 

643 op_code = 0 

644 _remote, op_name, _percent, cur_count, max_count, message = match.groups() 

645 

646 # Get operation ID. 

647 if op_name == "Counting objects": 

648 op_code |= self.COUNTING 

649 elif op_name == "Compressing objects": 

650 op_code |= self.COMPRESSING 

651 elif op_name == "Writing objects": 

652 op_code |= self.WRITING 

653 elif op_name == "Receiving objects": 

654 op_code |= self.RECEIVING 

655 elif op_name == "Resolving deltas": 

656 op_code |= self.RESOLVING 

657 elif op_name == "Finding sources": 

658 op_code |= self.FINDING_SOURCES 

659 elif op_name == "Checking out files": 

660 op_code |= self.CHECKING_OUT 

661 else: 

662 # Note: On Windows it can happen that partial lines are sent. 

663 # Hence we get something like "CompreReceiving objects", which is 

664 # a blend of "Compressing objects" and "Receiving objects". 

665 # This can't really be prevented, so we drop the line verbosely 

666 # to make sure we get informed in case the process spits out new 

667 # commands at some point. 

668 self.line_dropped(line_str) 

669 # Note: Don't add this line to the other lines, as we have to silently 

670 # drop it. 

671 return 

672 # END handle op code 

673 

674 # Figure out stage. 

675 if op_code not in self._seen_ops: 

676 self._seen_ops.append(op_code) 

677 op_code |= self.BEGIN 

678 # END begin opcode 

679 

680 if message is None: 

681 message = "" 

682 # END message handling 

683 

684 message = message.strip() 

685 if message.endswith(self.DONE_TOKEN): 

686 op_code |= self.END 

687 message = message[: -len(self.DONE_TOKEN)] 

688 # END end message handling 

689 message = message.strip(self.TOKEN_SEPARATOR) 

690 

691 self.update( 

692 op_code, 

693 cur_count and float(cur_count), 

694 max_count and float(max_count), 

695 message, 

696 ) 

697 

698 def new_message_handler(self) -> Callable[[str], None]: 

699 """ 

700 :return: 

701 A progress handler suitable for :func:`~git.cmd.handle_process_output`, 

702 passing lines on to this progress handler in a suitable format. 

703 """ 

704 

705 def handler(line: AnyStr) -> None: 

706 return self._parse_progress_line(line.rstrip()) 

707 

708 # END handler 

709 

710 return handler 

711 

712 def line_dropped(self, line: str) -> None: 

713 """Called whenever a line could not be understood and was therefore dropped.""" 

714 pass 

715 

716 def update( 

717 self, 

718 op_code: int, 

719 cur_count: Union[str, float], 

720 max_count: Union[str, float, None] = None, 

721 message: str = "", 

722 ) -> None: 

723 """Called whenever the progress changes. 

724 

725 :param op_code: 

726 Integer allowing to be compared against Operation IDs and stage IDs. 

727 

728 Stage IDs are :const:`BEGIN` and :const:`END`. :const:`BEGIN` will only be 

729 set once for each Operation ID as well as :const:`END`. It may be that 

730 :const:`BEGIN` and :const:`END` are set at once in case only one progress 

731 message was emitted due to the speed of the operation. Between 

732 :const:`BEGIN` and :const:`END`, none of these flags will be set. 

733 

734 Operation IDs are all held within the :const:`OP_MASK`. Only one Operation 

735 ID will be active per call. 

736 

737 :param cur_count: 

738 Current absolute count of items. 

739 

740 :param max_count: 

741 The maximum count of items we expect. It may be ``None`` in case there is no 

742 maximum number of items or if it is (yet) unknown. 

743 

744 :param message: 

745 In case of the :const:`WRITING` operation, it contains the amount of bytes 

746 transferred. It may possibly be used for other purposes as well. 

747 

748 :note: 

749 You may read the contents of the current line in 

750 :attr:`self._cur_line <_cur_line>`. 

751 """ 

752 pass 

753 

754 

755class CallableRemoteProgress(RemoteProgress): 

756 """A :class:`RemoteProgress` implementation forwarding updates to any callable. 

757 

758 :note: 

759 Like direct instances of :class:`RemoteProgress`, instances of this 

760 :class:`CallableRemoteProgress` class are not themselves directly callable. 

761 Rather, instances of this class wrap a callable and forward to it. This should 

762 therefore not be confused with :class:`git.types.CallableProgress`. 

763 """ 

764 

765 __slots__ = ("_callable",) 

766 

767 def __init__(self, fn: Callable) -> None: 

768 self._callable = fn 

769 super().__init__() 

770 

771 def update(self, *args: Any, **kwargs: Any) -> None: 

772 self._callable(*args, **kwargs) 

773 

774 

775class Actor: 

776 """Actors hold information about a person acting on the repository. They can be 

777 committers and authors or anything with a name and an email as mentioned in the git 

778 log entries.""" 

779 

780 # PRECOMPILED REGEX 

781 name_only_regex = re.compile(r"<(.*)>") 

782 name_email_regex = re.compile(r"(.*) <(.*?)>") 

783 

784 # ENVIRONMENT VARIABLES 

785 # These are read when creating new commits. 

786 env_author_name = "GIT_AUTHOR_NAME" 

787 env_author_email = "GIT_AUTHOR_EMAIL" 

788 env_committer_name = "GIT_COMMITTER_NAME" 

789 env_committer_email = "GIT_COMMITTER_EMAIL" 

790 

791 # CONFIGURATION KEYS 

792 conf_name = "name" 

793 conf_email = "email" 

794 

795 __slots__ = ("name", "email") 

796 

797 def __init__(self, name: Optional[str], email: Optional[str]) -> None: 

798 self.name = name 

799 self.email = email 

800 

801 def __eq__(self, other: Any) -> bool: 

802 return self.name == other.name and self.email == other.email 

803 

804 def __ne__(self, other: Any) -> bool: 

805 return not (self == other) 

806 

807 def __hash__(self) -> int: 

808 return hash((self.name, self.email)) 

809 

810 def __str__(self) -> str: 

811 return self.name if self.name else "" 

812 

813 def __repr__(self) -> str: 

814 return '<git.Actor "%s <%s>">' % (self.name, self.email) 

815 

816 @classmethod 

817 def _from_string(cls, string: str) -> "Actor": 

818 """Create an :class:`Actor` from a string. 

819 

820 :param string: 

821 The string, which is expected to be in regular git format:: 

822 

823 John Doe <jdoe@example.com> 

824 

825 :return: 

826 :class:`Actor` 

827 """ 

828 m = cls.name_email_regex.search(string) 

829 if m: 

830 name, email = m.groups() 

831 return Actor(name, email) 

832 else: 

833 m = cls.name_only_regex.search(string) 

834 if m: 

835 return Actor(m.group(1), None) 

836 # Assume the best and use the whole string as name. 

837 return Actor(string, None) 

838 # END special case name 

839 # END handle name/email matching 

840 

841 @classmethod 

842 def _main_actor( 

843 cls, 

844 env_name: str, 

845 env_email: str, 

846 config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None, 

847 ) -> "Actor": 

848 actor = Actor("", "") 

849 user_id = None # We use this to avoid multiple calls to getpass.getuser(). 

850 

851 def default_email() -> str: 

852 nonlocal user_id 

853 if not user_id: 

854 user_id = get_user_id() 

855 return user_id 

856 

857 def default_name() -> str: 

858 return default_email().split("@")[0] 

859 

860 for attr, evar, cvar, default in ( 

861 ("name", env_name, cls.conf_name, default_name), 

862 ("email", env_email, cls.conf_email, default_email), 

863 ): 

864 try: 

865 val = os.environ[evar] 

866 setattr(actor, attr, val) 

867 except KeyError: 

868 if config_reader is not None: 

869 try: 

870 val = config_reader.get("user", cvar) 

871 except Exception: 

872 val = default() 

873 setattr(actor, attr, val) 

874 # END config-reader handling 

875 if not getattr(actor, attr): 

876 setattr(actor, attr, default()) 

877 # END handle name 

878 # END for each item to retrieve 

879 return actor 

880 

881 @classmethod 

882 def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": 

883 """ 

884 :return: 

885 :class:`Actor` instance corresponding to the configured committer. It 

886 behaves similar to the git implementation, such that the environment will 

887 override configuration values of `config_reader`. If no value is set at all, 

888 it will be generated. 

889 

890 :param config_reader: 

891 ConfigReader to use to retrieve the values from in case they are not set in 

892 the environment. 

893 """ 

894 return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader) 

895 

896 @classmethod 

897 def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor": 

898 """Same as :meth:`committer`, but defines the main author. It may be specified 

899 in the environment, but defaults to the committer.""" 

900 return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader) 

901 

902 

903class Stats: 

904 """Represents stat information as presented by git at the end of a merge. It is 

905 created from the output of a diff operation. 

906 

907 Example:: 

908 

909 c = Commit( sha1 ) 

910 s = c.stats 

911 s.total # full-stat-dict 

912 s.files # dict( filepath : stat-dict ) 

913 

914 ``stat-dict`` 

915 

916 A dictionary with the following keys and values:: 

917 

918 deletions = number of deleted lines as int 

919 insertions = number of inserted lines as int 

920 lines = total number of lines changed as int, or deletions + insertions 

921 change_type = type of change as str, A|C|D|M|R|T|U|X|B 

922 

923 ``full-stat-dict`` 

924 

925 In addition to the items in the stat-dict, it features additional information:: 

926 

927 files = number of changed files as int 

928 """ 

929 

930 __slots__ = ("total", "files") 

931 

932 def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]) -> None: 

933 self.total = total 

934 self.files = files 

935 

936 @classmethod 

937 def _list_from_string(cls, repo: "Repo", text: str) -> "Stats": 

938 """Create a :class:`Stats` object from output retrieved by 

939 :manpage:`git-diff(1)`. 

940 

941 :return: 

942 :class:`git.Stats` 

943 """ 

944 

945 hsh: HSH_TD = { 

946 "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0}, 

947 "files": {}, 

948 } 

949 for line in text.splitlines(): 

950 (change_type, raw_insertions, raw_deletions, filename) = line.split("\t") 

951 insertions = raw_insertions != "-" and int(raw_insertions) or 0 

952 deletions = raw_deletions != "-" and int(raw_deletions) or 0 

953 hsh["total"]["insertions"] += insertions 

954 hsh["total"]["deletions"] += deletions 

955 hsh["total"]["lines"] += insertions + deletions 

956 hsh["total"]["files"] += 1 

957 files_dict: Files_TD = { 

958 "insertions": insertions, 

959 "deletions": deletions, 

960 "lines": insertions + deletions, 

961 "change_type": change_type, 

962 } 

963 hsh["files"][filename.strip()] = files_dict 

964 return Stats(hsh["total"], hsh["files"]) 

965 

966 

967class IndexFileSHA1Writer: 

968 """Wrapper around a file-like object that remembers the SHA1 of the data written to 

969 it. It will write a sha when the stream is closed or if asked for explicitly using 

970 :meth:`write_sha`. 

971 

972 Only useful to the index file. 

973 

974 :note: 

975 Based on the dulwich project. 

976 """ 

977 

978 __slots__ = ("f", "sha1") 

979 

980 def __init__(self, f: IO) -> None: 

981 self.f = f 

982 self.sha1 = make_sha(b"") 

983 

984 def write(self, data: AnyStr) -> int: 

985 self.sha1.update(data) 

986 return self.f.write(data) 

987 

988 def write_sha(self) -> bytes: 

989 sha = self.sha1.digest() 

990 self.f.write(sha) 

991 return sha 

992 

993 def close(self) -> bytes: 

994 sha = self.write_sha() 

995 self.f.close() 

996 return sha 

997 

998 def tell(self) -> int: 

999 return self.f.tell() 

1000 

1001 

1002class LockFile: 

1003 """Provides methods to obtain, check for, and release a file based lock which 

1004 should be used to handle concurrent access to the same file. 

1005 

1006 As we are a utility class to be derived from, we only use protected methods. 

1007 

1008 Locks will automatically be released on destruction. 

1009 """ 

1010 

1011 __slots__ = ("_file_path", "_owns_lock") 

1012 

1013 def __init__(self, file_path: PathLike) -> None: 

1014 self._file_path = file_path 

1015 self._owns_lock = False 

1016 

1017 def __del__(self) -> None: 

1018 self._release_lock() 

1019 

1020 def _lock_file_path(self) -> str: 

1021 """:return: Path to lockfile""" 

1022 return "%s.lock" % (self._file_path) 

1023 

1024 def _has_lock(self) -> bool: 

1025 """ 

1026 :return: 

1027 True if we have a lock and if the lockfile still exists 

1028 

1029 :raise AssertionError: 

1030 If our lock-file does not exist. 

1031 """ 

1032 return self._owns_lock 

1033 

1034 def _obtain_lock_or_raise(self) -> None: 

1035 """Create a lock file as flag for other instances, mark our instance as 

1036 lock-holder. 

1037 

1038 :raise IOError: 

1039 If a lock was already present or a lock file could not be written. 

1040 """ 

1041 if self._has_lock(): 

1042 return 

1043 lock_file = self._lock_file_path() 

1044 if osp.isfile(lock_file): 

1045 raise IOError( 

1046 "Lock for file %r did already exist, delete %r in case the lock is illegal" 

1047 % (self._file_path, lock_file) 

1048 ) 

1049 

1050 try: 

1051 with open(lock_file, mode="w"): 

1052 pass 

1053 except OSError as e: 

1054 raise IOError(str(e)) from e 

1055 

1056 self._owns_lock = True 

1057 

1058 def _obtain_lock(self) -> None: 

1059 """The default implementation will raise if a lock cannot be obtained. 

1060 

1061 Subclasses may override this method to provide a different implementation. 

1062 """ 

1063 return self._obtain_lock_or_raise() 

1064 

1065 def _release_lock(self) -> None: 

1066 """Release our lock if we have one.""" 

1067 if not self._has_lock(): 

1068 return 

1069 

1070 # If someone removed our file beforehand, lets just flag this issue instead of 

1071 # failing, to make it more usable. 

1072 lfp = self._lock_file_path() 

1073 try: 

1074 rmfile(lfp) 

1075 except OSError: 

1076 pass 

1077 self._owns_lock = False 

1078 

1079 

1080class BlockingLockFile(LockFile): 

1081 """The lock file will block until a lock could be obtained, or fail after a 

1082 specified timeout. 

1083 

1084 :note: 

1085 If the directory containing the lock was removed, an exception will be raised 

1086 during the blocking period, preventing hangs as the lock can never be obtained. 

1087 """ 

1088 

1089 __slots__ = ("_check_interval", "_max_block_time") 

1090 

1091 def __init__( 

1092 self, 

1093 file_path: PathLike, 

1094 check_interval_s: float = 0.3, 

1095 max_block_time_s: int = sys.maxsize, 

1096 ) -> None: 

1097 """Configure the instance. 

1098 

1099 :param check_interval_s: 

1100 Period of time to sleep until the lock is checked the next time. 

1101 By default, it waits a nearly unlimited time. 

1102 

1103 :param max_block_time_s: 

1104 Maximum amount of seconds we may lock. 

1105 """ 

1106 super().__init__(file_path) 

1107 self._check_interval = check_interval_s 

1108 self._max_block_time = max_block_time_s 

1109 

1110 def _obtain_lock(self) -> None: 

1111 """This method blocks until it obtained the lock, or raises :exc:`IOError` if it 

1112 ran out of time or if the parent directory was not available anymore. 

1113 

1114 If this method returns, you are guaranteed to own the lock. 

1115 """ 

1116 starttime = time.time() 

1117 maxtime = starttime + float(self._max_block_time) 

1118 while True: 

1119 try: 

1120 super()._obtain_lock() 

1121 except IOError as e: 

1122 # synity check: if the directory leading to the lockfile is not 

1123 # readable anymore, raise an exception 

1124 curtime = time.time() 

1125 if not osp.isdir(osp.dirname(self._lock_file_path())): 

1126 msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % ( 

1127 self._lock_file_path(), 

1128 curtime - starttime, 

1129 ) 

1130 raise IOError(msg) from e 

1131 # END handle missing directory 

1132 

1133 if curtime >= maxtime: 

1134 msg = "Waited %g seconds for lock at %r" % ( 

1135 maxtime - starttime, 

1136 self._lock_file_path(), 

1137 ) 

1138 raise IOError(msg) from e 

1139 # END abort if we wait too long 

1140 time.sleep(self._check_interval) 

1141 else: 

1142 break 

1143 # END endless loop 

1144 

1145 

1146class IterableList(List[T_IterableObj]): 

1147 """List of iterable objects allowing to query an object by id or by named index:: 

1148 

1149 heads = repo.heads 

1150 heads.master 

1151 heads['master'] 

1152 heads[0] 

1153 

1154 Iterable parent objects: 

1155 

1156 * :class:`Commit <git.objects.Commit>` 

1157 * :class:`Submodule <git.objects.submodule.base.Submodule>` 

1158 * :class:`Reference <git.refs.reference.Reference>` 

1159 * :class:`FetchInfo <git.remote.FetchInfo>` 

1160 * :class:`PushInfo <git.remote.PushInfo>` 

1161 

1162 Iterable via inheritance: 

1163 

1164 * :class:`Head <git.refs.head.Head>` 

1165 * :class:`TagReference <git.refs.tag.TagReference>` 

1166 * :class:`RemoteReference <git.refs.remote.RemoteReference>` 

1167 

1168 This requires an ``id_attribute`` name to be set which will be queried from its 

1169 contained items to have a means for comparison. 

1170 

1171 A prefix can be specified which is to be used in case the id returned by the items 

1172 always contains a prefix that does not matter to the user, so it can be left out. 

1173 """ 

1174 

1175 __slots__ = ("_id_attr", "_prefix") 

1176 

1177 def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[T_IterableObj]": 

1178 return super().__new__(cls) 

1179 

1180 def __init__(self, id_attr: str, prefix: str = "") -> None: 

1181 self._id_attr = id_attr 

1182 self._prefix = prefix 

1183 

1184 def __contains__(self, attr: object) -> bool: 

1185 # First try identity match for performance. 

1186 try: 

1187 rval = list.__contains__(self, attr) 

1188 if rval: 

1189 return rval 

1190 except (AttributeError, TypeError): 

1191 pass 

1192 # END handle match 

1193 

1194 # Otherwise make a full name search. 

1195 try: 

1196 getattr(self, cast(str, attr)) # Use cast to silence mypy. 

1197 return True 

1198 except (AttributeError, TypeError): 

1199 return False 

1200 # END handle membership 

1201 

1202 def __getattr__(self, attr: str) -> T_IterableObj: 

1203 attr = self._prefix + attr 

1204 for item in self: 

1205 if getattr(item, self._id_attr) == attr: 

1206 return item 

1207 # END for each item 

1208 return list.__getattribute__(self, attr) 

1209 

1210 def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore[override] 

1211 if isinstance(index, int): 

1212 return list.__getitem__(self, index) 

1213 elif isinstance(index, slice): 

1214 raise ValueError("Index should be an int or str") 

1215 else: 

1216 try: 

1217 return getattr(self, index) 

1218 except AttributeError as e: 

1219 raise IndexError("No item found with id %r" % (self._prefix + index)) from e 

1220 # END handle getattr 

1221 

1222 def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None: 

1223 delindex = cast(int, index) 

1224 if not isinstance(index, int): 

1225 delindex = -1 

1226 name = self._prefix + index 

1227 for i, item in enumerate(self): 

1228 if getattr(item, self._id_attr) == name: 

1229 delindex = i 

1230 break 

1231 # END search index 

1232 # END for each item 

1233 if delindex == -1: 

1234 raise IndexError("Item with name %s not found" % name) 

1235 # END handle error 

1236 # END get index to delete 

1237 list.__delitem__(self, delindex) 

1238 

1239 

1240@runtime_checkable 

1241class IterableObj(Protocol): 

1242 """Defines an interface for iterable items, so there is a uniform way to retrieve 

1243 and iterate items within the git repository. 

1244 

1245 Subclasses: 

1246 

1247 * :class:`Submodule <git.objects.submodule.base.Submodule>` 

1248 * :class:`Commit <git.objects.Commit>` 

1249 * :class:`Reference <git.refs.reference.Reference>` 

1250 * :class:`PushInfo <git.remote.PushInfo>` 

1251 * :class:`FetchInfo <git.remote.FetchInfo>` 

1252 * :class:`Remote <git.remote.Remote>` 

1253 """ 

1254 

1255 __slots__ = () 

1256 

1257 _id_attribute_: str 

1258 

1259 @classmethod 

1260 @abstractmethod 

1261 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]: 

1262 # Return-typed to be compatible with subtypes e.g. Remote. 

1263 """Find (all) items of this type. 

1264 

1265 Subclasses can specify `args` and `kwargs` differently, and may use them for 

1266 filtering. However, when the method is called with no additional positional or 

1267 keyword arguments, subclasses are obliged to to yield all items. 

1268 

1269 :return: 

1270 Iterator yielding Items 

1271 """ 

1272 raise NotImplementedError("To be implemented by Subclass") 

1273 

1274 @classmethod 

1275 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]: 

1276 """Find (all) items of this type and collect them into a list. 

1277 

1278 For more information about the arguments, see :meth:`iter_items`. 

1279 

1280 :note: 

1281 Favor the :meth:`iter_items` method as it will avoid eagerly collecting all 

1282 items. When there are many items, that can slow performance and increase 

1283 memory usage. 

1284 

1285 :return: 

1286 list(Item,...) list of item instances 

1287 """ 

1288 out_list: IterableList = IterableList(cls._id_attribute_) 

1289 out_list.extend(cls.iter_items(repo, *args, **kwargs)) 

1290 return out_list 

1291 

1292 

1293class IterableClassWatcher(type): 

1294 """Metaclass that issues :exc:`DeprecationWarning` when :class:`git.util.Iterable` 

1295 is subclassed.""" 

1296 

1297 def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None: 

1298 for base in bases: 

1299 if type(base) is IterableClassWatcher: 

1300 warnings.warn( 

1301 f"GitPython Iterable subclassed by {name}." 

1302 " Iterable is deprecated due to naming clash since v3.1.18" 

1303 " and will be removed in 4.0.0." 

1304 " Use IterableObj instead.", 

1305 DeprecationWarning, 

1306 stacklevel=2, 

1307 ) 

1308 

1309 

1310class Iterable(metaclass=IterableClassWatcher): 

1311 """Deprecated, use :class:`IterableObj` instead. 

1312 

1313 Defines an interface for iterable items, so there is a uniform way to retrieve 

1314 and iterate items within the git repository. 

1315 """ 

1316 

1317 __slots__ = () 

1318 

1319 _id_attribute_ = "attribute that most suitably identifies your instance" 

1320 

1321 @classmethod 

1322 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: 

1323 """Deprecated, use :class:`IterableObj` instead. 

1324 

1325 Find (all) items of this type. 

1326 

1327 See :meth:`IterableObj.iter_items` for details on usage. 

1328 

1329 :return: 

1330 Iterator yielding Items 

1331 """ 

1332 raise NotImplementedError("To be implemented by Subclass") 

1333 

1334 @classmethod 

1335 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any: 

1336 """Deprecated, use :class:`IterableObj` instead. 

1337 

1338 Find (all) items of this type and collect them into a list. 

1339 

1340 See :meth:`IterableObj.list_items` for details on usage. 

1341 

1342 :return: 

1343 list(Item,...) list of item instances 

1344 """ 

1345 out_list: Any = IterableList(cls._id_attribute_) 

1346 out_list.extend(cls.iter_items(repo, *args, **kwargs)) 

1347 return out_list 

1348 

1349 

1350# } END classes