Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
# Public API of this module: repository layout constants, the repository
# classes, and the identity/graftpoint helper functions.
__all__ = [
    "BASE_DIRECTORIES",
    "COMMONDIR",
    "CONTROLDIR",
    "DEFAULT_BRANCH",
    "DEFAULT_OFS_DELTA",
    "GITDIR",
    "INDEX_FILENAME",
    "OBJECTDIR",
    "REFSDIR",
    "REFSDIR_HEADS",
    "REFSDIR_TAGS",
    "WORKTREES",
    "BaseRepo",
    "DefaultIdentityNotFound",
    "InvalidUserIdentity",
    "MemoryRepo",
    "ParentsProvider",
    "Repo",
    "UnsupportedExtension",
    "UnsupportedVersion",
    "check_user_identity",
    "get_user_identity",
    "parse_graftpoints",
    "parse_shared_repository",
    "read_gitfile",
    "serialize_graftpoints",
]
61import os
62import stat
63import sys
64import time
65import warnings
66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
67from io import BytesIO
68from types import TracebackType
69from typing import (
70 TYPE_CHECKING,
71 Any,
72 BinaryIO,
73 TypeVar,
74)
76if TYPE_CHECKING:
77 # There are no circular imports here, but we try to defer imports as long
78 # as possible to reduce start-up time for anything that doesn't need
79 # these imports.
80 from .attrs import GitAttributes
81 from .config import ConditionMatcher, ConfigFile, StackedConfig
82 from .diff_tree import RenameDetector
83 from .filters import FilterBlobNormalizer, FilterContext
84 from .index import Index
85 from .notes import Notes
86 from .object_store import BaseObjectStore, GraphWalker
87 from .pack import UnpackedObject
88 from .rebase import RebaseStateManager
89 from .walk import Walker
90 from .worktree import WorkTree
92from . import reflog, replace_me
93from .errors import (
94 NoIndexPresent,
95 NotBlobError,
96 NotCommitError,
97 NotGitRepository,
98 NotTagError,
99 NotTreeError,
100 RefFormatError,
101)
102from .file import GitFile
103from .hooks import (
104 CommitMsgShellHook,
105 Hook,
106 PostCommitShellHook,
107 PostReceiveShellHook,
108 PreCommitShellHook,
109)
110from .object_store import (
111 DiskObjectStore,
112 MemoryObjectStore,
113 MissingObjectFinder,
114 ObjectStoreGraphWalker,
115 PackBasedObjectStore,
116 PackCapableObjectStore,
117 find_shallow,
118 peel_sha,
119)
120from .objects import (
121 Blob,
122 Commit,
123 ObjectID,
124 RawObjectID,
125 ShaFile,
126 Tag,
127 Tree,
128 check_hexsha,
129 valid_hexsha,
130)
131from .pack import generate_unpacked_objects
132from .refs import (
133 HEADREF,
134 LOCAL_TAG_PREFIX, # noqa: F401
135 SYMREF, # noqa: F401
136 DictRefsContainer,
137 DiskRefsContainer,
138 Ref,
139 RefsContainer,
140 _set_default_branch,
141 _set_head,
142 _set_origin_head,
143 check_ref_format, # noqa: F401
144 extract_branch_name,
145 is_per_worktree_ref,
146 local_branch_name,
147 read_packed_refs, # noqa: F401
148 read_packed_refs_with_peeled, # noqa: F401
149 write_packed_refs, # noqa: F401
150)
CONTROLDIR = ".git"  # Name of the control directory of a non-bare repository.
OBJECTDIR = "objects"  # Subdirectory of the control dir holding the object store.
DEFAULT_OFS_DELTA = True  # Default for whether generated packs may use OFS deltas.

# Type variable bound to ShaFile; used by typed object accessors (_get_object).
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"  # Root directory for references.
REFSDIR_TAGS = "tags"  # Subdirectory of refs/ for tags.
REFSDIR_HEADS = "heads"  # Subdirectory of refs/ for branch heads.
INDEX_FILENAME = "index"  # Name of the index file in the control dir.
COMMONDIR = "commondir"  # File in a linked worktree pointing at the shared control dir.
GITDIR = "gitdir"  # File in a linked worktree pointing back at its .git file.
WORKTREES = "worktrees"  # Directory containing linked-worktree metadata.

# Directory hierarchy created inside a fresh control directory.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name HEAD points at in newly created repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
          identity: The malformed identity string; stored on the exception
            so callers can report it.
        """
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised by ``_get_default_identity`` when neither the environment nor
    the system password database yields a usable username.
    """
# TODO(jelmer): Cache?
def _get_default_identity() -> tuple[str, str]:
    """Guess a ``(fullname, email)`` pair from the host environment.

    The username comes from the first non-empty of ``LOGNAME``, ``USER``,
    ``LNAME`` and ``USERNAME``; the full name from the passwd gecos field
    where available; the email from ``$EMAIL`` or ``user@hostname``.

    Raises:
      DefaultIdentityNotFound: If no username could be determined.
    """
    import socket

    # First non-empty environment variable wins.
    username = next(
        (
            os.environ[var]
            for var in ("LOGNAME", "USER", "LNAME", "USERNAME")
            if os.environ.get(var)
        ),
        None,
    )

    fullname = None
    try:
        import pwd
    except ImportError:
        # Non-POSIX platform: no passwd database to consult.
        pass
    else:
        try:
            entry = pwd.getpwuid(os.getuid())  # type: ignore[attr-defined,unused-ignore]
        except KeyError:
            pass
        else:
            # The gecos field usually contains "Full Name,room,phone,...".
            if getattr(entry, "gecos", None):
                fullname = entry.pw_gecos.split(",")[0]
            if username is None:
                username = entry.pw_name

    if not fullname:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        fullname = username

    email = os.environ.get("EMAIL")
    if email is None:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        email = f"{username}@{socket.gethostname()}"
    return (fullname, email)
