Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/git/repo/fun.py: 25%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# This module is part of GitPython and is released under the
2# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/
4"""General repository-related functions."""
6from __future__ import annotations
8__all__ = [
9 "rev_parse",
10 "is_git_dir",
11 "touch",
12 "find_submodule_git_dir",
13 "name_to_object",
14 "short_to_long",
15 "deref_tag",
16 "to_commit",
17 "find_worktree_git_dir",
18]
20import os
21import os.path as osp
22from pathlib import Path
23import re
24import stat
25from string import digits
27from gitdb.exc import BadName, BadObject
29from git.cmd import Git
30from git.exc import WorkTreeRepositoryUnsupported
31from git.objects import Object
32from git.objects.util import parse_date
33from git.refs import SymbolicReference
34from git.util import cygpath, bin_to_hex, hex_to_bin
36# Typing ----------------------------------------------------------------------
38from typing import Iterator, Optional, TYPE_CHECKING, Tuple, Union, cast, overload
40from git.types import AnyGitObject, Literal, PathLike
42if TYPE_CHECKING:
43 from git.db import GitCmdObjectDB
44 from git.objects import Commit
45 from git.refs.reference import Reference
46 from git.refs.log import RefLog, RefLogEntry
47 from git.refs.tag import Tag
49 from .base import Repo
51# ----------------------------------------------------------------------------
54def touch(filename: str) -> str:
55 with open(filename, "ab"):
56 pass
57 return filename
60def is_git_dir(d: PathLike) -> bool:
61 """This is taken from the git setup.c:is_git_directory function.
63 :raise git.exc.WorkTreeRepositoryUnsupported:
64 If it sees a worktree directory. It's quite hacky to do that here, but at least
65 clearly indicates that we don't support it. There is the unlikely danger to
66 throw if we see directories which just look like a worktree dir, but are none.
67 """
68 if osp.isdir(d):
69 if (osp.isdir(osp.join(d, "objects")) or "GIT_OBJECT_DIRECTORY" in os.environ) and osp.isdir(
70 osp.join(d, "refs")
71 ):
72 headref = osp.join(d, "HEAD")
73 return osp.isfile(headref) or (osp.islink(headref) and os.readlink(headref).startswith("refs"))
74 elif (
75 osp.isfile(osp.join(d, "gitdir"))
76 and osp.isfile(osp.join(d, "commondir"))
77 and osp.isfile(osp.join(d, "gitfile"))
78 ):
79 raise WorkTreeRepositoryUnsupported(d)
80 return False
83def find_worktree_git_dir(dotgit: PathLike) -> Optional[str]:
84 """Search for a gitdir for this worktree."""
85 try:
86 statbuf = os.stat(dotgit)
87 except OSError:
88 return None
89 if not stat.S_ISREG(statbuf.st_mode):
90 return None
92 try:
93 lines = Path(dotgit).read_text().splitlines()
94 for key, value in [line.strip().split(": ") for line in lines]:
95 if key == "gitdir":
96 return value
97 except ValueError:
98 pass
99 return None
102def find_submodule_git_dir(d: PathLike) -> Optional[PathLike]:
103 """Search for a submodule repo."""
104 if is_git_dir(d):
105 return d
107 try:
108 with open(d) as fp:
109 content = fp.read().rstrip()
110 except IOError:
111 # It's probably not a file.
112 pass
113 else:
114 if content.startswith("gitdir: "):
115 path = content[8:]
117 if Git.is_cygwin():
118 # Cygwin creates submodules prefixed with `/cygdrive/...`.
119 # Cygwin git understands Cygwin paths much better than Windows ones.
120 # Also the Cygwin tests are assuming Cygwin paths.
121 path = cygpath(path)
122 if not osp.isabs(path):
123 path = osp.normpath(osp.join(osp.dirname(d), path))
124 return find_submodule_git_dir(path)
125 # END handle exception
126 return None
129def short_to_long(odb: "GitCmdObjectDB", hexsha: str) -> Optional[bytes]:
130 """
131 :return:
132 Long hexadecimal sha1 from the given less than 40 byte hexsha, or ``None`` if no
133 candidate could be found.
135 :param hexsha:
136 hexsha with less than 40 bytes.
137 """
138 try:
139 return bin_to_hex(odb.partial_to_complete_sha_hex(hexsha))
140 except BadObject:
141 return None
142 # END exception handling
145def _describe_to_long(repo: "Repo", name: str) -> Optional[bytes]:
146 """Resolve git-describe style names to the abbreviated object they contain."""
147 match = re.match(r"^.+-\d+-g([0-9A-Fa-f]{4,40})(?:-dirty)?$", name)
148 if match is None:
149 match = re.match(r"^.+-g([0-9A-Fa-f]{4,40})(?:-dirty)?$", name)
150 if match is None:
151 match = re.match(r"^([0-9A-Fa-f]{4,40})-dirty$", name)
152 if match is None:
153 return None
154 # END handle match
156 hexsha = match.group(1)
157 if len(hexsha) == 40:
158 return hexsha.encode("ascii")
159 return short_to_long(repo.odb, hexsha)
162@overload
163def name_to_object(repo: "Repo", name: str, return_ref: Literal[False] = ...) -> AnyGitObject: ...
166@overload
167def name_to_object(repo: "Repo", name: str, return_ref: Literal[True]) -> Union[AnyGitObject, SymbolicReference]: ...
170def name_to_object(repo: "Repo", name: str, return_ref: bool = False) -> Union[AnyGitObject, SymbolicReference]:
171 """
172 :return:
173 Object specified by the given name - hexshas (short and long) as well as
174 references are supported.
176 :param return_ref:
177 If ``True``, and name specifies a reference, we will return the reference
178 instead of the object. Otherwise it will raise :exc:`~gitdb.exc.BadObject` or
179 :exc:`~gitdb.exc.BadName`.
180 """
181 hexsha: Union[None, str, bytes] = None
183 # Is it a hexsha? Try the most common ones, which is 7 to 40.
184 if repo.re_hexsha_shortened.match(name):
185 if len(name) != 40:
186 # Find long sha for short sha.
187 hexsha = short_to_long(repo.odb, name)
188 else:
189 hexsha = name
190 # END handle short shas
191 # END find sha if it matches
193 # If we couldn't find an object for what seemed to be a short hexsha, try to find it
194 # as reference anyway, it could be named 'aaa' for instance.
195 if hexsha is None:
196 for base in (
197 "%s",
198 "refs/%s",
199 "refs/tags/%s",
200 "refs/heads/%s",
201 "refs/remotes/%s",
202 "refs/remotes/%s/HEAD",
203 ):
204 try:
205 hexsha = SymbolicReference.dereference_recursive(repo, base % name)
206 if return_ref:
207 return SymbolicReference(repo, base % name)
208 # END handle symbolic ref
209 break
210 except ValueError:
211 pass
212 # END for each base
213 # END handle hexsha
215 if hexsha is None:
216 hexsha = _describe_to_long(repo, name)
217 # END handle describe output
219 # Didn't find any ref, this is an error.
220 if return_ref:
221 raise BadObject("Couldn't find reference named %r" % name)
222 # END handle return ref
224 # Tried everything ? fail.
225 if hexsha is None:
226 raise BadName(name)
227 # END assert hexsha was found
229 return Object.new_from_sha(repo, hex_to_bin(hexsha))
232def deref_tag(tag: "Tag") -> AnyGitObject:
233 """Recursively dereference a tag and return the resulting object."""
234 while True:
235 try:
236 tag = tag.object
237 except AttributeError:
238 break
239 # END dereference tag
240 return tag
243def to_commit(obj: Object) -> "Commit":
244 """Convert the given object to a commit if possible and return it."""
245 if obj.type == "tag":
246 obj = deref_tag(obj)
248 if obj.type != "commit":
249 raise ValueError("Cannot convert object %r to type commit" % obj)
250 # END verify type
251 return obj
254def _object_from_hexsha(repo: "Repo", hexsha: str) -> AnyGitObject:
255 return Object.new_from_sha(repo, hex_to_bin(hexsha))
258def _current_reflog_ref(repo: "Repo") -> SymbolicReference:
259 try:
260 return repo.head.ref
261 except TypeError:
262 return repo.head
263 # END handle detached head
266def _common_reflog_path(repo: "Repo", ref: SymbolicReference) -> Optional[str]:
267 if repo.common_dir == repo.git_dir:
268 return None
269 # END handle normal repository
270 return SymbolicReference._get_validated_path(osp.join(repo.common_dir, "logs"), ref.path)
273def _ref_log(repo: "Repo", ref: SymbolicReference) -> "RefLog":
274 try:
275 return ref.log()
276 except FileNotFoundError:
277 common_path = _common_reflog_path(repo, ref)
278 if common_path and osp.isfile(common_path):
279 from git.refs.log import RefLog
281 return RefLog.from_file(common_path)
282 # END handle linked-worktree branch logs
283 try:
284 if ref.path == repo.head.ref.path:
285 return repo.head.log()
286 # END handle linked-worktree current branch logs
287 except TypeError:
288 pass
289 # END handle detached head
290 raise
291 # END handle missing branch log
294def _ref_log_entry(repo: "Repo", ref: SymbolicReference, index: int) -> "RefLogEntry":
295 try:
296 return ref.log_entry(index)
297 except FileNotFoundError:
298 common_path = _common_reflog_path(repo, ref)
299 if common_path and osp.isfile(common_path):
300 from git.refs.log import RefLog
302 return RefLog.entry_at(common_path, index)
303 # END handle linked-worktree branch logs
304 try:
305 if ref.path == repo.head.ref.path:
306 return repo.head.log_entry(index)
307 # END handle linked-worktree current branch logs
308 except TypeError:
309 pass
310 # END handle detached head
311 raise
312 # END handle missing branch log
315def _find_reflog_entry_by_date(repo: "Repo", ref: SymbolicReference, spec: str) -> str:
316 try:
317 timestamp, _offset = parse_date(spec)
318 except ValueError as e:
319 raise NotImplementedError("Support for additional @{...} modes not implemented") from e
320 # END handle unsupported dates
321 log = _ref_log(repo, ref)
322 if not log:
323 raise IndexError("Invalid revlog date: %s" % spec)
324 # END handle empty log
326 for entry in reversed(log):
327 if entry.time[0] <= timestamp:
328 return entry.newhexsha
329 # END found candidate
330 # END for each entry
331 return log[0].newhexsha
334def _previous_checked_out_branch(repo: "Repo", nth: int) -> AnyGitObject:
335 if nth <= 0:
336 raise ValueError("Invalid previous checkout selector: -%i" % nth)
337 # END handle invalid input
339 seen = 0
340 for entry in reversed(_ref_log(repo, repo.head)):
341 message = entry.message or ""
342 prefix = "checkout: moving from "
343 if not message.startswith(prefix):
344 continue
345 # END skip non-checkouts
347 previous_branch = message[len(prefix) :].split(" to ", 1)[0]
348 seen += 1
349 if seen == nth:
350 return name_to_object(repo, previous_branch)
351 # END found selector
352 # END for each entry
353 raise IndexError("Invalid previous checkout selector: -%i" % nth)
356def _tracking_branch_object(repo: "Repo", ref: Optional[SymbolicReference]) -> AnyGitObject:
357 from git.refs.head import Head
359 if ref is None:
360 try:
361 head = repo.active_branch
362 except TypeError as e:
363 raise BadName("@{upstream}") from e
364 elif isinstance(ref, Head):
365 head = ref
366 elif os.fspath(ref.path).startswith("refs/heads/"):
367 head = Head(repo, ref.path)
368 else:
369 raise BadName("%s@{upstream}" % ref.name)
370 # END handle head
372 tracking_branch = head.tracking_branch()
373 if tracking_branch is None:
374 raise BadName("%s@{upstream}" % head.name)
375 # END handle missing upstream
376 return tracking_branch.commit
379def _apply_reflog(repo: "Repo", ref: Optional[SymbolicReference], content: str) -> AnyGitObject:
380 if content.startswith("+"):
381 content = content[1:]
382 # END handle explicit positive sign
384 if content.startswith("-"):
385 if ref is not None:
386 raise ValueError("Previous checkout selectors do not take an explicit ref")
387 if content == "-0":
388 raise ValueError("Negative zero is invalid in reflog selector")
389 # END handle invalid negative zero
390 try:
391 return _previous_checked_out_branch(repo, int(content[1:]))
392 except ValueError as e:
393 raise ValueError("Invalid previous checkout selector: %s" % content) from e
394 # END handle previous checkout branch
396 content_lower = content.lower()
397 if content_lower in ("u", "upstream", "push"):
398 return _tracking_branch_object(repo, ref)
399 # END handle sibling branches
401 ref = ref or _current_reflog_ref(repo)
402 try:
403 entry_no = int(content)
404 except ValueError:
405 hexsha = _find_reflog_entry_by_date(repo, ref, content)
406 else:
407 if entry_no >= 100000000:
408 hexsha = _find_reflog_entry_by_date(repo, ref, "%s +0000" % entry_no)
409 elif entry_no == 0:
410 return ref.commit
411 else:
412 try:
413 entry = _ref_log_entry(repo, ref, -(entry_no + 1))
414 except IndexError as e:
415 raise IndexError("Invalid revlog index: %i" % entry_no) from e
416 # END handle index out of bound
417 hexsha = entry.newhexsha
418 # END handle offset or date-like timestamp
419 # END handle content
420 return _object_from_hexsha(repo, hexsha)
423def _find_closing_brace(rev: str, start: int) -> int:
424 depth = 1
425 escaped = False
426 for idx in range(start + 1, len(rev)):
427 char = rev[idx]
428 if escaped:
429 escaped = False
430 elif char == "\\":
431 escaped = True
432 elif char == "{":
433 depth += 1
434 elif char == "}":
435 depth -= 1
436 if depth == 0:
437 return idx
438 # END found end
439 # END handle char
440 # END for each char
441 raise ValueError("Missing closing brace to define type in %s" % rev)
444def _parse_search(pattern: str) -> Tuple[str, bool]:
445 if not pattern:
446 raise ValueError("Revision search requires a pattern")
447 # END handle empty pattern
449 if pattern.startswith("!-"):
450 return pattern[2:], True
451 if pattern.startswith("!!"):
452 return pattern[1:], False
453 if pattern.startswith("!"):
454 raise ValueError("Need one character after /!, typically -")
455 return pattern, False
458def _unescape_braced_regex(pattern: str) -> str:
459 out = []
460 idx = 0
461 while idx < len(pattern):
462 char = pattern[idx]
463 if char == "\\" and idx + 1 < len(pattern):
464 next_char = pattern[idx + 1]
465 if next_char in "{}\\":
466 out.append(next_char)
467 else:
468 out.append(char)
469 out.append(next_char)
470 # END handle escaped char
471 idx += 2
472 continue
473 # END handle backslash
474 out.append(char)
475 idx += 1
476 # END for each char
477 return "".join(out)
480def _find_commit_by_message(
481 repo: "Repo", rev: Optional[AnyGitObject], pattern: str, braced: bool = False
482) -> AnyGitObject:
483 pattern, negated = _parse_search(_unescape_braced_regex(pattern) if braced else pattern)
484 try:
485 regex = re.compile(pattern)
486 except re.error as e:
487 raise ValueError("Invalid commit message regex %r" % pattern) from e
488 # END handle invalid regex
489 if rev is None:
490 commits = _all_ref_commits(repo)
491 else:
492 commits = _reachable_commits([to_commit(cast(Object, rev))])
493 # END handle starting point
495 for commit in commits:
496 message = commit.message
497 if isinstance(message, bytes):
498 message = message.decode(commit.encoding, "replace")
499 # END handle bytes message
500 matches = regex.search(message or "") is not None
501 if matches != negated:
502 return commit
503 # END found commit
504 # END for each commit
505 raise BadName("No commit found matching message pattern %r" % pattern)
508def _all_ref_commits(repo: "Repo") -> Iterator["Commit"]:
509 starts = []
510 for ref in repo.references:
511 try:
512 starts.append(to_commit(cast(Object, ref.object)))
513 except (BadName, ValueError):
514 pass
515 # END skip refs that do not point to commits
516 # END for each ref
517 try:
518 starts.append(repo.head.commit)
519 except ValueError:
520 pass
521 # END handle unborn head
522 return _reachable_commits(starts)
525def _reachable_commits(starts: list["Commit"]) -> Iterator["Commit"]:
526 seen = set()
527 pending = starts[:]
528 while pending:
529 pending.sort(key=lambda commit: commit.committed_date, reverse=True)
530 commit = pending.pop(0)
531 if commit.binsha in seen:
532 continue
533 # END skip seen commit
534 seen.add(commit.binsha)
535 yield commit
536 pending.extend(commit.parents)
537 # END while commits remain
540def _index_lookup(repo: "Repo", spec: str) -> AnyGitObject:
541 if not spec:
542 raise ValueError("':' must be followed by a path")
543 # END handle empty lookup
545 stage = 0
546 path = spec
547 if len(spec) >= 2 and spec[1] == ":" and spec[0] in "0123":
548 stage = int(spec[0])
549 path = spec[2:]
550 # END handle stage
552 try:
553 return repo.index.entries[(path, stage)].to_blob(repo)
554 except KeyError as e:
555 raise BadName("Path %r did not exist in the index at stage %i" % (path, stage)) from e
558def _tree_lookup(obj: AnyGitObject, path: str) -> AnyGitObject:
559 if obj.type != "tree":
560 obj = to_commit(cast(Object, obj)).tree
561 # END get tree
562 if not path:
563 return obj
564 return obj[path]
567def _peel(obj: AnyGitObject, output_type: str, repo: "Repo", rev: str) -> AnyGitObject:
568 if output_type.startswith("/"):
569 return _find_commit_by_message(repo, obj, output_type[1:], braced=True)
570 if output_type == "":
571 return deref_tag(obj) if obj.type == "tag" else obj
572 if output_type == "object":
573 return obj
574 if output_type == "commit":
575 return to_commit(cast(Object, obj))
576 if output_type == "tree":
577 return to_commit(cast(Object, obj)).tree if obj.type != "tree" else obj
578 if output_type == "blob":
579 obj = deref_tag(obj) if obj.type == "tag" else obj
580 if obj.type == output_type:
581 return obj
582 # END handle matching type
583 raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))
584 if output_type == "tag":
585 if obj.type == output_type:
586 return obj
587 # END handle matching type
588 raise ValueError("Could not accommodate requested object type %r, got %s" % (output_type, obj.type))
589 # END handle known types
590 raise ValueError("Invalid output type: %s ( in %s )" % (output_type, rev))
593def _first_rev_token(rev: str) -> Optional[int]:
594 for idx, char in enumerate(rev):
595 if char in "^~:":
596 return idx
597 if char == "@":
598 next_char = rev[idx + 1] if idx + 1 < len(rev) else None
599 if idx == 0 and next_char in (None, "^", "~", ":", "{"):
600 return idx
601 if next_char == "{":
602 return idx
603 # END handle reflog selector
604 # END handle at symbol
605 # END for each char
606 return None
609def rev_parse(repo: "Repo", rev: str) -> AnyGitObject:
610 """Parse a revision string. Like :manpage:`git-rev-parse(1)`.
612 :return:
613 `~git.objects.base.Object` at the given revision.
615 This may be any type of git object:
617 * :class:`Commit <git.objects.commit.Commit>`
618 * :class:`TagObject <git.objects.tag.TagObject>`
619 * :class:`Tree <git.objects.tree.Tree>`
620 * :class:`Blob <git.objects.blob.Blob>`
622 :param rev:
623 :manpage:`git-rev-parse(1)`-compatible revision specification as string.
624 Please see :manpage:`git-rev-parse(1)` for details.
626 :raise gitdb.exc.BadObject:
627 If the given revision could not be found.
629 :raise ValueError:
630 If `rev` couldn't be parsed.
632 :raise IndexError:
633 If an invalid reflog index is specified.
634 """
635 if rev.startswith(":/"):
636 return _find_commit_by_message(repo, None, rev[2:])
637 if rev.startswith(":"):
638 return _index_lookup(repo, rev[1:])
639 # END handle top-level colon modes
641 obj: Optional[AnyGitObject] = None
642 ref = None
643 lr = len(rev)
644 first_token = _first_rev_token(rev)
645 if first_token is None:
646 return name_to_object(repo, rev)
647 # END handle plain name
649 if first_token == 0:
650 if rev[0] != "@":
651 raise ValueError("Revision specifier must start with an object name: %s" % rev)
652 # END handle invalid leading token
653 ref = _current_reflog_ref(repo)
654 obj = ref.commit
655 start = 0 if rev.startswith("@{") else 1
656 else:
657 if rev[first_token] == "@":
658 ref = cast("Reference", name_to_object(repo, rev[:first_token], return_ref=True))
659 obj = ref.commit
660 else:
661 obj = name_to_object(repo, rev[:first_token])
662 # END handle anchor
663 start = first_token
664 # END initialize anchor
666 while start < lr:
667 token = rev[start]
669 if token == "@":
670 if start + 1 >= lr or rev[start + 1] != "{":
671 raise ValueError("Invalid @ token in revision specifier: %s" % rev)
672 # END handle invalid @
673 end = _find_closing_brace(rev, start + 1)
674 obj = _apply_reflog(repo, ref if first_token != 0 and start == first_token else None, rev[start + 2 : end])
675 ref = None
676 start = end + 1
677 continue
678 # END handle reflog
680 if token == ":":
681 return _tree_lookup(obj, rev[start + 1 :])
682 # END handle path
684 start += 1
686 if token == "^" and start < lr and rev[start] == "{":
687 end = _find_closing_brace(rev, start)
688 obj = _peel(obj, rev[start + 1 : end], repo, rev)
689 ref = None
690 start = end + 1
691 continue
692 # END parse type
694 num = 0
695 found_digit = False
696 while start < lr:
697 if rev[start] in digits:
698 num = num * 10 + int(rev[start])
699 start += 1
700 found_digit = True
701 else:
702 break
703 # END handle number
704 # END number parse loop
706 if not found_digit:
707 num = 1
708 # END set default num
710 try:
711 if token == "~":
712 obj = to_commit(obj)
713 for _ in range(num):
714 obj = obj.parents[0]
715 # END for each history item to walk
716 elif token == "^":
717 obj = to_commit(obj)
718 if num == 0:
719 pass
720 else:
721 obj = obj.parents[num - 1]
722 # END handle parent
723 else:
724 raise ValueError("Invalid token: %r" % token)
725 # END end handle tag
726 except (IndexError, AttributeError) as e:
727 raise BadName(
728 f"Invalid revision spec '{rev}' - not enough parent commits to reach '{token}{int(num)}'"
729 ) from e
730 # END exception handling
731 # END parse loop
733 if obj is None:
734 raise ValueError("Revision specifier could not be parsed: %s" % rev)
736 return obj