Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/git/util.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors
2#
3# This module is part of GitPython and is released under the
4# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
6import sys
8__all__ = [
9 "stream_copy",
10 "join_path",
11 "to_native_path_linux",
12 "join_path_native",
13 "Stats",
14 "IndexFileSHA1Writer",
15 "IterableObj",
16 "IterableList",
17 "BlockingLockFile",
18 "LockFile",
19 "Actor",
20 "get_user_id",
21 "assure_directory_exists",
22 "RemoteProgress",
23 "CallableRemoteProgress",
24 "rmtree",
25 "unbare_repo",
26 "HIDE_WINDOWS_KNOWN_ERRORS",
27]
29if sys.platform == "win32":
30 __all__.append("to_native_path_windows")
32from abc import abstractmethod
33import contextlib
34from functools import wraps
35import getpass
36import logging
37import os
38import os.path as osp
39import pathlib
40import platform
41import re
42import shutil
43import stat
44import subprocess
45import time
46from urllib.parse import urlsplit, urlunsplit
47import warnings
49# NOTE: Unused imports can be improved now that CI testing has fully resumed. Some of
50# these be used indirectly through other GitPython modules, which avoids having to write
51# gitdb all the time in their imports. They are not in __all__, at least currently,
52# because they could be removed or changed at any time, and so should not be considered
53# conceptually public to code outside GitPython. Linters of course do not like it.
54from gitdb.util import (
55 LazyMixin, # noqa: F401
56 LockedFD, # noqa: F401
57 bin_to_hex, # noqa: F401
58 file_contents_ro, # noqa: F401
59 file_contents_ro_filepath, # noqa: F401
60 hex_to_bin, # noqa: F401
61 make_sha,
62 to_bin_sha, # noqa: F401
63 to_hex_sha, # noqa: F401
64)
66# typing ---------------------------------------------------------
68from typing import (
69 Any,
70 AnyStr,
71 BinaryIO,
72 Callable,
73 Dict,
74 Generator,
75 IO,
76 Iterator,
77 List,
78 Optional,
79 Pattern,
80 Sequence,
81 Tuple,
82 TYPE_CHECKING,
83 TypeVar,
84 Union,
85 cast,
86 overload,
87)
89if TYPE_CHECKING:
90 from git.cmd import Git
91 from git.config import GitConfigParser, SectionConstraint
92 from git.remote import Remote
93 from git.repo.base import Repo
95from git.types import (
96 Files_TD,
97 Has_id_attribute,
98 HSH_TD,
99 Literal,
100 PathLike,
101 Protocol,
102 SupportsIndex,
103 Total_TD,
104 runtime_checkable,
105)
107# ---------------------------------------------------------------------
109T_IterableObj = TypeVar("T_IterableObj", bound=Union["IterableObj", "Has_id_attribute"], covariant=True)
110# So IterableList[Head] is subtype of IterableList[IterableObj].
112_logger = logging.getLogger(__name__)
115def _read_env_flag(name: str, default: bool) -> bool:
116 """Read a boolean flag from an environment variable.
118 :return:
119 The flag, or the `default` value if absent or ambiguous.
120 """
121 try:
122 value = os.environ[name]
123 except KeyError:
124 return default
126 _logger.warning(
127 "The %s environment variable is deprecated. Its effect has never been documented and changes without warning.",
128 name,
129 )
131 adjusted_value = value.strip().lower()
133 if adjusted_value in {"", "0", "false", "no"}:
134 return False
135 if adjusted_value in {"1", "true", "yes"}:
136 return True
137 _logger.warning("%s has unrecognized value %r, treating as %r.", name, value, default)
138 return default
141def _read_win_env_flag(name: str, default: bool) -> bool:
142 """Read a boolean flag from an environment variable on Windows.
144 :return:
145 On Windows, the flag, or the `default` value if absent or ambiguous.
146 On all other operating systems, ``False``.
148 :note:
149 This only accesses the environment on Windows.
150 """
151 return sys.platform == "win32" and _read_env_flag(name, default)
154#: We need an easy way to see if Appveyor TCs start failing,
155#: so the errors marked with this var are considered "acknowledged" ones, awaiting remedy,
156#: till then, we wish to hide them.
157HIDE_WINDOWS_KNOWN_ERRORS = _read_win_env_flag("HIDE_WINDOWS_KNOWN_ERRORS", True)
158HIDE_WINDOWS_FREEZE_ERRORS = _read_win_env_flag("HIDE_WINDOWS_FREEZE_ERRORS", True)
160# { Utility Methods
162T = TypeVar("T")
165def unbare_repo(func: Callable[..., T]) -> Callable[..., T]:
166 """Methods with this decorator raise :exc:`~git.exc.InvalidGitRepositoryError` if
167 they encounter a bare repository."""
169 from .exc import InvalidGitRepositoryError
171 @wraps(func)
172 def wrapper(self: "Remote", *args: Any, **kwargs: Any) -> T:
173 if self.repo.bare:
174 raise InvalidGitRepositoryError("Method '%s' cannot operate on bare repositories" % func.__name__)
175 # END bare method
176 return func(self, *args, **kwargs)
178 # END wrapper
180 return wrapper
183@contextlib.contextmanager
184def cwd(new_dir: PathLike) -> Generator[PathLike, None, None]:
185 """Context manager to temporarily change directory.
187 This is similar to :func:`contextlib.chdir` introduced in Python 3.11, but the
188 context manager object returned by a single call to this function is not reentrant.
189 """
190 old_dir = os.getcwd()
191 os.chdir(new_dir)
192 try:
193 yield new_dir
194 finally:
195 os.chdir(old_dir)
198@contextlib.contextmanager
199def patch_env(name: str, value: str) -> Generator[None, None, None]:
200 """Context manager to temporarily patch an environment variable."""
201 old_value = os.getenv(name)
202 os.environ[name] = value
203 try:
204 yield
205 finally:
206 if old_value is None:
207 del os.environ[name]
208 else:
209 os.environ[name] = old_value
212def rmtree(path: PathLike) -> None:
213 """Remove the given directory tree recursively.
215 :note:
216 We use :func:`shutil.rmtree` but adjust its behaviour to see whether files that
217 couldn't be deleted are read-only. Windows will not remove them in that case.
218 """
220 def handler(function: Callable, path: PathLike, _excinfo: Any) -> None:
221 """Callback for :func:`shutil.rmtree`.
223 This works as either a ``onexc`` or ``onerror`` style callback.
224 """
225 # Is the error an access error?
226 os.chmod(path, stat.S_IWUSR)
228 try:
229 function(path)
230 except PermissionError as ex:
231 if HIDE_WINDOWS_KNOWN_ERRORS:
232 from unittest import SkipTest
234 raise SkipTest(f"FIXME: fails with: PermissionError\n {ex}") from ex
235 raise
237 if sys.platform != "win32":
238 shutil.rmtree(path)
239 elif sys.version_info >= (3, 12):
240 shutil.rmtree(path, onexc=handler)
241 else:
242 shutil.rmtree(path, onerror=handler)
245def rmfile(path: PathLike) -> None:
246 """Ensure file deleted also on *Windows* where read-only files need special
247 treatment."""
248 if osp.isfile(path):
249 if sys.platform == "win32":
250 os.chmod(path, 0o777)
251 os.remove(path)
254def stream_copy(source: BinaryIO, destination: BinaryIO, chunk_size: int = 512 * 1024) -> int:
255 """Copy all data from the `source` stream into the `destination` stream in chunks
256 of size `chunk_size`.
258 :return:
259 Number of bytes written
260 """
261 br = 0
262 while True:
263 chunk = source.read(chunk_size)
264 destination.write(chunk)
265 br += len(chunk)
266 if len(chunk) < chunk_size:
267 break
268 # END reading output stream
269 return br
272def join_path(a: PathLike, *p: PathLike) -> PathLike:
273 R"""Join path tokens together similar to osp.join, but always use ``/`` instead of
274 possibly ``\`` on Windows."""
275 path = str(a)
276 for b in p:
277 b = str(b)
278 if not b:
279 continue
280 if b.startswith("/"):
281 path += b[1:]
282 elif path == "" or path.endswith("/"):
283 path += b
284 else:
285 path += "/" + b
286 # END for each path token to add
287 return path
290if sys.platform == "win32":
292 def to_native_path_windows(path: PathLike) -> PathLike:
293 path = str(path)
294 return path.replace("/", "\\")
296 def to_native_path_linux(path: PathLike) -> str:
297 path = str(path)
298 return path.replace("\\", "/")
300 to_native_path = to_native_path_windows
301else:
302 # No need for any work on Linux.
303 def to_native_path_linux(path: PathLike) -> str:
304 return str(path)
306 to_native_path = to_native_path_linux
309def join_path_native(a: PathLike, *p: PathLike) -> PathLike:
310 R"""Like :func:`join_path`, but makes sure an OS native path is returned.
312 This is only needed to play it safe on Windows and to ensure nice paths that only
313 use ``\``.
314 """
315 return to_native_path(join_path(a, *p))
318def assure_directory_exists(path: PathLike, is_file: bool = False) -> bool:
319 """Make sure that the directory pointed to by path exists.
321 :param is_file:
322 If ``True``, `path` is assumed to be a file and handled correctly.
323 Otherwise it must be a directory.
325 :return:
326 ``True`` if the directory was created, ``False`` if it already existed.
327 """
328 if is_file:
329 path = osp.dirname(path)
330 # END handle file
331 if not osp.isdir(path):
332 os.makedirs(path, exist_ok=True)
333 return True
334 return False
337def _get_exe_extensions() -> Sequence[str]:
338 PATHEXT = os.environ.get("PATHEXT", None)
339 if PATHEXT:
340 return tuple(p.upper() for p in PATHEXT.split(os.pathsep))
341 elif sys.platform == "win32":
342 return (".BAT", ".COM", ".EXE")
343 else:
344 return ()
347def py_where(program: str, path: Optional[PathLike] = None) -> List[str]:
348 """Perform a path search to assist :func:`is_cygwin_git`.
350 This is not robust for general use. It is an implementation detail of
351 :func:`is_cygwin_git`. When a search following all shell rules is needed,
352 :func:`shutil.which` can be used instead.
354 :note:
355 Neither this function nor :func:`shutil.which` will predict the effect of an
356 executable search on a native Windows system due to a :class:`subprocess.Popen`
357 call without ``shell=True``, because shell and non-shell executable search on
358 Windows differ considerably.
359 """
360 # From: http://stackoverflow.com/a/377028/548792
361 winprog_exts = _get_exe_extensions()
363 def is_exec(fpath: str) -> bool:
364 return (
365 osp.isfile(fpath)
366 and os.access(fpath, os.X_OK)
367 and (
368 sys.platform != "win32" or not winprog_exts or any(fpath.upper().endswith(ext) for ext in winprog_exts)
369 )
370 )
372 progs = []
373 if not path:
374 path = os.environ["PATH"]
375 for folder in str(path).split(os.pathsep):
376 folder = folder.strip('"')
377 if folder:
378 exe_path = osp.join(folder, program)
379 for f in [exe_path] + ["%s%s" % (exe_path, e) for e in winprog_exts]:
380 if is_exec(f):
381 progs.append(f)
382 return progs
385def _cygexpath(drive: Optional[str], path: str) -> str:
386 if osp.isabs(path) and not drive:
387 # Invoked from `cygpath()` directly with `D:Apps\123`?
388 # It's an error, leave it alone just slashes)
389 p = path # convert to str if AnyPath given
390 else:
391 p = path and osp.normpath(osp.expandvars(osp.expanduser(path)))
392 if osp.isabs(p):
393 if drive:
394 # Confusing, maybe a remote system should expand vars.
395 p = path
396 else:
397 p = cygpath(p)
398 elif drive:
399 p = "/proc/cygdrive/%s/%s" % (drive.lower(), p)
400 p_str = str(p) # ensure it is a str and not AnyPath
401 return p_str.replace("\\", "/")
404_cygpath_parsers: Tuple[Tuple[Pattern[str], Callable, bool], ...] = (
405 # See: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
406 # and: https://www.cygwin.com/cygwin-ug-net/using.html#unc-paths
407 (
408 re.compile(r"\\\\\?\\UNC\\([^\\]+)\\([^\\]+)(?:\\(.*))?"),
409 (lambda server, share, rest_path: "//%s/%s/%s" % (server, share, rest_path.replace("\\", "/"))),
410 False,
411 ),
412 (re.compile(r"\\\\\?\\(\w):[/\\](.*)"), (_cygexpath), False),
413 (re.compile(r"(\w):[/\\](.*)"), (_cygexpath), False),
414 (re.compile(r"file:(.*)", re.I), (lambda rest_path: rest_path), True),
415 (re.compile(r"(\w{2,}:.*)"), (lambda url: url), False), # remote URL, do nothing
416)
419def cygpath(path: str) -> str:
420 """Use :meth:`git.cmd.Git.polish_url` instead, that works on any environment."""
421 path = str(path) # Ensure is str and not AnyPath.
422 # Fix to use Paths when 3.5 dropped. Or to be just str if only for URLs?
423 if not path.startswith(("/cygdrive", "//", "/proc/cygdrive")):
424 for regex, parser, recurse in _cygpath_parsers:
425 match = regex.match(path)
426 if match:
427 path = parser(*match.groups())
428 if recurse:
429 path = cygpath(path)
430 break
431 else:
432 path = _cygexpath(None, path)
434 return path
437_decygpath_regex = re.compile(r"(?:/proc)?/cygdrive/(\w)(/.*)?")
440def decygpath(path: PathLike) -> str:
441 path = str(path)
442 m = _decygpath_regex.match(path)
443 if m:
444 drive, rest_path = m.groups()
445 path = "%s:%s" % (drive.upper(), rest_path or "")
447 return path.replace("/", "\\")
450#: Store boolean flags denoting if a specific Git executable
451#: is from a Cygwin installation (since `cache_lru()` unsupported on PY2).
452_is_cygwin_cache: Dict[str, Optional[bool]] = {}
455def _is_cygwin_git(git_executable: str) -> bool:
456 is_cygwin = _is_cygwin_cache.get(git_executable) # type: Optional[bool]
457 if is_cygwin is None:
458 is_cygwin = False
459 try:
460 git_dir = osp.dirname(git_executable)
461 if not git_dir:
462 res = py_where(git_executable)
463 git_dir = osp.dirname(res[0]) if res else ""
465 # Just a name given, not a real path.
466 uname_cmd = osp.join(git_dir, "uname")
467 process = subprocess.Popen([uname_cmd], stdout=subprocess.PIPE, universal_newlines=True)
468 uname_out, _ = process.communicate()
469 # retcode = process.poll()
470 is_cygwin = "CYGWIN" in uname_out
471 except Exception as ex:
472 _logger.debug("Failed checking if running in CYGWIN due to: %r", ex)
473 _is_cygwin_cache[git_executable] = is_cygwin
475 return is_cygwin
478@overload
479def is_cygwin_git(git_executable: None) -> Literal[False]: ...
482@overload
483def is_cygwin_git(git_executable: PathLike) -> bool: ...
486def is_cygwin_git(git_executable: Union[None, PathLike]) -> bool:
487 if sys.platform == "win32": # TODO: See if we can use `sys.platform != "cygwin"`.
488 return False
489 elif git_executable is None:
490 return False
491 else:
492 return _is_cygwin_git(str(git_executable))
495def get_user_id() -> str:
496 """:return: String identifying the currently active system user as ``name@node``"""
497 return "%s@%s" % (getpass.getuser(), platform.node())
500def finalize_process(proc: Union[subprocess.Popen, "Git.AutoInterrupt"], **kwargs: Any) -> None:
501 """Wait for the process (clone, fetch, pull or push) and handle its errors
502 accordingly."""
503 # TODO: No close proc-streams??
504 proc.wait(**kwargs)
507@overload
508def expand_path(p: None, expand_vars: bool = ...) -> None: ...
511@overload
512def expand_path(p: PathLike, expand_vars: bool = ...) -> str:
513 # TODO: Support for Python 3.5 has been dropped, so these overloads can be improved.
514 ...
517def expand_path(p: Union[None, PathLike], expand_vars: bool = True) -> Optional[PathLike]:
518 if isinstance(p, pathlib.Path):
519 return p.resolve()
520 try:
521 p = osp.expanduser(p) # type: ignore[arg-type]
522 if expand_vars:
523 p = osp.expandvars(p)
524 return osp.normpath(osp.abspath(p))
525 except Exception:
526 return None
529def remove_password_if_present(cmdline: Sequence[str]) -> List[str]:
530 """Parse any command line argument and if one of the elements is an URL with a
531 username and/or password, replace them by stars (in-place).
533 If nothing is found, this just returns the command line as-is.
535 This should be used for every log line that print a command line, as well as
536 exception messages.
537 """
538 new_cmdline = []
539 for index, to_parse in enumerate(cmdline):
540 new_cmdline.append(to_parse)
541 try:
542 url = urlsplit(to_parse)
543 # Remove password from the URL if present.
544 if url.password is None and url.username is None:
545 continue
547 if url.password is not None:
548 url = url._replace(netloc=url.netloc.replace(url.password, "*****"))
549 if url.username is not None:
550 url = url._replace(netloc=url.netloc.replace(url.username, "*****"))
551 new_cmdline[index] = urlunsplit(url)
552 except ValueError:
553 # This is not a valid URL.
554 continue
555 return new_cmdline
558# } END utilities
560# { Classes
563class RemoteProgress:
564 """Handler providing an interface to parse progress information emitted by
565 :manpage:`git-push(1)` and :manpage:`git-fetch(1)` and to dispatch callbacks
566 allowing subclasses to react to the progress."""
568 _num_op_codes: int = 9
569 (
570 BEGIN,
571 END,
572 COUNTING,
573 COMPRESSING,
574 WRITING,
575 RECEIVING,
576 RESOLVING,
577 FINDING_SOURCES,
578 CHECKING_OUT,
579 ) = [1 << x for x in range(_num_op_codes)]
580 STAGE_MASK = BEGIN | END
581 OP_MASK = ~STAGE_MASK
583 DONE_TOKEN = "done."
584 TOKEN_SEPARATOR = ", "
586 __slots__ = (
587 "_cur_line",
588 "_seen_ops",
589 "error_lines", # Lines that started with 'error:' or 'fatal:'.
590 "other_lines", # Lines not denoting progress (i.e.g. push-infos).
591 )
592 re_op_absolute = re.compile(r"(remote: )?([\w\s]+):\s+()(\d+)()(.*)")
593 re_op_relative = re.compile(r"(remote: )?([\w\s]+):\s+(\d+)% \((\d+)/(\d+)\)(.*)")
595 def __init__(self) -> None:
596 self._seen_ops: List[int] = []
597 self._cur_line: Optional[str] = None
598 self.error_lines: List[str] = []
599 self.other_lines: List[str] = []
601 def _parse_progress_line(self, line: AnyStr) -> None:
602 """Parse progress information from the given line as retrieved by
603 :manpage:`git-push(1)` or :manpage:`git-fetch(1)`.
605 - Lines that do not contain progress info are stored in :attr:`other_lines`.
606 - Lines that seem to contain an error (i.e. start with ``error:`` or ``fatal:``)
607 are stored in :attr:`error_lines`.
608 """
609 # handle
610 # Counting objects: 4, done.
611 # Compressing objects: 50% (1/2)
612 # Compressing objects: 100% (2/2)
613 # Compressing objects: 100% (2/2), done.
614 if isinstance(line, bytes): # mypy argues about ternary assignment.
615 line_str = line.decode("utf-8")
616 else:
617 line_str = line
618 self._cur_line = line_str
620 if self._cur_line.startswith(("error:", "fatal:")):
621 self.error_lines.append(self._cur_line)
622 return
624 cur_count, max_count = None, None
625 match = self.re_op_relative.match(line_str)
626 if match is None:
627 match = self.re_op_absolute.match(line_str)
629 if not match:
630 self.line_dropped(line_str)
631 self.other_lines.append(line_str)
632 return
633 # END could not get match
635 op_code = 0
636 _remote, op_name, _percent, cur_count, max_count, message = match.groups()
638 # Get operation ID.
639 if op_name == "Counting objects":
640 op_code |= self.COUNTING
641 elif op_name == "Compressing objects":
642 op_code |= self.COMPRESSING
643 elif op_name == "Writing objects":
644 op_code |= self.WRITING
645 elif op_name == "Receiving objects":
646 op_code |= self.RECEIVING
647 elif op_name == "Resolving deltas":
648 op_code |= self.RESOLVING
649 elif op_name == "Finding sources":
650 op_code |= self.FINDING_SOURCES
651 elif op_name == "Checking out files":
652 op_code |= self.CHECKING_OUT
653 else:
654 # Note: On Windows it can happen that partial lines are sent.
655 # Hence we get something like "CompreReceiving objects", which is
656 # a blend of "Compressing objects" and "Receiving objects".
657 # This can't really be prevented, so we drop the line verbosely
658 # to make sure we get informed in case the process spits out new
659 # commands at some point.
660 self.line_dropped(line_str)
661 # Note: Don't add this line to the other lines, as we have to silently
662 # drop it.
663 return
664 # END handle op code
666 # Figure out stage.
667 if op_code not in self._seen_ops:
668 self._seen_ops.append(op_code)
669 op_code |= self.BEGIN
670 # END begin opcode
672 if message is None:
673 message = ""
674 # END message handling
676 message = message.strip()
677 if message.endswith(self.DONE_TOKEN):
678 op_code |= self.END
679 message = message[: -len(self.DONE_TOKEN)]
680 # END end message handling
681 message = message.strip(self.TOKEN_SEPARATOR)
683 self.update(
684 op_code,
685 cur_count and float(cur_count),
686 max_count and float(max_count),
687 message,
688 )
690 def new_message_handler(self) -> Callable[[str], None]:
691 """
692 :return:
693 A progress handler suitable for :func:`~git.cmd.handle_process_output`,
694 passing lines on to this progress handler in a suitable format.
695 """
697 def handler(line: AnyStr) -> None:
698 return self._parse_progress_line(line.rstrip())
700 # END handler
702 return handler
704 def line_dropped(self, line: str) -> None:
705 """Called whenever a line could not be understood and was therefore dropped."""
706 pass
708 def update(
709 self,
710 op_code: int,
711 cur_count: Union[str, float],
712 max_count: Union[str, float, None] = None,
713 message: str = "",
714 ) -> None:
715 """Called whenever the progress changes.
717 :param op_code:
718 Integer allowing to be compared against Operation IDs and stage IDs.
720 Stage IDs are :const:`BEGIN` and :const:`END`. :const:`BEGIN` will only be
721 set once for each Operation ID as well as :const:`END`. It may be that
722 :const:`BEGIN` and :const:`END` are set at once in case only one progress
723 message was emitted due to the speed of the operation. Between
724 :const:`BEGIN` and :const:`END`, none of these flags will be set.
726 Operation IDs are all held within the :const:`OP_MASK`. Only one Operation
727 ID will be active per call.
729 :param cur_count:
730 Current absolute count of items.
732 :param max_count:
733 The maximum count of items we expect. It may be ``None`` in case there is no
734 maximum number of items or if it is (yet) unknown.
736 :param message:
737 In case of the :const:`WRITING` operation, it contains the amount of bytes
738 transferred. It may possibly be used for other purposes as well.
740 :note:
741 You may read the contents of the current line in
742 :attr:`self._cur_line <_cur_line>`.
743 """
744 pass
747class CallableRemoteProgress(RemoteProgress):
748 """A :class:`RemoteProgress` implementation forwarding updates to any callable.
750 :note:
751 Like direct instances of :class:`RemoteProgress`, instances of this
752 :class:`CallableRemoteProgress` class are not themselves directly callable.
753 Rather, instances of this class wrap a callable and forward to it. This should
754 therefore not be confused with :class:`git.types.CallableProgress`.
755 """
757 __slots__ = ("_callable",)
759 def __init__(self, fn: Callable) -> None:
760 self._callable = fn
761 super().__init__()
763 def update(self, *args: Any, **kwargs: Any) -> None:
764 self._callable(*args, **kwargs)
767class Actor:
768 """Actors hold information about a person acting on the repository. They can be
769 committers and authors or anything with a name and an email as mentioned in the git
770 log entries."""
772 # PRECOMPILED REGEX
773 name_only_regex = re.compile(r"<(.*)>")
774 name_email_regex = re.compile(r"(.*) <(.*?)>")
776 # ENVIRONMENT VARIABLES
777 # These are read when creating new commits.
778 env_author_name = "GIT_AUTHOR_NAME"
779 env_author_email = "GIT_AUTHOR_EMAIL"
780 env_committer_name = "GIT_COMMITTER_NAME"
781 env_committer_email = "GIT_COMMITTER_EMAIL"
783 # CONFIGURATION KEYS
784 conf_name = "name"
785 conf_email = "email"
787 __slots__ = ("name", "email")
789 def __init__(self, name: Optional[str], email: Optional[str]) -> None:
790 self.name = name
791 self.email = email
793 def __eq__(self, other: Any) -> bool:
794 return self.name == other.name and self.email == other.email
796 def __ne__(self, other: Any) -> bool:
797 return not (self == other)
799 def __hash__(self) -> int:
800 return hash((self.name, self.email))
802 def __str__(self) -> str:
803 return self.name if self.name else ""
805 def __repr__(self) -> str:
806 return '<git.Actor "%s <%s>">' % (self.name, self.email)
808 @classmethod
809 def _from_string(cls, string: str) -> "Actor":
810 """Create an :class:`Actor` from a string.
812 :param string:
813 The string, which is expected to be in regular git format::
815 John Doe <jdoe@example.com>
817 :return:
818 :class:`Actor`
819 """
820 m = cls.name_email_regex.search(string)
821 if m:
822 name, email = m.groups()
823 return Actor(name, email)
824 else:
825 m = cls.name_only_regex.search(string)
826 if m:
827 return Actor(m.group(1), None)
828 # Assume the best and use the whole string as name.
829 return Actor(string, None)
830 # END special case name
831 # END handle name/email matching
833 @classmethod
834 def _main_actor(
835 cls,
836 env_name: str,
837 env_email: str,
838 config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None,
839 ) -> "Actor":
840 actor = Actor("", "")
841 user_id = None # We use this to avoid multiple calls to getpass.getuser().
843 def default_email() -> str:
844 nonlocal user_id
845 if not user_id:
846 user_id = get_user_id()
847 return user_id
849 def default_name() -> str:
850 return default_email().split("@")[0]
852 for attr, evar, cvar, default in (
853 ("name", env_name, cls.conf_name, default_name),
854 ("email", env_email, cls.conf_email, default_email),
855 ):
856 try:
857 val = os.environ[evar]
858 setattr(actor, attr, val)
859 except KeyError:
860 if config_reader is not None:
861 try:
862 val = config_reader.get("user", cvar)
863 except Exception:
864 val = default()
865 setattr(actor, attr, val)
866 # END config-reader handling
867 if not getattr(actor, attr):
868 setattr(actor, attr, default())
869 # END handle name
870 # END for each item to retrieve
871 return actor
873 @classmethod
874 def committer(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
875 """
876 :return:
877 :class:`Actor` instance corresponding to the configured committer. It
878 behaves similar to the git implementation, such that the environment will
879 override configuration values of `config_reader`. If no value is set at all,
880 it will be generated.
882 :param config_reader:
883 ConfigReader to use to retrieve the values from in case they are not set in
884 the environment.
885 """
886 return cls._main_actor(cls.env_committer_name, cls.env_committer_email, config_reader)
888 @classmethod
889 def author(cls, config_reader: Union[None, "GitConfigParser", "SectionConstraint"] = None) -> "Actor":
890 """Same as :meth:`committer`, but defines the main author. It may be specified
891 in the environment, but defaults to the committer."""
892 return cls._main_actor(cls.env_author_name, cls.env_author_email, config_reader)
895class Stats:
896 """Represents stat information as presented by git at the end of a merge. It is
897 created from the output of a diff operation.
899 Example::
901 c = Commit( sha1 )
902 s = c.stats
903 s.total # full-stat-dict
904 s.files # dict( filepath : stat-dict )
906 ``stat-dict``
908 A dictionary with the following keys and values::
910 deletions = number of deleted lines as int
911 insertions = number of inserted lines as int
912 lines = total number of lines changed as int, or deletions + insertions
914 ``full-stat-dict``
916 In addition to the items in the stat-dict, it features additional information::
918 files = number of changed files as int
919 """
921 __slots__ = ("total", "files")
923 def __init__(self, total: Total_TD, files: Dict[PathLike, Files_TD]) -> None:
924 self.total = total
925 self.files = files
927 @classmethod
928 def _list_from_string(cls, repo: "Repo", text: str) -> "Stats":
929 """Create a :class:`Stats` object from output retrieved by
930 :manpage:`git-diff(1)`.
932 :return:
933 :class:`git.Stats`
934 """
936 hsh: HSH_TD = {
937 "total": {"insertions": 0, "deletions": 0, "lines": 0, "files": 0},
938 "files": {},
939 }
940 for line in text.splitlines():
941 (raw_insertions, raw_deletions, filename) = line.split("\t")
942 insertions = raw_insertions != "-" and int(raw_insertions) or 0
943 deletions = raw_deletions != "-" and int(raw_deletions) or 0
944 hsh["total"]["insertions"] += insertions
945 hsh["total"]["deletions"] += deletions
946 hsh["total"]["lines"] += insertions + deletions
947 hsh["total"]["files"] += 1
948 files_dict: Files_TD = {
949 "insertions": insertions,
950 "deletions": deletions,
951 "lines": insertions + deletions,
952 }
953 hsh["files"][filename.strip()] = files_dict
954 return Stats(hsh["total"], hsh["files"])
957class IndexFileSHA1Writer:
958 """Wrapper around a file-like object that remembers the SHA1 of the data written to
959 it. It will write a sha when the stream is closed or if asked for explicitly using
960 :meth:`write_sha`.
962 Only useful to the index file.
964 :note:
965 Based on the dulwich project.
966 """
968 __slots__ = ("f", "sha1")
970 def __init__(self, f: IO) -> None:
971 self.f = f
972 self.sha1 = make_sha(b"")
974 def write(self, data: AnyStr) -> int:
975 self.sha1.update(data)
976 return self.f.write(data)
978 def write_sha(self) -> bytes:
979 sha = self.sha1.digest()
980 self.f.write(sha)
981 return sha
983 def close(self) -> bytes:
984 sha = self.write_sha()
985 self.f.close()
986 return sha
988 def tell(self) -> int:
989 return self.f.tell()
992class LockFile:
993 """Provides methods to obtain, check for, and release a file based lock which
994 should be used to handle concurrent access to the same file.
996 As we are a utility class to be derived from, we only use protected methods.
998 Locks will automatically be released on destruction.
999 """
1001 __slots__ = ("_file_path", "_owns_lock")
1003 def __init__(self, file_path: PathLike) -> None:
1004 self._file_path = file_path
1005 self._owns_lock = False
1007 def __del__(self) -> None:
1008 self._release_lock()
1010 def _lock_file_path(self) -> str:
1011 """:return: Path to lockfile"""
1012 return "%s.lock" % (self._file_path)
1014 def _has_lock(self) -> bool:
1015 """
1016 :return:
1017 True if we have a lock and if the lockfile still exists
1019 :raise AssertionError:
1020 If our lock-file does not exist.
1021 """
1022 return self._owns_lock
1024 def _obtain_lock_or_raise(self) -> None:
1025 """Create a lock file as flag for other instances, mark our instance as
1026 lock-holder.
1028 :raise IOError:
1029 If a lock was already present or a lock file could not be written.
1030 """
1031 if self._has_lock():
1032 return
1033 lock_file = self._lock_file_path()
1034 if osp.isfile(lock_file):
1035 raise IOError(
1036 "Lock for file %r did already exist, delete %r in case the lock is illegal"
1037 % (self._file_path, lock_file)
1038 )
1040 try:
1041 with open(lock_file, mode="w"):
1042 pass
1043 except OSError as e:
1044 raise IOError(str(e)) from e
1046 self._owns_lock = True
1048 def _obtain_lock(self) -> None:
1049 """The default implementation will raise if a lock cannot be obtained.
1051 Subclasses may override this method to provide a different implementation.
1052 """
1053 return self._obtain_lock_or_raise()
1055 def _release_lock(self) -> None:
1056 """Release our lock if we have one."""
1057 if not self._has_lock():
1058 return
1060 # If someone removed our file beforehand, lets just flag this issue instead of
1061 # failing, to make it more usable.
1062 lfp = self._lock_file_path()
1063 try:
1064 rmfile(lfp)
1065 except OSError:
1066 pass
1067 self._owns_lock = False
1070class BlockingLockFile(LockFile):
1071 """The lock file will block until a lock could be obtained, or fail after a
1072 specified timeout.
1074 :note:
1075 If the directory containing the lock was removed, an exception will be raised
1076 during the blocking period, preventing hangs as the lock can never be obtained.
1077 """
1079 __slots__ = ("_check_interval", "_max_block_time")
1081 def __init__(
1082 self,
1083 file_path: PathLike,
1084 check_interval_s: float = 0.3,
1085 max_block_time_s: int = sys.maxsize,
1086 ) -> None:
1087 """Configure the instance.
1089 :param check_interval_s:
1090 Period of time to sleep until the lock is checked the next time.
1091 By default, it waits a nearly unlimited time.
1093 :param max_block_time_s:
1094 Maximum amount of seconds we may lock.
1095 """
1096 super().__init__(file_path)
1097 self._check_interval = check_interval_s
1098 self._max_block_time = max_block_time_s
1100 def _obtain_lock(self) -> None:
1101 """This method blocks until it obtained the lock, or raises :exc:`IOError` if it
1102 ran out of time or if the parent directory was not available anymore.
1104 If this method returns, you are guaranteed to own the lock.
1105 """
1106 starttime = time.time()
1107 maxtime = starttime + float(self._max_block_time)
1108 while True:
1109 try:
1110 super()._obtain_lock()
1111 except IOError as e:
1112 # synity check: if the directory leading to the lockfile is not
1113 # readable anymore, raise an exception
1114 curtime = time.time()
1115 if not osp.isdir(osp.dirname(self._lock_file_path())):
1116 msg = "Directory containing the lockfile %r was not readable anymore after waiting %g seconds" % (
1117 self._lock_file_path(),
1118 curtime - starttime,
1119 )
1120 raise IOError(msg) from e
1121 # END handle missing directory
1123 if curtime >= maxtime:
1124 msg = "Waited %g seconds for lock at %r" % (
1125 maxtime - starttime,
1126 self._lock_file_path(),
1127 )
1128 raise IOError(msg) from e
1129 # END abort if we wait too long
1130 time.sleep(self._check_interval)
1131 else:
1132 break
1133 # END endless loop
1136class IterableList(List[T_IterableObj]):
1137 """List of iterable objects allowing to query an object by id or by named index::
1139 heads = repo.heads
1140 heads.master
1141 heads['master']
1142 heads[0]
1144 Iterable parent objects:
1146 * :class:`Commit <git.objects.Commit>`
1147 * :class:`Submodule <git.objects.submodule.base.Submodule>`
1148 * :class:`Reference <git.refs.reference.Reference>`
1149 * :class:`FetchInfo <git.remote.FetchInfo>`
1150 * :class:`PushInfo <git.remote.PushInfo>`
1152 Iterable via inheritance:
1154 * :class:`Head <git.refs.head.Head>`
1155 * :class:`TagReference <git.refs.tag.TagReference>`
1156 * :class:`RemoteReference <git.refs.remote.RemoteReference>`
1158 This requires an ``id_attribute`` name to be set which will be queried from its
1159 contained items to have a means for comparison.
1161 A prefix can be specified which is to be used in case the id returned by the items
1162 always contains a prefix that does not matter to the user, so it can be left out.
1163 """
1165 __slots__ = ("_id_attr", "_prefix")
1167 def __new__(cls, id_attr: str, prefix: str = "") -> "IterableList[T_IterableObj]":
1168 return super().__new__(cls)
1170 def __init__(self, id_attr: str, prefix: str = "") -> None:
1171 self._id_attr = id_attr
1172 self._prefix = prefix
1174 def __contains__(self, attr: object) -> bool:
1175 # First try identity match for performance.
1176 try:
1177 rval = list.__contains__(self, attr)
1178 if rval:
1179 return rval
1180 except (AttributeError, TypeError):
1181 pass
1182 # END handle match
1184 # Otherwise make a full name search.
1185 try:
1186 getattr(self, cast(str, attr)) # Use cast to silence mypy.
1187 return True
1188 except (AttributeError, TypeError):
1189 return False
1190 # END handle membership
1192 def __getattr__(self, attr: str) -> T_IterableObj:
1193 attr = self._prefix + attr
1194 for item in self:
1195 if getattr(item, self._id_attr) == attr:
1196 return item
1197 # END for each item
1198 return list.__getattribute__(self, attr)
1200 def __getitem__(self, index: Union[SupportsIndex, int, slice, str]) -> T_IterableObj: # type: ignore[override]
1201 assert isinstance(index, (int, str, slice)), "Index of IterableList should be an int or str"
1203 if isinstance(index, int):
1204 return list.__getitem__(self, index)
1205 elif isinstance(index, slice):
1206 raise ValueError("Index should be an int or str")
1207 else:
1208 try:
1209 return getattr(self, index)
1210 except AttributeError as e:
1211 raise IndexError("No item found with id %r" % (self._prefix + index)) from e
1212 # END handle getattr
1214 def __delitem__(self, index: Union[SupportsIndex, int, slice, str]) -> None:
1215 assert isinstance(index, (int, str)), "Index of IterableList should be an int or str"
1217 delindex = cast(int, index)
1218 if not isinstance(index, int):
1219 delindex = -1
1220 name = self._prefix + index
1221 for i, item in enumerate(self):
1222 if getattr(item, self._id_attr) == name:
1223 delindex = i
1224 break
1225 # END search index
1226 # END for each item
1227 if delindex == -1:
1228 raise IndexError("Item with name %s not found" % name)
1229 # END handle error
1230 # END get index to delete
1231 list.__delitem__(self, delindex)
1234@runtime_checkable
1235class IterableObj(Protocol):
1236 """Defines an interface for iterable items, so there is a uniform way to retrieve
1237 and iterate items within the git repository.
1239 Subclasses:
1241 * :class:`Submodule <git.objects.submodule.base.Submodule>`
1242 * :class:`Commit <git.objects.Commit>`
1243 * :class:`Reference <git.refs.reference.Reference>`
1244 * :class:`PushInfo <git.remote.PushInfo>`
1245 * :class:`FetchInfo <git.remote.FetchInfo>`
1246 * :class:`Remote <git.remote.Remote>`
1247 """
1249 __slots__ = ()
1251 _id_attribute_: str
1253 @classmethod
1254 @abstractmethod
1255 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Iterator[T_IterableObj]:
1256 # Return-typed to be compatible with subtypes e.g. Remote.
1257 """Find (all) items of this type.
1259 Subclasses can specify `args` and `kwargs` differently, and may use them for
1260 filtering. However, when the method is called with no additional positional or
1261 keyword arguments, subclasses are obliged to to yield all items.
1263 :return:
1264 Iterator yielding Items
1265 """
1266 raise NotImplementedError("To be implemented by Subclass")
1268 @classmethod
1269 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> IterableList[T_IterableObj]:
1270 """Find (all) items of this type and collect them into a list.
1272 For more information about the arguments, see :meth:`iter_items`.
1274 :note:
1275 Favor the :meth:`iter_items` method as it will avoid eagerly collecting all
1276 items. When there are many items, that can slow performance and increase
1277 memory usage.
1279 :return:
1280 list(Item,...) list of item instances
1281 """
1282 out_list: IterableList = IterableList(cls._id_attribute_)
1283 out_list.extend(cls.iter_items(repo, *args, **kwargs))
1284 return out_list
1287class IterableClassWatcher(type):
1288 """Metaclass that issues :exc:`DeprecationWarning` when :class:`git.util.Iterable`
1289 is subclassed."""
1291 def __init__(cls, name: str, bases: Tuple, clsdict: Dict) -> None:
1292 for base in bases:
1293 if type(base) is IterableClassWatcher:
1294 warnings.warn(
1295 f"GitPython Iterable subclassed by {name}."
1296 " Iterable is deprecated due to naming clash since v3.1.18"
1297 " and will be removed in 4.0.0."
1298 " Use IterableObj instead.",
1299 DeprecationWarning,
1300 stacklevel=2,
1301 )
1304class Iterable(metaclass=IterableClassWatcher):
1305 """Deprecated, use :class:`IterableObj` instead.
1307 Defines an interface for iterable items, so there is a uniform way to retrieve
1308 and iterate items within the git repository.
1309 """
1311 __slots__ = ()
1313 _id_attribute_ = "attribute that most suitably identifies your instance"
1315 @classmethod
1316 def iter_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
1317 """Deprecated, use :class:`IterableObj` instead.
1319 Find (all) items of this type.
1321 See :meth:`IterableObj.iter_items` for details on usage.
1323 :return:
1324 Iterator yielding Items
1325 """
1326 raise NotImplementedError("To be implemented by Subclass")
1328 @classmethod
1329 def list_items(cls, repo: "Repo", *args: Any, **kwargs: Any) -> Any:
1330 """Deprecated, use :class:`IterableObj` instead.
1332 Find (all) items of this type and collect them into a list.
1334 See :meth:`IterableObj.list_items` for details on usage.
1336 :return:
1337 list(Item,...) list of item instances
1338 """
1339 out_list: Any = IterableList(cls._id_attribute_)
1340 out_list.extend(cls.iter_items(repo, *args, **kwargs))
1341 return out_list
1344# } END classes