228def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
229 """Determine the identity to use for new commits.
231 If kind is set, this first checks
232 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
234 If those variables are not set, then it will fall back
235 to reading the user.name and user.email settings from
236 the specified configuration.
238 If that also fails, then it will fall back to using
239 the current users' identity as obtained from the host
240 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
242 Args:
243 config: Configuration stack to read from
244 kind: Optional kind to return identity for,
245 usually either "AUTHOR" or "COMMITTER".
247 Returns:
248 A user identity
249 """
250 user: bytes | None = None
251 email: bytes | None = None
252 if kind:
253 user_uc = os.environ.get("GIT_" + kind + "_NAME")
254 if user_uc is not None:
255 user = user_uc.encode("utf-8")
256 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
257 if email_uc is not None:
258 email = email_uc.encode("utf-8")
259 if user is None:
260 try:
261 user = config.get(("user",), "name")
262 except KeyError:
263 user = None
264 if email is None:
265 try:
266 email = config.get(("user",), "email")
267 except KeyError:
268 email = None
269 default_user, default_email = _get_default_identity()
270 if user is None:
271 user = default_user.encode("utf-8")
272 if email is None:
273 email = default_email.encode("utf-8")
274 if email.startswith(b"<") and email.endswith(b">"):
275 email = email[1:-1]
276 return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # A valid identity looks like b"Name <email>"; split on the first " <".
    name_part, sep, email_part = identity.partition(b" <")
    if not sep or b">" not in email_part:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
    # NUL bytes and newlines would corrupt the commit object encoding.
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[ObjectID, list[ObjectID]] = {}
    for line in graftpoints:
        # First field is the grafted commit, the rest are its new parents.
        fields = line.split(None, 1)
        commit = ObjectID(fields[0])
        parents = (
            [ObjectID(p) for p in fields[1].split()] if len(fields) == 2 else []
        )

        # Every SHA on the line must be well-formed hex.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # A commit with no replacement parents serializes as the bare SHA.
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On other platforms this is currently a no-op.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a typed function pointer for kernel32!SetFileAttributesW.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        # NOTE(review): the annotation says ``str``, but bytes paths are
        # accepted here as well — confirm whether callers pass bytes.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Failure is deliberately ignored: hiding is best-effort.
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here
374def parse_shared_repository(
375 value: str | bytes | bool,
376) -> tuple[int | None, int | None]:
377 """Parse core.sharedRepository configuration value.
379 Args:
380 value: Configuration value (string, bytes, or boolean)
382 Returns:
383 tuple of (file_mask, directory_mask) or (None, None) if not shared
385 The masks are permission bits to apply via chmod.
386 """
387 if isinstance(value, bytes):
388 value = value.decode("utf-8", errors="replace")
390 # Handle boolean values
391 if isinstance(value, bool):
392 if value:
393 # true = group (same as "group")
394 return (0o664, 0o2775)
395 else:
396 # false = umask (use system umask, no adjustment)
397 return (None, None)
399 # Handle string values
400 value_lower = value.lower()
402 if value_lower in ("false", "0", ""):
403 # Use umask (no adjustment)
404 return (None, None)
406 if value_lower in ("true", "1", "group"):
407 # Group writable (with setgid bit)
408 return (0o664, 0o2775)
410 if value_lower in ("all", "world", "everybody", "2"):
411 # World readable/writable (with setgid bit)
412 return (0o666, 0o2777)
414 if value_lower == "umask":
415 # Explicitly use umask
416 return (None, None)
418 # Try to parse as octal
419 if value.startswith("0"):
420 try:
421 mode = int(value, 8)
422 # For directories, add execute bits where read bits are set
423 # and add setgid bit for shared repositories
424 dir_mode = mode | 0o2000 # Add setgid bit
425 if mode & 0o004:
426 dir_mode |= 0o001
427 if mode & 0o040:
428 dir_mode |= 0o010
429 if mode & 0o400:
430 dir_mode |= 0o100
431 return (mode, dir_mode)
432 except ValueError:
433 pass
435 # Default to umask for unrecognized values
436 return (None, None)
class ParentsProvider:
    """Provider for commit parent information.

    Resolves the parents of a commit, honouring graftpoints and shallow
    boundaries, and using the store's commit graph for fast lookups when
    one is available.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: "dict[ObjectID, list[ObjectID]] | None" = None,
        shallows: "Iterable[ObjectID]" = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information (commit SHA -> replacement parents)
          shallows: Shallow commit SHAs
        """
        self.store = store
        # Fix for the mutable-default pitfall: the old ``grafts={}`` default
        # was shared between every instance constructed without grafts.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: "ObjectID", commit: "Commit | None" = None
    ) -> "list[ObjectID]":
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional pre-loaded commit object matching ``commit_id``

        Returns:
          List of parent SHAs; empty for shallow commits.
        """
        # Graftpoints override the recorded parents entirely.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow commits have their history cut off.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        result: list[ObjectID] = commit.parents
        return result
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Storage-specific operations (named files, index, config) are declared
    here and raise ``NotImplementedError``; concrete subclasses supply them.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """
    def __init__(
        self, object_store: "PackCapableObjectStore", refs: RefsContainer
    ) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints (commit -> substituted parents); consumed by
        # parents_provider().  Starts empty — subclasses may populate it.
        self._graftpoints: dict[ObjectID, list[ObjectID]] = {}
        # Hooks keyed by name; starts empty — subclasses may register hooks.
        self.hooks: dict[str, Hook] = {}
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract: concrete repository classes provide the implementation.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)
526 def _determine_symlinks(self) -> bool:
527 """Probe the filesystem to determine whether symlinks can be created.
529 Returns: True if symlinks can be created, False otherwise.
530 """
531 # For now, just mimic the old behaviour
532 return sys.platform != "win32"
534 def _init_files(
535 self,
536 bare: bool,
537 symlinks: bool | None = None,
538 format: int | None = None,
539 shared_repository: str | bool | None = None,
540 ) -> None:
541 """Initialize a default set of named files."""
542 from .config import ConfigFile
544 self._put_named_file("description", b"Unnamed repository")
545 f = BytesIO()
546 cf = ConfigFile()
547 if format is None:
548 format = 0
549 if format not in (0, 1):
550 raise ValueError(f"Unsupported repository format version: {format}")
551 cf.set("core", "repositoryformatversion", str(format))
552 if self._determine_file_mode():
553 cf.set("core", "filemode", True)
554 else:
555 cf.set("core", "filemode", False)
557 if symlinks is None and not bare:
558 symlinks = self._determine_symlinks()
560 if symlinks is False:
561 cf.set("core", "symlinks", symlinks)
563 cf.set("core", "bare", bare)
564 cf.set("core", "logallrefupdates", True)
566 # Set shared repository if specified
567 if shared_repository is not None:
568 if isinstance(shared_repository, bool):
569 cf.set("core", "sharedRepository", shared_repository)
570 else:
571 cf.set("core", "sharedRepository", shared_repository)
573 cf.write_to_file(f)
574 self._put_named_file("config", f.getvalue())
575 self._put_named_file(os.path.join("info", "exclude"), b"")
    def get_named_file(self, path: str) -> BinaryIO | None:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract: concrete repository classes provide the implementation.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract: concrete repository classes provide the implementation.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)
    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract: concrete repository classes provide the implementation.
        """
        raise NotImplementedError(self._del_named_file)
    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract: concrete repository classes provide the implementation.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)
612 def fetch(
613 self,
614 target: "BaseRepo",
615 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]]
616 | None = None,
617 progress: Callable[..., None] | None = None,
618 depth: int | None = None,
619 ) -> dict[Ref, ObjectID]:
620 """Fetch objects into another repository.
622 Args:
623 target: The target repository
624 determine_wants: Optional function to determine what refs to
625 fetch.
626 progress: Optional progress function
627 depth: Optional shallow fetch depth
628 Returns: The local refs
629 """
630 if determine_wants is None:
631 determine_wants = target.object_store.determine_wants_all
632 count, pack_data = self.fetch_pack_data(
633 determine_wants,
634 target.get_graph_walker(),
635 progress=progress,
636 depth=depth,
637 )
638 target.object_store.add_pack_data(count, pack_data, progress)
639 return self.get_refs()
641 def fetch_pack_data(
642 self,
643 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
644 graph_walker: "GraphWalker",
645 progress: Callable[[bytes], None] | None,
646 *,
647 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
648 depth: int | None = None,
649 ) -> tuple[int, Iterator["UnpackedObject"]]:
650 """Fetch the pack data required for a set of revisions.
652 Args:
653 determine_wants: Function that takes a dictionary with heads
654 and returns the list of heads to fetch.
655 graph_walker: Object that can iterate over the list of revisions
656 to fetch and has an "ack" method that will be called to acknowledge
657 that a revision is present.
658 progress: Simple progress function that will be called with
659 updated progress strings.
660 get_tagged: Function that returns a dict of pointed-to sha ->
661 tag sha for including tags.
662 depth: Shallow fetch depth
663 Returns: count and iterator over pack data
664 """
665 missing_objects = self.find_missing_objects(
666 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
667 )
668 if missing_objects is None:
669 return 0, iter([])
670 remote_has = missing_objects.get_remote_has()
671 object_ids = list(missing_objects)
672 return len(object_ids), generate_unpacked_objects(
673 self.object_store, object_ids, progress=progress, other_haves=remote_has
674 )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
          when the shallow short-circuit path decides no pack should be sent
        """
        import logging

        # Filter out refs pointing to missing objects to avoid errors downstream.
        # This makes Dulwich more robust when dealing with broken refs on disk.
        # Previously serialize_refs() did this filtering as a side-effect.
        all_refs = self.get_refs()
        refs: dict[Ref, ObjectID] = {}
        for ref, sha in all_refs.items():
            if sha in self.object_store:
                refs[ref] = sha
            else:
                logging.warning(
                    "ref %s points at non-present sha %s",
                    ref.decode("utf-8", "replace"),
                    sha.decode("ascii"),
                )

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Shallow state the walker reported before this negotiation started.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but the walker has no ``shallow``
            # attribute, ``unshallow`` stays unbound and the reads below would
            # raise UnboundLocalError — confirm walkers always expose it.
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[ObjectID]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
787 def generate_pack_data(
788 self,
789 have: set[ObjectID],
790 want: set[ObjectID],
791 *,
792 shallow: set[ObjectID] | None = None,
793 progress: Callable[[str], None] | None = None,
794 ofs_delta: bool | None = None,
795 ) -> tuple[int, Iterator["UnpackedObject"]]:
796 """Generate pack data objects for a set of wants/haves.
798 Args:
799 have: List of SHA1s of objects that should not be sent
800 want: List of SHA1s of objects that should be sent
801 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
802 ofs_delta: Whether OFS deltas can be included
803 progress: Optional progress reporting method
804 """
805 if shallow is None:
806 shallow = self.get_shallow()
807 return self.object_store.generate_pack_data(
808 have,
809 want,
810 shallow=shallow,
811 progress=progress,
812 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
813 )
815 def get_graph_walker(
816 self, heads: list[ObjectID] | None = None
817 ) -> ObjectStoreGraphWalker:
818 """Retrieve a graph walker.
820 A graph walker is used by a remote repository (or proxy)
821 to find out which objects are present in this repository.
823 Args:
824 heads: Repository heads to use (optional)
825 Returns: A graph walker object
826 """
827 if heads is None:
828 heads = [
829 sha
830 for sha in self.refs.as_dict(Ref(b"refs/heads")).values()
831 if sha in self.object_store
832 ]
833 parents_provider = ParentsProvider(self.object_store)
834 return ObjectStoreGraphWalker(
835 heads,
836 parents_provider.get_parents,
837 shallow=self.get_shallow(),
838 update_shallow=self.update_shallow,
839 )
    def get_refs(self) -> dict[Ref, ObjectID]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        # Delegates straight to the refs container's snapshot view.
        return self.refs.as_dict()
    def head(self) -> ObjectID:
        """Return the SHA1 pointed at by HEAD.

        Raises:
          KeyError: If HEAD does not exist in the refs container.
        """
        # TODO: move this method to WorkTree
        return self.refs[HEADREF]
853 def _get_object(self, sha: bytes, cls: type[T]) -> T:
854 assert len(sha) in (20, 40)
855 obj_id = ObjectID(sha) if len(sha) == 40 else RawObjectID(sha)
856 ret = self.get_object(obj_id)
857 if not isinstance(ret, cls):
858 if cls is Commit:
859 raise NotCommitError(ret.id)
860 elif cls is Blob:
861 raise NotBlobError(ret.id)
862 elif cls is Tree:
863 raise NotTreeError(ret.id)
864 elif cls is Tag:
865 raise NotTagError(ret.id)
866 else:
867 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
868 return ret
    def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        # Plain mapping-style lookup on the object store.
        return self.object_store[sha]
    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        Returns:
          ParentsProvider instance configured with this repository's
          graftpoints and shallow commits
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )
    def get_parents(
        self, sha: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        # A fresh provider is built per call; grafts/shallows are re-read.
        return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract: concrete repository classes provide the implementation.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)
    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract: concrete repository classes provide the implementation.
        """
        raise NotImplementedError(self.get_worktree_config)
    def get_description(self) -> bytes | None:
        """Retrieve the description for this repository.

        Abstract: concrete repository classes provide the implementation.

        Returns: Bytes with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Abstract: concrete repository classes provide the implementation.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Abstract: concrete repository classes provide the implementation.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)
    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Abstract: concrete repository classes provide the implementation.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Abstract: concrete repository classes provide the implementation.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)
960 def get_config_stack(self) -> "StackedConfig":
961 """Return a config stack for this repository.
963 This stack accesses the configuration for both this repository
964 itself (.git/config) and the global configuration, which usually
965 lives in ~/.gitconfig.
967 Returns: `Config` instance for this repository
968 """
969 from .config import ConfigFile, StackedConfig
971 local_config = self.get_config()
972 backends: list[ConfigFile] = [local_config]
973 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
974 backends.append(self.get_worktree_config())
976 backends += StackedConfig.default_backends()
977 return StackedConfig(backends, writable=local_config)
979 def get_shallow(self) -> set[ObjectID]:
980 """Get the set of shallow commits.
982 Returns: Set of shallow commits.
983 """
984 f = self.get_named_file("shallow")
985 if f is None:
986 return set()
987 with f:
988 return {ObjectID(line.strip()) for line in f}
990 def update_shallow(
991 self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None
992 ) -> None:
993 """Update the list of shallow objects.
995 Args:
996 new_shallow: Newly shallow objects
997 new_unshallow: Newly no longer shallow objects
998 """
999 shallow = self.get_shallow()
1000 if new_shallow:
1001 shallow.update(new_shallow)
1002 if new_unshallow:
1003 shallow.difference_update(new_unshallow)
1004 if shallow:
1005 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
1006 else:
1007 self._del_named_file("shallow")
1009 def get_peeled(self, ref: Ref) -> ObjectID:
1010 """Get the peeled value of a ref.
1012 Args:
1013 ref: The refname to peel.
1014 Returns: The fully-peeled SHA1 of a tag object, after peeling all
1015 intermediate tags; if the original ref does not point to a tag,
1016 this will equal the original SHA1.
1017 """
1018 cached = self.refs.get_peeled(ref)
1019 if cached is not None:
1020 return cached
1021 return peel_sha(self.object_store, self.refs[ref])[1].id
    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Imported lazily, consistent with the module's policy of deferring
        # imports to keep start-up time low.
        from .notes import Notes

        return Notes(self.object_store, self.refs)
    def get_walker(
        self,
        include: Sequence[ObjectID] | None = None,
        exclude: Sequence[ObjectID] | None = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: int | None = None,
        paths: Sequence[bytes] | None = None,
        rename_detector: "RenameDetector | None" = None,
        follow: bool = False,
        since: int | None = None,
        until: int | None = None,
        queue_cls: type | None = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker, _CommitTimeQueue

        if include is None:
            # Default to walking history starting from HEAD.
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Parent lookup goes through this repo so graftpoints apply.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )
1095 def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile":
1096 """Retrieve a Git object by SHA1 or ref.
1098 Args:
1099 name: A Git object SHA1 or a ref name
1100 Returns: A `ShaFile` object, such as a Commit or Blob
1101 Raises:
1102 KeyError: when the specified ref or object does not exist
1103 """
1104 if not isinstance(name, bytes):
1105 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
1106 if len(name) in (20, 40):
1107 try:
1108 # Try as ObjectID/RawObjectID
1109 return self.object_store[
1110 ObjectID(name) if len(name) == 40 else RawObjectID(name)
1111 ]
1112 except (KeyError, ValueError):
1113 pass
1114 try:
1115 return self.object_store[self.refs[Ref(name)]]
1116 except RefFormatError as exc:
1117 raise KeyError(name) from exc
1119 def __contains__(self, name: bytes) -> bool:
1120 """Check if a specific Git object or ref is present.
1122 Args:
1123 name: Git object SHA1 or ref name
1124 """
1125 if len(name) == 20:
1126 return RawObjectID(name) in self.object_store or Ref(name) in self.refs
1127 elif len(name) == 40 and valid_hexsha(name):
1128 return ObjectID(name) in self.object_store or Ref(name) in self.refs
1129 else:
1130 return Ref(name) in self.refs
1132 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
1133 """Set a ref.
1135 Args:
1136 name: ref name
1137 value: Ref value - either a ShaFile object, or a hex sha
1138 """
1139 if name.startswith(b"refs/") or name == HEADREF:
1140 ref_name = Ref(name)
1141 if isinstance(value, ShaFile):
1142 self.refs[ref_name] = value.id
1143 elif isinstance(value, bytes):
1144 self.refs[ref_name] = ObjectID(value)
1145 else:
1146 raise TypeError(value)
1147 else:
1148 raise ValueError(name)
1150 def __delitem__(self, name: bytes) -> None:
1151 """Remove a ref.
1153 Args:
1154 name: Name of the ref to remove
1155 """
1156 if name.startswith(b"refs/") or name == HEADREF:
1157 del self.refs[Ref(name)]
1158 else:
1159 raise ValueError(name)
    def _get_user_identity(
        self, config: "StackedConfig", kind: str | None = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: use :func:`get_user_identity` directly.

        Args:
            config: Config stack to read the identity from.
            kind: NOTE(review): accepted but not forwarded to
                ``get_user_identity()`` — confirm whether intentional.

        Returns:
            Identity string as bytes.
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)
1171 def _add_graftpoints(
1172 self, updated_graftpoints: dict[ObjectID, list[ObjectID]]
1173 ) -> None:
1174 """Add or modify graftpoints.
1176 Args:
1177 updated_graftpoints: Dict of commit shas to list of parent shas
1178 """
1179 # Simple validation
1180 for commit, parents in updated_graftpoints.items():
1181 for sha in [commit, *parents]:
1182 check_hexsha(sha, "Invalid graftpoint")
1184 self._graftpoints.update(updated_graftpoints)
1186 def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None:
1187 """Remove graftpoints.
1189 Args:
1190 to_remove: List of commit shas
1191 """
1192 for sha in to_remove:
1193 del self._graftpoints[sha]
1195 def _read_heads(self, name: str) -> list[ObjectID]:
1196 f = self.get_named_file(name)
1197 if f is None:
1198 return []
1199 with f:
1200 return [ObjectID(line.strip()) for line in f.readlines() if line.strip()]
1202 def get_worktree(self) -> "WorkTree":
1203 """Get the working tree for this repository.
1205 Returns:
1206 WorkTree instance for performing working tree operations
1208 Raises:
1209 NotImplementedError: If the repository doesn't support working trees
1210 """
1211 raise NotImplementedError(
1212 "Working tree operations not supported by this repository type"
1213 )
1215 @replace_me(remove_in="0.26.0")
1216 def do_commit(
1217 self,
1218 message: bytes | None = None,
1219 committer: bytes | None = None,
1220 author: bytes | None = None,
1221 commit_timestamp: float | None = None,
1222 commit_timezone: int | None = None,
1223 author_timestamp: float | None = None,
1224 author_timezone: int | None = None,
1225 tree: ObjectID | None = None,
1226 encoding: bytes | None = None,
1227 ref: Ref | None = HEADREF,
1228 merge_heads: list[ObjectID] | None = None,
1229 no_verify: bool = False,
1230 sign: bool = False,
1231 ) -> bytes:
1232 """Create a new commit.
1234 If not specified, committer and author default to
1235 get_user_identity(..., 'COMMITTER')
1236 and get_user_identity(..., 'AUTHOR') respectively.
1238 Args:
1239 message: Commit message (bytes or callable that takes (repo, commit)
1240 and returns bytes)
1241 committer: Committer fullname
1242 author: Author fullname
1243 commit_timestamp: Commit timestamp (defaults to now)
1244 commit_timezone: Commit timestamp timezone (defaults to GMT)
1245 author_timestamp: Author timestamp (defaults to commit
1246 timestamp)
1247 author_timezone: Author timestamp timezone
1248 (defaults to commit timestamp timezone)
1249 tree: SHA1 of the tree root to use (if not specified the
1250 current index will be committed).
1251 encoding: Encoding
1252 ref: Optional ref to commit to (defaults to current branch).
1253 If None, creates a dangling commit without updating any ref.
1254 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1255 no_verify: Skip pre-commit and commit-msg hooks
1256 sign: GPG Sign the commit (bool, defaults to False,
1257 pass True to use default GPG key,
1258 pass a str containing Key ID to use a specific GPG key)
1260 Returns:
1261 New commit SHA1
1262 """
1263 return self.get_worktree().commit(
1264 message=message,
1265 committer=committer,
1266 author=author,
1267 commit_timestamp=commit_timestamp,
1268 commit_timezone=commit_timezone,
1269 author_timestamp=author_timestamp,
1270 author_timezone=author_timezone,
1271 tree=tree,
1272 encoding=encoding,
1273 ref=ref,
1274 merge_heads=merge_heads,
1275 no_verify=no_verify,
1276 sign=sign,
1277 )
1280def read_gitfile(f: BinaryIO) -> str:
1281 """Read a ``.git`` file.
1283 The first line of the file should start with "gitdir: "
1285 Args:
1286 f: File-like object to read from
1287 Returns: A path
1288 """
1289 cs = f.read()
1290 if not cs.startswith(b"gitdir: "):
1291 raise ValueError("Expected file to start with 'gitdir: '")
1292 return cs[len(b"gitdir: ") :].rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
            version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc) is informative; the
        # previous attribute-only form rendered as an empty string.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
            extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative; the
        # previous attribute-only form rendered as an empty string.
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Path to the working copy (non-bare) or the control directory (bare).
    path: str
    # True when the repository has no working tree.
    bare: bool
    # On-disk object store (under the common dir's objects directory).
    object_store: DiskObjectStore
    # Filter context; initialized to None in __init__ and created lazily.
    filter_context: "FilterContext | None"
    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if no repository layout is found at *root*.
          UnsupportedVersion: for repositoryformatversion other than 0/1.
          UnsupportedExtension: for unknown entries in [extensions].
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a ".git" file/dir means non-bare; a bare layout
            # has objects/ and refs/ directly under root.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a gitlink file pointing at the real control
                # directory (relative paths are resolved against root).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: "commondir" points back at the main
            # repository's control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            # A missing or None value is treated as format version 0.
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any other extension is rejected rather than silently ignored.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            # Get shared repository permissions from config
            try:
                shared_value = config.get(("core",), "sharedRepository")
                file_mode, dir_mode = parse_shared_repository(shared_value)
            except KeyError:
                file_mode, dir_mode = None, None

            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR),
                config,
                file_mode=file_mode,
                dir_mode=dir_mode,
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1474 def get_worktree(self) -> "WorkTree":
1475 """Get the working tree for this repository.
1477 Returns:
1478 WorkTree instance for performing working tree operations
1479 """
1480 from .worktree import WorkTree
1482 return WorkTree(self, self.path)
    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes,
    ) -> None:
        """Append an entry to the reflog for *ref*.

        Missing committer/timestamp/timezone values are filled in from the
        config stack and the current time before the line is written.

        Args:
            ref: Ref name the entry belongs to.
            old_sha: Previous value of the ref.
            new_sha: New value of the ref.
            committer: Identity to record; defaults to the configured one.
            timestamp: Seconds since epoch; defaults to now.
            timezone: UTC offset in seconds; defaults to 0.
            message: Reflog message.
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)

        # Get shared repository permissions
        file_mode, dir_mode = self._get_shared_repository_permissions()

        # Create directory with appropriate permissions
        parent_dir = os.path.dirname(path)
        # Create directory tree, setting permissions on each level if needed
        parts = []
        current = parent_dir
        while current and not os.path.exists(current):
            parts.append(current)
            current = os.path.dirname(current)
        parts.reverse()
        for part in parts:
            os.mkdir(part)
            if dir_mode is not None:
                os.chmod(part, dir_mode)
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME: local UTC offset is not determined; 0 is recorded
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )

        # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
        # Always chmod to ensure correct permissions even if file already existed
        if file_mode is not None:
            os.chmod(path, file_mode)
1535 def _reflog_path(self, ref: bytes) -> str:
1536 if ref.startswith((b"main-worktree/", b"worktrees/")):
1537 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1539 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1540 return os.path.join(base, "logs", os.fsdecode(ref))
1542 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1543 """Read reflog entries for a reference.
1545 Args:
1546 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1548 Yields:
1549 reflog.Entry objects in chronological order (oldest first)
1550 """
1551 from .reflog import read_reflog
1553 path = self._reflog_path(ref)
1554 try:
1555 with open(path, "rb") as f:
1556 yield from read_reflog(f)
1557 except FileNotFoundError:
1558 return
1560 @classmethod
1561 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
1562 """Iterate parent directories to discover a repository.
1564 Return a Repo object for the first parent directory that looks like a
1565 Git repository.
1567 Args:
1568 start: The directory to start discovery from (defaults to '.')
1569 """
1570 path = os.path.abspath(start)
1571 while True:
1572 try:
1573 return cls(path)
1574 except NotGitRepository:
1575 new_path, _tail = os.path.split(path)
1576 if new_path == path: # Root reached
1577 break
1578 path = new_path
1579 start_str = os.fspath(start)
1580 if isinstance(start_str, bytes):
1581 start_str = start_str.decode("utf-8")
1582 raise NotGitRepository(f"No git repository was found at {start_str}")
    def controldir(self) -> str:
        """Return the path of the control directory.

        For a non-bare repository this is the ``.git`` directory (or the
        target of a ``.git`` gitlink file); for a bare repository it is
        the repository root itself.
        """
        return self._controldir
    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        return self._commondir
1598 def _determine_file_mode(self) -> bool:
1599 """Probe the file-system to determine whether permissions can be trusted.
1601 Returns: True if permissions can be trusted, False otherwise.
1602 """
1603 fname = os.path.join(self.path, ".probe-permissions")
1604 with open(fname, "w") as f:
1605 f.write("")
1607 st1 = os.lstat(fname)
1608 try:
1609 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1610 except PermissionError:
1611 return False
1612 st2 = os.lstat(fname)
1614 os.unlink(fname)
1616 mode_differs = st1.st_mode != st2.st_mode
1617 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1619 return mode_differs and st2_has_exec
    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Currently a platform check only: assumes symlinks work everywhere
        except Windows.

        Returns: True if symlinks can be created, False otherwise.
        """
        # TODO(jelmer): Actually probe disk / look at filesystem
        return sys.platform != "win32"
1629 def _get_shared_repository_permissions(
1630 self,
1631 ) -> tuple[int | None, int | None]:
1632 """Get shared repository file and directory permissions from config.
1634 Returns:
1635 tuple of (file_mask, directory_mask) or (None, None) if not shared
1636 """
1637 try:
1638 config = self.get_config()
1639 value = config.get(("core",), "sharedRepository")
1640 return parse_shared_repository(value)
1641 except KeyError:
1642 return (None, None)
1644 def _put_named_file(self, path: str, contents: bytes) -> None:
1645 """Write a file to the control dir with the given name and contents.
1647 Args:
1648 path: The path to the file, relative to the control dir.
1649 contents: A string to write to the file.
1650 """
1651 path = path.lstrip(os.path.sep)
1653 # Get shared repository permissions
1654 file_mode, _ = self._get_shared_repository_permissions()
1656 # Create file with appropriate permissions
1657 if file_mode is not None:
1658 with GitFile(
1659 os.path.join(self.controldir(), path), "wb", mask=file_mode
1660 ) as f:
1661 f.write(contents)
1662 else:
1663 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1664 f.write(contents)
1666 def _del_named_file(self, path: str) -> None:
1667 try:
1668 os.unlink(os.path.join(self.controldir(), path))
1669 except FileNotFoundError:
1670 return
1672 def get_named_file(
1673 self,
1674 path: str | bytes,
1675 basedir: str | None = None,
1676 ) -> BinaryIO | None:
1677 """Get a file from the control dir with a specific name.
1679 Although the filename should be interpreted as a filename relative to
1680 the control dir in a disk-based Repo, the object returned need not be
1681 pointing to a file in that location.
1683 Args:
1684 path: The path to the file, relative to the control dir.
1685 basedir: Optional argument that specifies an alternative to the
1686 control dir.
1687 Returns: An open file object, or None if the file does not exist.
1688 """
1689 # TODO(dborowitz): sanitize filenames, since this is used directly by
1690 # the dumb web serving code.
1691 if basedir is None:
1692 basedir = self.controldir()
1693 if isinstance(path, bytes):
1694 path = path.decode("utf-8")
1695 path = path.lstrip(os.path.sep)
1696 try:
1697 return open(os.path.join(basedir, path), "rb")
1698 except FileNotFoundError:
1699 return None
    def index_path(self) -> str:
        """Return path to the index file.

        The index always lives directly in the control dir
        (``<controldir>/index``), never in the common dir.
        """
        return os.path.join(self.controldir(), INDEX_FILENAME)
1705 def open_index(self) -> "Index":
1706 """Open the index for this repository.
1708 Raises:
1709 NoIndexPresent: If no index is present
1710 Returns: The matching `Index`
1711 """
1712 from .index import Index
1714 if not self.has_index():
1715 raise NoIndexPresent
1717 # Check for manyFiles feature configuration
1718 config = self.get_config_stack()
1719 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1720 skip_hash = False
1721 index_version = None
1723 if many_files:
1724 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1725 try:
1726 index_version_str = config.get(b"index", b"version")
1727 index_version = int(index_version_str)
1728 except KeyError:
1729 index_version = 4 # Default to version 4 for manyFiles
1730 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1731 else:
1732 # Check for explicit index settings
1733 try:
1734 index_version_str = config.get(b"index", b"version")
1735 index_version = int(index_version_str)
1736 except KeyError:
1737 index_version = None
1738 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1740 # Get shared repository permissions for index file
1741 file_mode, _ = self._get_shared_repository_permissions()
1743 return Index(
1744 self.index_path(),
1745 skip_hash=skip_hash,
1746 version=index_version,
1747 file_mode=file_mode,
1748 )
1750 def has_index(self) -> bool:
1751 """Check if an index is present."""
1752 # Bare repos must never have index files; non-bare repos may have a
1753 # missing index file, which is treated as empty.
1754 return not self.bare
1756 @replace_me(remove_in="0.26.0")
1757 def stage(
1758 self,
1759 fs_paths: str
1760 | bytes
1761 | os.PathLike[str]
1762 | Iterable[str | bytes | os.PathLike[str]],
1763 ) -> None:
1764 """Stage a set of paths.
1766 Args:
1767 fs_paths: List of paths, relative to the repository path
1768 """
1769 return self.get_worktree().stage(fs_paths)
1771 @replace_me(remove_in="0.26.0")
1772 def unstage(self, fs_paths: Sequence[str]) -> None:
1773 """Unstage specific file in the index.
1775 Args:
1776 fs_paths: a list of files to unstage,
1777 relative to the repository path.
1778 """
1779 return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
            target_path: Target path
            mkdir: Create the target directory
            bare: Whether to create a bare repository
            checkout: Whether or not to check-out HEAD after cloning
            origin: Base name for refs in target repository
                cloned from this repository
            branch: Optional branch or tag to be used as HEAD in the new repository
                instead of this repository's HEAD.
            progress: Optional progress function.
                NOTE(review): not referenced anywhere in this body — confirm
                whether it should be forwarded to fetch().
            depth: Depth at which to fetch
            symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        # Outer try: on any failure, remove the directory we created.
        # Inner try: on any failure after init, close the target repo first.
        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the remote in the clone's config.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    Ref(b"refs/remotes/" + origin),
                    self.refs.as_dict(Ref(b"refs/heads")),
                    message=ref_message,
                )
                target.refs.import_refs(
                    Ref(b"refs/tags"),
                    self.refs.as_dict(Ref(b"refs/tags")),
                    message=ref_message,
                )

                head_chain, origin_sha = self.refs.follow(HEADREF)
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[HEADREF] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1878 @replace_me(remove_in="0.26.0")
1879 def reset_index(self, tree: ObjectID | None = None) -> None:
1880 """Reset the index back to a specific tree.
1882 Args:
1883 tree: Tree SHA to reset to, None for current HEAD tree.
1884 """
1885 return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix ("gitdir:", "gitdir/i:",
        "onbranch:") to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
                pattern: Pattern to match against
                case_sensitive: Whether to match case-sensitively

            Returns:
                True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything below that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
                pattern: Pattern to match against

            Returns:
                True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(HEADREF)
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. unborn branch) — condition never matches.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1975 def get_worktree_config(self) -> "ConfigFile":
1976 """Get the worktree-specific config.
1978 Returns:
1979 ConfigFile object for the worktree config
1980 """
1981 from .config import ConfigFile
1983 path = os.path.join(self.commondir(), "config.worktree")
1984 try:
1985 # Pass condition matchers for includeIf evaluation
1986 condition_matchers = self._get_config_condition_matchers()
1987 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1988 except FileNotFoundError:
1989 cf = ConfigFile()
1990 cf.path = path
1991 return cf
1993 def get_config(self) -> "ConfigFile":
1994 """Retrieve the config object.
1996 Returns: `ConfigFile` object for the ``.git/config`` file.
1997 """
1998 from .config import ConfigFile
2000 path = os.path.join(self._commondir, "config")
2001 try:
2002 # Pass condition matchers for includeIf evaluation
2003 condition_matchers = self._get_config_condition_matchers()
2004 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
2005 except FileNotFoundError:
2006 ret = ConfigFile()
2007 ret.path = path
2008 return ret
2010 def get_rebase_state_manager(self) -> "RebaseStateManager":
2011 """Get the appropriate rebase state manager for this repository.
2013 Returns: DiskRebaseStateManager instance
2014 """
2015 import os
2017 from .rebase import DiskRebaseStateManager
2019 path = os.path.join(self.controldir(), "rebase-merge")
2020 return DiskRebaseStateManager(path)
2022 def get_description(self) -> bytes | None:
2023 """Retrieve the description of this repository.
2025 Returns: Description as bytes or None.
2026 """
2027 path = os.path.join(self._controldir, "description")
2028 try:
2029 with GitFile(path, "rb") as f:
2030 return f.read()
2031 except FileNotFoundError:
2032 return None
2034 def __repr__(self) -> str:
2035 """Return string representation of this repository."""
2036 return f"<Repo at {self.path!r}>"
2038 def set_description(self, description: bytes) -> None:
2039 """Set the description for this repository.
2041 Args:
2042 description: Text to set as description for this repository.
2043 """
2044 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: "StackedConfig | None" = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
        shared_repository: str | bool | None = None,
    ) -> "Repo":
        """Shared implementation for creating a new (bare or non-bare) repo.

        Creates the base directory layout and object store under
        *controldir*, opens the repository, points HEAD at the default
        branch and writes the initial control files.

        Args:
            path: Repository root path.
            controldir: Control directory to populate.
            bare: Whether the new repository is bare.
            object_store: Object store to use; created on disk if omitted.
            config: Config used to resolve init.defaultBranch.
            default_branch: Branch name for HEAD; falls back to config,
                then DEFAULT_BRANCH.
            symlinks: Symlink support setting, forwarded to _init_files.
            format: Repository format version, forwarded to _init_files.
            shared_repository: core.sharedRepository setting controlling
                file/directory permission masks.

        Returns: The opened `Repo` instance.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)

        # Determine shared repository permissions early
        file_mode: int | None = None
        dir_mode: int | None = None
        if shared_repository is not None:
            file_mode, dir_mode = parse_shared_repository(shared_repository)

        # Create base directories with appropriate permissions
        for d in BASE_DIRECTORIES:
            dir_path = os.path.join(controldir, *d)
            os.mkdir(dir_path)
            if dir_mode is not None:
                os.chmod(dir_path, dir_mode)

        if object_store is None:
            object_store = DiskObjectStore.init(
                os.path.join(controldir, OBJECTDIR),
                file_mode=file_mode,
                dir_mode=dir_mode,
            )
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                # Honour init.defaultBranch when configured.
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))
        ret._init_files(
            bare=bare,
            symlinks=symlinks,
            format=format,
            shared_repository=shared_repository,
        )
        return ret
2104 @classmethod
2105 def init(
2106 cls,
2107 path: str | bytes | os.PathLike[str],
2108 *,
2109 mkdir: bool = False,
2110 config: "StackedConfig | None" = None,
2111 default_branch: bytes | None = None,
2112 symlinks: bool | None = None,
2113 format: int | None = None,
2114 shared_repository: str | bool | None = None,
2115 ) -> "Repo":
2116 """Create a new repository.
2118 Args:
2119 path: Path in which to create the repository
2120 mkdir: Whether to create the directory
2121 config: Configuration object
2122 default_branch: Default branch name
2123 symlinks: Whether to support symlinks
2124 format: Repository format version (defaults to 0)
2125 shared_repository: Shared repository setting (group, all, umask, or octal)
2126 Returns: `Repo` instance
2127 """
2128 path = os.fspath(path)
2129 if isinstance(path, bytes):
2130 path = os.fsdecode(path)
2131 if mkdir:
2132 os.mkdir(path)
2133 controldir = os.path.join(path, CONTROLDIR)
2134 os.mkdir(controldir)
2135 _set_filesystem_hidden(controldir)
2136 return cls._init_maybe_bare(
2137 path,
2138 controldir,
2139 False,
2140 config=config,
2141 default_branch=default_branch,
2142 symlinks=symlinks,
2143 format=format,
2144 shared_repository=shared_repository,
2145 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier (defaults to the basename of *path*)
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's ".git" is a plain file containing a "gitdir:" line
        # that points at the per-worktree control dir in the main repository.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

        # Get shared repository permissions from main repository
        _, dir_mode = main_repo._get_shared_repository_permissions()

        # Create directories with appropriate permissions; FileExistsError is
        # tolerated so setting up a worktree over existing dirs is idempotent.
        try:
            os.mkdir(main_worktreesdir)
            if dir_mode is not None:
                os.chmod(main_worktreesdir, dir_mode)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
            if dir_mode is not None:
                os.chmod(worktree_controldir, dir_mode)
        except FileExistsError:
            pass
        # "gitdir" points back at the worktree's .git file; "commondir"
        # points at the shared control directory two levels up.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # The new worktree starts at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the index from HEAD so the new worktree starts clean.
        r.get_worktree().reset_index()
        return r
2205 @classmethod
2206 def init_bare(
2207 cls,
2208 path: str | bytes | os.PathLike[str],
2209 *,
2210 mkdir: bool = False,
2211 object_store: PackBasedObjectStore | None = None,
2212 config: "StackedConfig | None" = None,
2213 default_branch: bytes | None = None,
2214 format: int | None = None,
2215 shared_repository: str | bool | None = None,
2216 ) -> "Repo":
2217 """Create a new bare repository.
2219 ``path`` should already exist and be an empty directory.
2221 Args:
2222 path: Path to create bare repository in
2223 mkdir: Whether to create the directory
2224 object_store: Object store to use
2225 config: Configuration object
2226 default_branch: Default branch name
2227 format: Repository format version (defaults to 0)
2228 shared_repository: Shared repository setting (group, all, umask, or octal)
2229 Returns: a `Repo` instance
2230 """
2231 path = os.fspath(path)
2232 if isinstance(path, bytes):
2233 path = os.fsdecode(path)
2234 if mkdir:
2235 os.mkdir(path)
2236 return cls._init_maybe_bare(
2237 path,
2238 path,
2239 True,
2240 object_store=object_store,
2241 config=config,
2242 default_branch=default_branch,
2243 format=format,
2244 shared_repository=shared_repository,
2245 )
2247 create = init_bare
2249 def close(self) -> None:
2250 """Close any files opened by this repository."""
2251 self.object_store.close()
2252 # Clean up filter context if it was created
2253 if self.filter_context is not None:
2254 self.filter_context.close()
2255 self.filter_context = None
2257 def __enter__(self) -> "Repo":
2258 """Enter context manager."""
2259 return self
2261 def __exit__(
2262 self,
2263 exc_type: type[BaseException] | None,
2264 exc_val: BaseException | None,
2265 exc_tb: TracebackType | None,
2266 ) -> None:
2267 """Exit context manager and close repository."""
2268 self.close()
2270 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2271 """Read .gitattributes file from working tree.
2273 Returns:
2274 Dictionary mapping file patterns to attributes
2275 """
2276 gitattributes = {}
2277 gitattributes_path = os.path.join(self.path, ".gitattributes")
2279 if os.path.exists(gitattributes_path):
2280 with open(gitattributes_path, "rb") as f:
2281 for line in f:
2282 line = line.strip()
2283 if not line or line.startswith(b"#"):
2284 continue
2286 parts = line.split()
2287 if len(parts) < 2:
2288 continue
2290 pattern = parts[0]
2291 attrs = {}
2293 for attr in parts[1:]:
2294 if attr.startswith(b"-"):
2295 # Unset attribute
2296 attrs[attr[1:]] = b"false"
2297 elif b"=" in attr:
2298 # Set to value
2299 key, value = attr.split(b"=", 1)
2300 attrs[key] = value
2301 else:
2302 # Set attribute
2303 attrs[attr] = b"true"
2305 gitattributes[pattern] = attrs
2307 return gitattributes
2309 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2310 """Return a BlobNormalizer object."""
2311 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2313 # Get fresh configuration and GitAttributes
2314 config_stack = self.get_config_stack()
2315 git_attributes = self.get_gitattributes()
2317 # Lazily create FilterContext if needed
2318 if self.filter_context is None:
2319 filter_registry = FilterRegistry(config_stack, self)
2320 self.filter_context = FilterContext(filter_registry)
2321 else:
2322 # Refresh the context with current config to handle config changes
2323 self.filter_context.refresh_config(config_stack)
2325 # Return a new FilterBlobNormalizer with the context
2326 return FilterBlobNormalizer(
2327 config_stack, git_attributes, filter_context=self.filter_context
2328 )
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from: the .gitattributes blob in
        the given tree (or HEAD), then .git/info/attributes, then the
        working-tree .gitattributes file.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the object it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing object or a SHA that is not a tree: skip quietly.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2397 @replace_me(remove_in="0.26.0")
2398 def _sparse_checkout_file_path(self) -> str:
2399 """Return the path of the sparse-checkout file in this repo's control dir."""
2400 return self.get_worktree()._sparse_checkout_file_path()
2402 @replace_me(remove_in="0.26.0")
2403 def configure_for_cone_mode(self) -> None:
2404 """Ensure the repository is configured for cone-mode sparse-checkout."""
2405 return self.get_worktree().configure_for_cone_mode()
2407 @replace_me(remove_in="0.26.0")
2408 def infer_cone_mode(self) -> bool:
2409 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2410 return self.get_worktree().infer_cone_mode()
2412 @replace_me(remove_in="0.26.0")
2413 def get_sparse_checkout_patterns(self) -> list[str]:
2414 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2416 Returns:
2417 A list of patterns. Returns an empty list if the file is missing.
2418 """
2419 return self.get_worktree().get_sparse_checkout_patterns()
2421 @replace_me(remove_in="0.26.0")
2422 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
2423 """Write the given sparse-checkout patterns into info/sparse-checkout.
2425 Creates the info/ directory if it does not exist.
2427 Args:
2428 patterns: A list of gitignore-style patterns to store.
2429 """
2430 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2432 @replace_me(remove_in="0.26.0")
2433 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
2434 """Write the given cone-mode directory patterns into info/sparse-checkout.
2436 For each directory to include, add an inclusion line that "undoes" the prior
2437 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2438 Never add the same line twice.
2439 """
2440 return self.get_worktree().set_cone_mode_patterns(dirs)
2443class MemoryRepo(BaseRepo):
2444 """Repo that stores refs, objects, and named files in memory.
2446 MemoryRepos are always bare: they have no working tree and no index, since
2447 those have a stronger dependency on the filesystem.
2448 """
2450 filter_context: "FilterContext | None"
2452 def __init__(self) -> None:
2453 """Create a new repository in memory."""
2454 from .config import ConfigFile
2456 self._reflog: list[Any] = []
2457 refs_container = DictRefsContainer({}, logger=self._append_reflog)
2458 BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
2459 self._named_files: dict[str, bytes] = {}
2460 self.bare = True
2461 self._config = ConfigFile()
2462 self._description: bytes | None = None
2463 self.filter_context = None
2465 def _append_reflog(
2466 self,
2467 ref: bytes,
2468 old_sha: bytes | None,
2469 new_sha: bytes | None,
2470 committer: bytes | None,
2471 timestamp: int | None,
2472 timezone: int | None,
2473 message: bytes | None,
2474 ) -> None:
2475 self._reflog.append(
2476 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2477 )
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description
        """
        # Held in memory only; nothing is written to a description file.
        self._description = description

    def get_description(self) -> bytes | None:
        """Get the description of this repository.

        Returns:
          Repository description as bytes, or None if never set
        """
        return self._description
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # Windows does not preserve POSIX file modes.
        return sys.platform != "win32"

    def _determine_symlinks(self) -> bool:
        """Probe the file-system to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # NOTE: the previous docstring was a copy-paste of
        # _determine_file_mode's; this method reports symlink support,
        # treated as unavailable on Windows.
        return sys.platform != "win32"
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        # Stored in the in-memory mapping instead of on disk.
        self._named_files[path] = contents
2518 def _del_named_file(self, path: str) -> None:
2519 try:
2520 del self._named_files[path]
2521 except KeyError:
2522 pass
2524 def get_named_file(
2525 self,
2526 path: str | bytes,
2527 basedir: str | None = None,
2528 ) -> BytesIO | None:
2529 """Get a file from the control dir with a specific name.
2531 Although the filename should be interpreted as a filename relative to
2532 the control dir in a disk-baked Repo, the object returned need not be
2533 pointing to a file in that location.
2535 Args:
2536 path: The path to the file, relative to the control dir.
2537 basedir: Optional base directory for the path
2538 Returns: An open file object, or None if the file does not exist.
2539 """
2540 path_str = path.decode() if isinstance(path, bytes) else path
2541 contents = self._named_files.get(path_str, None)
2542 if contents is None:
2543 return None
2544 return BytesIO(contents)
2546 def open_index(self) -> "Index":
2547 """Fail to open index for this repo, since it is bare.
2549 Raises:
2550 NoIndexPresent: Raised when no index is present
2551 """
2552 raise NoIndexPresent
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        # The in-memory ConfigFile created in __init__; changes persist
        # only for the lifetime of this MemoryRepo.
        return self._config
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        # For in-memory repos, rebase state is held in memory as well.
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)
2570 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2571 """Return a BlobNormalizer object for checkin/checkout operations."""
2572 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2574 # Get fresh configuration and GitAttributes
2575 config_stack = self.get_config_stack()
2576 git_attributes = self.get_gitattributes()
2578 # Lazily create FilterContext if needed
2579 if self.filter_context is None:
2580 filter_registry = FilterRegistry(config_stack, self)
2581 self.filter_context = FilterContext(filter_registry)
2582 else:
2583 # Refresh the context with current config to handle config changes
2584 self.filter_context.refresh_config(config_stack)
2586 # Return a new FilterBlobNormalizer with the context
2587 return FilterBlobNormalizer(
2588 config_stack, git_attributes, filter_context=self.filter_context
2589 )
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Ignored; accepted for interface compatibility with Repo.

        Returns:
          An empty GitAttributes object.
        """
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files
        # Return empty GitAttributes
        return GitAttributes([])
2599 def close(self) -> None:
2600 """Close any resources opened by this repository."""
2601 # Clean up filter context if it was created
2602 if self.filter_context is not None:
2603 self.filter_context.close()
2604 self.filter_context = None
    def do_commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = HEADREF,
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message; may also be a callable taking
            (repo, commit) and returning the message bytes
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: If tree is missing/malformed or no message is given
        """
        import time

        from .objects import Commit

        # Unlike a disk repo, there is no index to derive a tree from,
        # so the caller must always supply one explicitly.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            # Fall back to the configured user identity.
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        # Author time/zone default to the committer's values.
        if author_timestamp is None:
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message is invoked with the repo and the partially
        # built commit, and must return the message bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                # Compare-and-swap so a concurrent ref update is detected
                # rather than silently overwritten.
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet: this is the first commit on it.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id
2736 @classmethod
2737 def init_bare(
2738 cls,
2739 objects: Iterable[ShaFile],
2740 refs: Mapping[Ref, ObjectID],
2741 format: int | None = None,
2742 ) -> "MemoryRepo":
2743 """Create a new bare repository in memory.
2745 Args:
2746 objects: Objects for the new repository,
2747 as iterable
2748 refs: Refs as dictionary, mapping names
2749 to object SHA1s
2750 format: Repository format version (defaults to 0)
2751 """
2752 ret = cls()
2753 for obj in objects:
2754 ret.object_store.add_object(obj)
2755 for refname, sha in refs.items():
2756 ret.refs.add_if_new(refname, sha)
2757 ret._init_files(bare=True, format=format)
2758 return ret