Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
# Explicit public API of this module (consumed by `from dulwich.repo import *`
# and by documentation tooling).
__all__ = [
    "BASE_DIRECTORIES",
    "COMMONDIR",
    "CONTROLDIR",
    "DEFAULT_BRANCH",
    "DEFAULT_OFS_DELTA",
    "GITDIR",
    "INDEX_FILENAME",
    "OBJECTDIR",
    "REFSDIR",
    "REFSDIR_HEADS",
    "REFSDIR_TAGS",
    "WORKTREES",
    "BaseRepo",
    "DefaultIdentityNotFound",
    "InvalidUserIdentity",
    "MemoryRepo",
    "ParentsProvider",
    "Repo",
    "UnsupportedExtension",
    "UnsupportedVersion",
    "check_user_identity",
    "get_user_identity",
    "parse_graftpoints",
    "parse_shared_repository",
    "read_gitfile",
    "serialize_graftpoints",
]
61import os
62import stat
63import sys
64import time
65import warnings
66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
67from io import BytesIO
68from types import TracebackType
69from typing import (
70 TYPE_CHECKING,
71 Any,
72 BinaryIO,
73 TypeVar,
74)
76if TYPE_CHECKING:
77 # There are no circular imports here, but we try to defer imports as long
78 # as possible to reduce start-up time for anything that doesn't need
79 # these imports.
80 from .attrs import GitAttributes
81 from .config import ConditionMatcher, ConfigFile, StackedConfig
82 from .diff_tree import RenameDetector
83 from .filters import FilterBlobNormalizer, FilterContext
84 from .index import Index
85 from .notes import Notes
86 from .object_format import ObjectFormat
87 from .object_store import BaseObjectStore, GraphWalker
88 from .pack import UnpackedObject
89 from .rebase import RebaseStateManager
90 from .walk import Walker
91 from .worktree import WorkTree
93from . import reflog
94from .errors import (
95 NoIndexPresent,
96 NotBlobError,
97 NotCommitError,
98 NotGitRepository,
99 NotTagError,
100 NotTreeError,
101 RefFormatError,
102)
103from .file import GitFile
104from .hooks import (
105 CommitMsgShellHook,
106 Hook,
107 PostCommitShellHook,
108 PostReceiveShellHook,
109 PreCommitShellHook,
110 PreReceiveShellHook,
111 UpdateShellHook,
112)
113from .object_store import (
114 DiskObjectStore,
115 MemoryObjectStore,
116 MissingObjectFinder,
117 ObjectStoreGraphWalker,
118 PackBasedObjectStore,
119 PackCapableObjectStore,
120 find_shallow,
121 peel_sha,
122)
123from .objects import (
124 Blob,
125 Commit,
126 ObjectID,
127 RawObjectID,
128 ShaFile,
129 Tag,
130 Tree,
131 check_hexsha,
132 valid_hexsha,
133)
134from .pack import generate_unpacked_objects
135from .refs import (
136 HEADREF,
137 LOCAL_TAG_PREFIX, # noqa: F401
138 SYMREF, # noqa: F401
139 DictRefsContainer,
140 DiskRefsContainer,
141 Ref,
142 RefsContainer,
143 _set_default_branch,
144 _set_head,
145 _set_origin_head,
146 check_ref_format, # noqa: F401
147 extract_branch_name,
148 is_per_worktree_ref,
149 local_branch_name,
150 read_packed_refs, # noqa: F401
151 read_packed_refs_with_peeled, # noqa: F401
152 write_packed_refs, # noqa: F401
153)
# Name of the control directory of a non-bare repository.
CONTROLDIR = ".git"
# Subdirectory of the control dir holding loose and packed objects.
OBJECTDIR = "objects"
# Default for whether offset deltas may be used when generating packs
# (see BaseRepo.generate_pack_data).
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile, used for typed object lookups (_get_object).
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories (as path components relative to the control dir) created when
# initializing a new repository.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch that HEAD points at in newly created repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'.

    Attributes:
        identity: The identity string that failed validation.
    """

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
            identity: The malformed identity text.
        """
        # Forward the identity to Exception so str(exc) and exc.args carry
        # the offending value; previously it was only stored on .identity,
        # leaving the printed exception message empty.
        super().__init__(identity)
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined from the host."""
192# TODO(jelmer): Cache?
193def _get_default_identity() -> tuple[str, str]:
194 import socket
196 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
197 username = os.environ.get(name)
198 if username:
199 break
200 else:
201 username = None
203 try:
204 import pwd
205 except ImportError:
206 fullname = None
207 else:
208 try:
209 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
210 except KeyError:
211 fullname = None
212 else:
213 if getattr(entry, "gecos", None):
214 fullname = entry.pw_gecos.split(",")[0]
215 else:
216 fullname = None
217 if username is None:
218 username = entry.pw_name
219 if not fullname:
220 if username is None:
221 raise DefaultIdentityNotFound("no username found")
222 fullname = username
223 email = os.environ.get("EMAIL")
224 if email is None:
225 if username is None:
226 raise DefaultIdentityNotFound("no username found")
227 email = f"{username}@{socket.gethostname()}"
228 return (fullname, email)
231def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
232 """Determine the identity to use for new commits.
234 If kind is set, this first checks
235 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
237 If those variables are not set, then it will fall back
238 to reading the user.name and user.email settings from
239 the specified configuration.
241 If that also fails, then it will fall back to using
242 the current users' identity as obtained from the host
243 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
245 Args:
246 config: Configuration stack to read from
247 kind: Optional kind to return identity for,
248 usually either "AUTHOR" or "COMMITTER".
250 Returns:
251 A user identity
252 """
253 user: bytes | None = None
254 email: bytes | None = None
255 if kind:
256 user_uc = os.environ.get("GIT_" + kind + "_NAME")
257 if user_uc is not None:
258 user = user_uc.encode("utf-8")
259 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
260 if email_uc is not None:
261 email = email_uc.encode("utf-8")
262 if user is None:
263 try:
264 user = config.get(("user",), "name")
265 except KeyError:
266 user = None
267 if email is None:
268 try:
269 email = config.get(("user",), "email")
270 except KeyError:
271 email = None
272 default_user, default_email = _get_default_identity()
273 if user is None:
274 user = default_user.encode("utf-8")
275 if email is None:
276 email = default_email.encode("utf-8")
277 if email.startswith(b"<") and email.endswith(b">"):
278 email = email[1:-1]
279 return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``b"Name <email>"``: it contains the
    separator ``b" <"``, a ``b">"`` somewhere after it, and no NUL or
    newline bytes anywhere.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    malformed = b">" not in remainder or b"\0" in identity or b"\n" in identity
    if malformed:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Convert graftpoint lines into a mapping.

    Args:
        graftpoints: Iterable of graftpoint lines, each formatted as
            ``<commit sha1> <parent sha1> [<parent sha1>]*``.

    Returns:
        Dict mapping ``<commit sha1>`` to its list of parent SHAs.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[ObjectID, list[ObjectID]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)

        commit = ObjectID(fields[0])
        if len(fields) == 2:
            parents = [ObjectID(p) for p in fields[1].split()]
        else:
            parents = []

        # Every SHA on the line must be a valid hex digest.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Convert a graft mapping into its on-disk byte representation.

    The input maps ``<commit sha1>`` to a sequence of parent SHAs; each
    output line is formatted as
    ``<commit sha1> <parent sha1> [<parent sha1>]*``.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        (commit + b" " + b" ".join(parents)) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
354def _set_filesystem_hidden(path: str) -> None:
355 """Mark path as to be hidden if supported by platform and filesystem.
357 On win32 uses SetFileAttributesW api:
358 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
359 """
360 if sys.platform == "win32":
361 import ctypes
362 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
364 FILE_ATTRIBUTE_HIDDEN = 2
365 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
366 ("SetFileAttributesW", ctypes.windll.kernel32)
367 )
369 if isinstance(path, bytes):
370 path = os.fsdecode(path)
371 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
372 pass # Could raise or log `ctypes.WinError()` here
374 # Could implement other platform specific filesystem hiding here
377def parse_shared_repository(
378 value: str | bytes | bool,
379) -> tuple[int | None, int | None]:
380 """Parse core.sharedRepository configuration value.
382 Args:
383 value: Configuration value (string, bytes, or boolean)
385 Returns:
386 tuple of (file_mask, directory_mask) or (None, None) if not shared
388 The masks are permission bits to apply via chmod.
389 """
390 if isinstance(value, bytes):
391 value = value.decode("utf-8", errors="replace")
393 # Handle boolean values
394 if isinstance(value, bool):
395 if value:
396 # true = group (same as "group")
397 return (0o664, 0o2775)
398 else:
399 # false = umask (use system umask, no adjustment)
400 return (None, None)
402 # Handle string values
403 value_lower = value.lower()
405 if value_lower in ("false", "0", ""):
406 # Use umask (no adjustment)
407 return (None, None)
409 if value_lower in ("true", "1", "group"):
410 # Group writable (with setgid bit)
411 return (0o664, 0o2775)
413 if value_lower in ("all", "world", "everybody", "2"):
414 # World readable/writable (with setgid bit)
415 return (0o666, 0o2777)
417 if value_lower == "umask":
418 # Explicitly use umask
419 return (None, None)
421 # Try to parse as octal
422 if value.startswith("0"):
423 try:
424 mode = int(value, 8)
425 # For directories, add execute bits where read bits are set
426 # and add setgid bit for shared repositories
427 dir_mode = mode | 0o2000 # Add setgid bit
428 if mode & 0o004:
429 dir_mode |= 0o001
430 if mode & 0o040:
431 dir_mode |= 0o010
432 if mode & 0o400:
433 dir_mode |= 0o100
434 return (mode, dir_mode)
435 except ValueError:
436 pass
438 # Default to umask for unrecognized values
439 return (None, None)
class ParentsProvider:
    """Provider for commit parent information.

    Resolves parents through, in order: graft overrides, shallow cutoffs,
    the commit-graph file when available, and finally the commit object
    itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: dict[ObjectID, list[ObjectID]] | None = None,
        shallows: Iterable[ObjectID] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information mapping a commit SHA to its replacement
                parent SHAs (defaults to no grafts)
            shallows: Shallow commit SHAs
        """
        self.store = store
        # Avoid the shared-mutable-default pitfall: the previous signature
        # used ``grafts={}``, a single dict object shared by every call site
        # that relied on the default.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit to look up
            commit: Optional pre-loaded commit object matching commit_id

        Returns:
            List of parent SHAs (empty for shallow commits)

        Raises:
            ValueError: If commit_id resolves to a non-commit object
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            if not isinstance(obj, Commit):
                raise ValueError(
                    f"Expected Commit object for commit_id {commit_id.decode()}, "
                    f"got {type(obj).__name__}. This usually means a reference "
                    f"points to a {type(obj).__name__} object instead of a Commit."
                )
            commit = obj
        result: list[ObjectID] = commit.parents
        return result
496class BaseRepo:
497 """Base class for a git repository.
499 This base class is meant to be used for Repository implementations that e.g.
500 work on top of a different transport than a standard filesystem path.
502 Attributes:
503 object_store: Dictionary-like object for accessing
504 the objects
505 refs: Dictionary-like object with the refs in this
506 repository
507 """
    def __init__(
        self,
        object_store: "PackCapableObjectStore",
        refs: RefsContainer,
        object_format: "ObjectFormat | None" = None,
    ) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
            object_store: Object store to use
            refs: Refs container to use
            object_format: Hash algorithm to use (if None, will use object_store's format)
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints (commit -> substitute parents) and the hook registry
        # start empty; they are populated later by subclasses or callers.
        self._graftpoints: dict[ObjectID, list[ObjectID]] = {}
        self.hooks: dict[str, Hook] = {}
        # Inherit the hash algorithm from the object store unless the caller
        # explicitly overrides it.
        if object_format is None:
            self.object_format: ObjectFormat = object_store.object_format
        else:
            self.object_format = object_format
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract: concrete subclasses (e.g. disk-backed repositories) must
        override this.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)
542 def _determine_symlinks(self) -> bool:
543 """Probe the filesystem to determine whether symlinks can be created.
545 Returns: True if symlinks can be created, False otherwise.
546 """
547 # For now, just mimic the old behaviour
548 return sys.platform != "win32"
550 def _init_files(
551 self,
552 bare: bool,
553 symlinks: bool | None = None,
554 format: int | None = None,
555 shared_repository: str | bool | None = None,
556 object_format: str | None = None,
557 ) -> None:
558 """Initialize a default set of named files."""
559 from .config import ConfigFile
561 self._put_named_file("description", b"Unnamed repository")
562 f = BytesIO()
563 cf = ConfigFile()
565 # Determine the appropriate format version
566 if object_format == "sha256":
567 # SHA256 requires format version 1
568 if format is None:
569 format = 1
570 elif format != 1:
571 raise ValueError(
572 "SHA256 object format requires repository format version 1"
573 )
574 else:
575 # SHA1 (default) can use format 0 or 1
576 if format is None:
577 format = 0
579 if format not in (0, 1):
580 raise ValueError(f"Unsupported repository format version: {format}")
582 cf.set("core", "repositoryformatversion", str(format))
584 # Set object format extension if using SHA256
585 if object_format == "sha256":
586 cf.set("extensions", "objectformat", "sha256")
588 # Set hash algorithm based on object format
589 from .object_format import get_object_format
591 self.object_format = get_object_format(object_format)
593 if self._determine_file_mode():
594 cf.set("core", "filemode", True)
595 else:
596 cf.set("core", "filemode", False)
598 if symlinks is None and not bare:
599 symlinks = self._determine_symlinks()
601 if symlinks is False:
602 cf.set("core", "symlinks", symlinks)
604 # On macOS, set precomposeunicode to true since HFS+/APFS
605 # returns filenames in NFD (decomposed) Unicode form
606 if sys.platform == "darwin":
607 cf.set("core", "precomposeunicode", True)
609 cf.set("core", "bare", bare)
610 cf.set("core", "logallrefupdates", True)
612 # Set shared repository if specified
613 if shared_repository is not None:
614 if isinstance(shared_repository, bool):
615 cf.set("core", "sharedRepository", shared_repository)
616 else:
617 cf.set("core", "sharedRepository", shared_repository)
619 cf.write_to_file(f)
620 self._put_named_file("config", f.getvalue())
621 self._put_named_file(os.path.join("info", "exclude"), b"")
623 # Allow subclasses to handle config initialization
624 self._init_config(cf)
    def _init_config(self, config: "ConfigFile") -> None:
        """Initialize repository configuration.

        This method can be overridden by subclasses to handle config
        initialization (for example, persisting the config to storage).

        Args:
            config: The ConfigFile object that was just created
        """
        # Default implementation does nothing
    def get_named_file(self, path: str) -> BinaryIO | None:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract: subclasses must implement.

        Args:
            path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract: subclasses must implement.

        Args:
            path: The path to the file, relative to the control dir.
            contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)
    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract: subclasses must implement.

        Args:
            path: The path to the file, relative to the control dir.
        """
        raise NotImplementedError(self._del_named_file)
    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract: subclasses with an index must implement.

        Raises:
            NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)
    def _change_object_format(self, object_format_name: str) -> None:
        """Change the object format of this repository.

        This can only be done if the object store is empty (no objects written yet).

        Args:
            object_format_name: Name of the new object format (e.g., "sha1", "sha256")

        Raises:
            AssertionError: If the object store is not empty
        """
        # Check if object store has any objects; the loop raises on the very
        # first object, so this is cheap for the expected (empty) case.
        for _ in self.object_store:
            raise AssertionError(
                "Cannot change object format: repository already contains objects"
            )

        # Update the object format
        from .object_format import get_object_format

        new_format = get_object_format(object_format_name)
        self.object_format = new_format
        # Keep the object store's notion of the format in sync.
        self.object_store.object_format = new_format

        # Update config file
        config = self.get_config()

        if object_format_name == "sha1":
            # For SHA-1, explicitly remove objectformat extension if present
            try:
                config.remove("extensions", "objectformat")
            except KeyError:
                pass
        else:
            # For non-SHA-1 formats, set repositoryformatversion to 1 and objectformat extension
            config.set("core", "repositoryformatversion", "1")
            config.set("extensions", "objectformat", object_format_name)

        config.write_to_path()
    def fetch(
        self,
        target: "BaseRepo",
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]]
        | None = None,
        progress: Callable[..., None] | None = None,
        depth: int | None = None,
    ) -> dict[Ref, ObjectID]:
        """Fetch objects into another repository.

        Args:
            target: The target repository
            determine_wants: Optional function to determine what refs to
                fetch.
            progress: Optional progress function
            depth: Optional shallow fetch depth
        Returns: The local refs
        """
        # Fix object format if needed
        if self.object_format != target.object_format:
            # Change the target repo's format if it's empty
            target._change_object_format(self.object_format.name)

        if determine_wants is None:
            determine_wants = target.object_store.determine_wants_all
        count, pack_data = self.fetch_pack_data(
            determine_wants,
            target.get_graph_walker(),
            progress=progress,
            depth=depth,
        )
        # Stream the generated pack data straight into the target's store.
        target.object_store.add_pack_data(count, pack_data, progress)
        return self.get_refs()
    def fetch_pack_data(
        self,
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
        depth: int | None = None,
    ) -> tuple[int, Iterator["UnpackedObject"]]:
        """Fetch the pack data required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to acknowledge
                that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: count and iterator over pack data
        """
        missing_objects = self.find_missing_objects(
            determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
        )
        # find_missing_objects returns None on the shallow short-circuit
        # path; report an empty pack in that case.
        if missing_objects is None:
            return 0, iter([])
        remote_has = missing_objects.get_remote_has()
        object_ids = list(missing_objects)
        return len(object_ids), generate_unpacked_objects(
            self.object_store, object_ids, progress=progress, other_haves=remote_has
        )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to acknowledge
                that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented (a
            MissingObjectFinder), or None when a shallow request
            short-circuits without sending a pack.
        """
        import logging

        # Filter out refs pointing to missing objects to avoid errors downstream.
        # This makes Dulwich more robust when dealing with broken refs on disk.
        # Previously serialize_refs() did this filtering as a side-effect.
        all_refs = self.get_refs()
        refs: dict[Ref, ObjectID] = {}
        for ref, sha in all_refs.items():
            if sha in self.object_store:
                refs[ref] = sha
            else:
                logging.warning(
                    "ref %s points at non-present sha %s",
                    ref.decode("utf-8", "replace"),
                    sha.decode("ascii"),
                )

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before any depth processing.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[ObjectID]:
            """Get parents for a commit using the parents provider.

            Args:
                commit: Commit object

            Returns:
                List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
891 def generate_pack_data(
892 self,
893 have: set[ObjectID],
894 want: set[ObjectID],
895 *,
896 shallow: set[ObjectID] | None = None,
897 progress: Callable[[str], None] | None = None,
898 ofs_delta: bool | None = None,
899 ) -> tuple[int, Iterator["UnpackedObject"]]:
900 """Generate pack data objects for a set of wants/haves.
902 Args:
903 have: List of SHA1s of objects that should not be sent
904 want: List of SHA1s of objects that should be sent
905 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
906 ofs_delta: Whether OFS deltas can be included
907 progress: Optional progress reporting method
908 """
909 if shallow is None:
910 shallow = self.get_shallow()
911 return self.object_store.generate_pack_data(
912 have,
913 want,
914 shallow=shallow,
915 progress=progress,
916 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
917 )
919 def get_graph_walker(
920 self, heads: list[ObjectID] | None = None
921 ) -> ObjectStoreGraphWalker:
922 """Retrieve a graph walker.
924 A graph walker is used by a remote repository (or proxy)
925 to find out which objects are present in this repository.
927 Args:
928 heads: Repository heads to use (optional)
929 Returns: A graph walker object
930 """
931 if heads is None:
932 heads = [
933 sha
934 for sha in self.refs.as_dict(Ref(b"refs/heads")).values()
935 if sha in self.object_store
936 ]
937 parents_provider = ParentsProvider(self.object_store)
938 return ObjectStoreGraphWalker(
939 heads,
940 parents_provider.get_parents,
941 shallow=self.get_shallow(),
942 update_shallow=self.update_shallow,
943 )
945 def get_refs(self) -> dict[Ref, ObjectID]:
946 """Get dictionary with all refs.
948 Returns: A ``dict`` mapping ref names to SHA1s
949 """
950 return self.refs.as_dict()
952 def head(self) -> ObjectID:
953 """Return the SHA1 pointed at by HEAD."""
954 # TODO: move this method to WorkTree
955 return self.refs[HEADREF]
957 def _get_object(self, sha: ObjectID | RawObjectID, cls: type[T]) -> T:
958 assert len(sha) in (
959 self.object_format.oid_length,
960 self.object_format.hex_length,
961 )
962 ret = self.get_object(sha)
963 if not isinstance(ret, cls):
964 if cls is Commit:
965 raise NotCommitError(ret.id)
966 elif cls is Blob:
967 raise NotBlobError(ret.id)
968 elif cls is Tree:
969 raise NotTreeError(ret.id)
970 elif cls is Tag:
971 raise NotTagError(ret.id)
972 else:
973 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
974 return ret
976 def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile:
977 """Retrieve the object with the specified SHA.
979 Args:
980 sha: SHA to retrieve
981 Returns: A ShaFile object
982 Raises:
983 KeyError: when the object can not be found
984 """
985 return self.object_store[sha]
987 def parents_provider(self) -> ParentsProvider:
988 """Get a parents provider for this repository.
990 Returns:
991 ParentsProvider instance configured with grafts and shallows
992 """
993 return ParentsProvider(
994 self.object_store,
995 grafts=self._graftpoints,
996 shallows=self.get_shallow(),
997 )
999 def get_parents(
1000 self, sha: ObjectID, commit: Commit | None = None
1001 ) -> list[ObjectID]:
1002 """Retrieve the parents of a specific commit.
1004 If the specific commit is a graftpoint, the graft parents
1005 will be returned instead.
1007 Args:
1008 sha: SHA of the commit for which to retrieve the parents
1009 commit: Optional commit matching the sha
1010 Returns: List of parents
1011 """
1012 return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract: subclasses must implement.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)
    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract: subclasses must implement.
        """
        raise NotImplementedError(self.get_worktree_config)
    def get_description(self) -> bytes | None:
        """Retrieve the description for this repository.

        Abstract: subclasses must implement.

        Returns: Bytes with the description of the repository
            as set by the user.
        """
        raise NotImplementedError(self.get_description)
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Abstract: subclasses must implement.

        Args:
            description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Abstract: subclasses must implement.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)
    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Abstract: subclasses must implement.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Abstract: subclasses must implement.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)
1066 def get_config_stack(self) -> "StackedConfig":
1067 """Return a config stack for this repository.
1069 This stack accesses the configuration for both this repository
1070 itself (.git/config) and the global configuration, which usually
1071 lives in ~/.gitconfig.
1073 Returns: `Config` instance for this repository
1074 """
1075 from .config import ConfigFile, StackedConfig
1077 local_config = self.get_config()
1078 backends: list[ConfigFile] = [local_config]
1079 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
1080 backends.append(self.get_worktree_config())
1082 backends += StackedConfig.default_backends()
1083 return StackedConfig(backends, writable=local_config)
1085 def get_shallow(self) -> set[ObjectID]:
1086 """Get the set of shallow commits.
1088 Returns: Set of shallow commits.
1089 """
1090 f = self.get_named_file("shallow")
1091 if f is None:
1092 return set()
1093 with f:
1094 return {ObjectID(line.strip()) for line in f}
1096 def update_shallow(
1097 self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None
1098 ) -> None:
1099 """Update the list of shallow objects.
1101 Args:
1102 new_shallow: Newly shallow objects
1103 new_unshallow: Newly no longer shallow objects
1104 """
1105 shallow = self.get_shallow()
1106 if new_shallow:
1107 shallow.update(new_shallow)
1108 if new_unshallow:
1109 shallow.difference_update(new_unshallow)
1110 if shallow:
1111 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
1112 else:
1113 self._del_named_file("shallow")
1115 def get_peeled(self, ref: Ref) -> ObjectID:
1116 """Get the peeled value of a ref.
1118 Args:
1119 ref: The refname to peel.
1120 Returns: The fully-peeled SHA1 of a tag object, after peeling all
1121 intermediate tags; if the original ref does not point to a tag,
1122 this will equal the original SHA1.
1123 """
1124 cached = self.refs.get_peeled(ref)
1125 if cached is not None:
1126 return cached
1127 return peel_sha(self.object_store, self.refs[ref])[1].id
1129 @property
1130 def notes(self) -> "Notes":
1131 """Access notes functionality for this repository.
1133 Returns:
1134 Notes object for accessing notes
1135 """
1136 from .notes import Notes
1138 return Notes(self.object_store, self.refs)
1140 def get_walker(
1141 self,
1142 include: Sequence[ObjectID] | None = None,
1143 exclude: Sequence[ObjectID] | None = None,
1144 order: str = "date",
1145 reverse: bool = False,
1146 max_entries: int | None = None,
1147 paths: Sequence[bytes] | None = None,
1148 rename_detector: "RenameDetector | None" = None,
1149 follow: bool = False,
1150 since: int | None = None,
1151 until: int | None = None,
1152 queue_cls: type | None = None,
1153 ) -> "Walker":
1154 """Obtain a walker for this repository.
1156 Args:
1157 include: Iterable of SHAs of commits to include along with their
1158 ancestors. Defaults to [HEAD]
1159 exclude: Iterable of SHAs of commits to exclude along with their
1160 ancestors, overriding includes.
1161 order: ORDER_* constant specifying the order of results.
1162 Anything other than ORDER_DATE may result in O(n) memory usage.
1163 reverse: If True, reverse the order of output, requiring O(n)
1164 memory.
1165 max_entries: The maximum number of entries to yield, or None for
1166 no limit.
1167 paths: Iterable of file or subtree paths to show entries for.
1168 rename_detector: diff.RenameDetector object for detecting
1169 renames.
1170 follow: If True, follow path across renames/copies. Forces a
1171 default rename_detector.
1172 since: Timestamp to list commits after.
1173 until: Timestamp to list commits before.
1174 queue_cls: A class to use for a queue of commits, supporting the
1175 iterator protocol. The constructor takes a single argument, the Walker.
1177 Returns: A `Walker` object
1178 """
1179 from .walk import Walker, _CommitTimeQueue
1181 if include is None:
1182 include = [self.head()]
1184 # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
1185 return Walker(
1186 self.object_store,
1187 include,
1188 exclude=exclude,
1189 order=order,
1190 reverse=reverse,
1191 max_entries=max_entries,
1192 paths=paths,
1193 rename_detector=rename_detector,
1194 follow=follow,
1195 since=since,
1196 until=until,
1197 get_parents=lambda commit: self.get_parents(commit.id, commit),
1198 queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
1199 )
1201 def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile":
1202 """Retrieve a Git object by SHA1 or ref.
1204 Args:
1205 name: A Git object SHA1 or a ref name
1206 Returns: A `ShaFile` object, such as a Commit or Blob
1207 Raises:
1208 KeyError: when the specified ref or object does not exist
1209 """
1210 if not isinstance(name, bytes):
1211 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
1212 # If it looks like a ref name, only try refs
1213 if name == b"HEAD" or name.startswith(b"refs/"):
1214 try:
1215 return self.object_store[self.refs[Ref(name)]]
1216 except (RefFormatError, KeyError):
1217 pass
1218 # Otherwise, try as object ID if length matches
1219 if len(name) in (
1220 self.object_store.object_format.oid_length,
1221 self.object_store.object_format.hex_length,
1222 ):
1223 try:
1224 return self.object_store[
1225 ObjectID(name)
1226 if len(name) == self.object_store.object_format.hex_length
1227 else RawObjectID(name)
1228 ]
1229 except (KeyError, ValueError):
1230 pass
1231 # If nothing worked, raise KeyError
1232 raise KeyError(name)
1234 def __contains__(self, name: bytes) -> bool:
1235 """Check if a specific Git object or ref is present.
1237 Args:
1238 name: Git object SHA1/SHA256 or ref name
1239 """
1240 if len(name) == 20:
1241 return RawObjectID(name) in self.object_store or Ref(name) in self.refs
1242 elif len(name) == 40 and valid_hexsha(name):
1243 return ObjectID(name) in self.object_store or Ref(name) in self.refs
1244 # Check if it's a binary or hex SHA
1245 if len(name) == self.object_format.oid_length:
1246 return RawObjectID(name) in self.object_store or Ref(name) in self.refs
1247 elif len(name) == self.object_format.hex_length and valid_hexsha(name):
1248 return ObjectID(name) in self.object_store or Ref(name) in self.refs
1249 else:
1250 return Ref(name) in self.refs
1252 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
1253 """Set a ref.
1255 Args:
1256 name: ref name
1257 value: Ref value - either a ShaFile object, or a hex sha
1258 """
1259 if name.startswith(b"refs/") or name == HEADREF:
1260 ref_name = Ref(name)
1261 if isinstance(value, ShaFile):
1262 self.refs[ref_name] = value.id
1263 elif isinstance(value, bytes):
1264 self.refs[ref_name] = ObjectID(value)
1265 else:
1266 raise TypeError(value)
1267 else:
1268 raise ValueError(name)
1270 def __delitem__(self, name: bytes) -> None:
1271 """Remove a ref.
1273 Args:
1274 name: Name of the ref to remove
1275 """
1276 if name.startswith(b"refs/") or name == HEADREF:
1277 del self.refs[Ref(name)]
1278 else:
1279 raise ValueError(name)
1281 def _get_user_identity(
1282 self, config: "StackedConfig", kind: str | None = None
1283 ) -> bytes:
1284 """Determine the identity to use for new commits."""
1285 warnings.warn(
1286 "use get_user_identity() rather than Repo._get_user_identity",
1287 DeprecationWarning,
1288 )
1289 return get_user_identity(config)
1291 def _add_graftpoints(
1292 self, updated_graftpoints: dict[ObjectID, list[ObjectID]]
1293 ) -> None:
1294 """Add or modify graftpoints.
1296 Args:
1297 updated_graftpoints: Dict of commit shas to list of parent shas
1298 """
1299 # Simple validation
1300 for commit, parents in updated_graftpoints.items():
1301 for sha in [commit, *parents]:
1302 check_hexsha(sha, "Invalid graftpoint")
1304 self._graftpoints.update(updated_graftpoints)
1306 def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None:
1307 """Remove graftpoints.
1309 Args:
1310 to_remove: List of commit shas
1311 """
1312 for sha in to_remove:
1313 del self._graftpoints[sha]
1315 def _read_heads(self, name: str) -> list[ObjectID]:
1316 f = self.get_named_file(name)
1317 if f is None:
1318 return []
1319 with f:
1320 return [ObjectID(line.strip()) for line in f.readlines() if line.strip()]
1322 def get_worktree(self) -> "WorkTree":
1323 """Get the working tree for this repository.
1325 Returns:
1326 WorkTree instance for performing working tree operations
1328 Raises:
1329 NotImplementedError: If the repository doesn't support working trees
1330 """
1331 raise NotImplementedError(
1332 "Working tree operations not supported by this repository type"
1333 )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: if the content does not begin with ``gitdir: ``
    """
    contents = f.read()
    prefix = b"gitdir: "
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the prefix and any trailing newline, then decode to a path.
    return contents[len(prefix) :].rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Raised when a repository's format version is not supported."""

    def __init__(self, version: int) -> None:
        """Record the unsupported repository format *version*."""
        # Exposed so callers can report which version was encountered.
        self.version = version
class UnsupportedExtension(Exception):
    """Raised when a repository declares an extension we cannot handle."""

    def __init__(self, extension: str) -> None:
        """Record the unsupported repository *extension* name."""
        # Exposed so callers can report which extension was encountered.
        self.extension = extension
1375class Repo(BaseRepo):
1376 """A git repository backed by local disk.
1378 To open an existing repository, call the constructor with
1379 the path of the repository.
1381 To create a new repository, use the Repo.init class method.
1383 Note that a repository object may hold on to resources such
1384 as file handles for performance reasons; call .close() to free
1385 up those resources.
1387 Attributes:
1388 path: Path to the working copy (if it exists) or repository control
1389 directory (if the repository is bare)
1390 bare: Whether this is a bare repository
1391 """
1393 path: str
1394 bare: bool
1395 object_store: DiskObjectStore
1396 filter_context: "FilterContext | None"
    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if *root* does not look like a git repository
          UnsupportedVersion: if core.repositoryformatversion is not 0 or 1
          UnsupportedExtension: if an unknown extensions.* entry is set
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Autodetect bareness: a .git file/dir means non-bare; objects/ and
        # refs/ directly under root means bare.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            # A .git *file* points at the real control dir (worktree layout).
            if os.path.isfile(hidden_path):
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # A "commondir" file redirects shared state to the main worktree's
        # control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (
                b"worktreeconfig",
                b"objectformat",
                b"relativeworktrees",
            ):
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            # Get shared repository permissions from config
            try:
                shared_value = config.get(("core",), "sharedRepository")
                file_mode, dir_mode = parse_shared_repository(shared_value)
            except KeyError:
                file_mode, dir_mode = None, None

            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR),
                config,
                file_mode=file_mode,
                dir_mode=dir_mode,
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Determine hash algorithm from config if not already set
        if self.object_format is None:
            from .object_format import DEFAULT_OBJECT_FORMAT, get_object_format

            # extensions.objectFormat is only honoured with format version 1.
            if format_version == 1:
                try:
                    object_format = config.get((b"extensions",), b"objectformat")
                    self.object_format = get_object_format(
                        object_format.decode("ascii")
                    )
                except KeyError:
                    self.object_format = DEFAULT_OBJECT_FORMAT
            else:
                self.object_format = DEFAULT_OBJECT_FORMAT

        # Load graftpoints from info/grafts and from the shallow file (a
        # shallow commit is treated as a graft with no parents).
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        # Wire up the standard shell hooks in the control dir.
        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["pre-receive"] = PreReceiveShellHook(self.controldir())
        self.hooks["update"] = UpdateShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1551 def get_worktree(self) -> "WorkTree":
1552 """Get the working tree for this repository.
1554 Returns:
1555 WorkTree instance for performing working tree operations
1556 """
1557 from .worktree import WorkTree
1559 return WorkTree(self, self.path)
    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes,
    ) -> None:
        """Append an entry to the reflog for *ref*.

        Used as the logger callback of the refs container.

        Args:
          ref: Reference name the update applies to
          old_sha: Previous SHA of the ref
          new_sha: New SHA of the ref
          committer: Identity to record; defaults to the configured user
          timestamp: Seconds since epoch; defaults to the current time
          timezone: UTC offset in seconds; defaults to 0
          message: Reflog message
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)

        # Get shared repository permissions
        file_mode, dir_mode = self._get_shared_repository_permissions()

        # Create directory with appropriate permissions
        parent_dir = os.path.dirname(path)
        # Create directory tree, setting permissions on each level if needed
        # (a plain os.makedirs() would not let us chmod each new level).
        parts = []
        current = parent_dir
        while current and not os.path.exists(current):
            parts.append(current)
            current = os.path.dirname(current)
        parts.reverse()
        for part in parts:
            os.mkdir(part)
            if dir_mode is not None:
                os.chmod(part, dir_mode)
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )

        # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
        # Always chmod to ensure correct permissions even if file already existed
        if file_mode is not None:
            os.chmod(path, file_mode)
1612 def _reflog_path(self, ref: bytes) -> str:
1613 if ref.startswith((b"main-worktree/", b"worktrees/")):
1614 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1616 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1617 return os.path.join(base, "logs", os.fsdecode(ref))
1619 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1620 """Read reflog entries for a reference.
1622 Args:
1623 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1625 Yields:
1626 reflog.Entry objects in chronological order (oldest first)
1627 """
1628 from .reflog import read_reflog
1630 path = self._reflog_path(ref)
1631 try:
1632 with open(path, "rb") as f:
1633 yield from read_reflog(f)
1634 except FileNotFoundError:
1635 return
1637 @classmethod
1638 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
1639 """Iterate parent directories to discover a repository.
1641 Return a Repo object for the first parent directory that looks like a
1642 Git repository.
1644 Args:
1645 start: The directory to start discovery from (defaults to '.')
1646 """
1647 path = os.path.abspath(start)
1648 while True:
1649 try:
1650 return cls(path)
1651 except NotGitRepository:
1652 new_path, _tail = os.path.split(path)
1653 if new_path == path: # Root reached
1654 break
1655 path = new_path
1656 start_str = os.fspath(start)
1657 if isinstance(start_str, bytes):
1658 start_str = start_str.decode("utf-8")
1659 raise NotGitRepository(f"No git repository was found at {start_str}")
1661 def controldir(self) -> str:
1662 """Return the path of the control directory."""
1663 return self._controldir
1665 def commondir(self) -> str:
1666 """Return the path of the common directory.
1668 For a main working tree, it is identical to controldir().
1670 For a linked working tree, it is the control directory of the
1671 main working tree.
1672 """
1673 return self._commondir
1675 def _determine_file_mode(self) -> bool:
1676 """Probe the file-system to determine whether permissions can be trusted.
1678 Returns: True if permissions can be trusted, False otherwise.
1679 """
1680 fname = os.path.join(self.path, ".probe-permissions")
1681 with open(fname, "w") as f:
1682 f.write("")
1684 st1 = os.lstat(fname)
1685 try:
1686 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1687 except PermissionError:
1688 return False
1689 st2 = os.lstat(fname)
1691 os.unlink(fname)
1693 mode_differs = st1.st_mode != st2.st_mode
1694 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1696 return mode_differs and st2_has_exec
1698 def _determine_symlinks(self) -> bool:
1699 """Probe the filesystem to determine whether symlinks can be created.
1701 Returns: True if symlinks can be created, False otherwise.
1702 """
1703 # TODO(jelmer): Actually probe disk / look at filesystem
1704 return sys.platform != "win32"
1706 def _get_shared_repository_permissions(
1707 self,
1708 ) -> tuple[int | None, int | None]:
1709 """Get shared repository file and directory permissions from config.
1711 Returns:
1712 tuple of (file_mask, directory_mask) or (None, None) if not shared
1713 """
1714 try:
1715 config = self.get_config()
1716 value = config.get(("core",), "sharedRepository")
1717 return parse_shared_repository(value)
1718 except KeyError:
1719 return (None, None)
1721 def _put_named_file(self, path: str, contents: bytes) -> None:
1722 """Write a file to the control dir with the given name and contents.
1724 Args:
1725 path: The path to the file, relative to the control dir.
1726 contents: A string to write to the file.
1727 """
1728 path = path.lstrip(os.path.sep)
1730 # Get shared repository permissions
1731 file_mode, _ = self._get_shared_repository_permissions()
1733 # Create file with appropriate permissions
1734 if file_mode is not None:
1735 with GitFile(
1736 os.path.join(self.controldir(), path), "wb", mask=file_mode
1737 ) as f:
1738 f.write(contents)
1739 else:
1740 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1741 f.write(contents)
1743 def _del_named_file(self, path: str) -> None:
1744 try:
1745 os.unlink(os.path.join(self.controldir(), path))
1746 except FileNotFoundError:
1747 return
1749 def get_named_file(
1750 self,
1751 path: str | bytes,
1752 basedir: str | None = None,
1753 ) -> BinaryIO | None:
1754 """Get a file from the control dir with a specific name.
1756 Although the filename should be interpreted as a filename relative to
1757 the control dir in a disk-based Repo, the object returned need not be
1758 pointing to a file in that location.
1760 Args:
1761 path: The path to the file, relative to the control dir.
1762 basedir: Optional argument that specifies an alternative to the
1763 control dir.
1764 Returns: An open file object, or None if the file does not exist.
1765 """
1766 # TODO(dborowitz): sanitize filenames, since this is used directly by
1767 # the dumb web serving code.
1768 if basedir is None:
1769 basedir = self.controldir()
1770 if isinstance(path, bytes):
1771 path = path.decode("utf-8")
1772 path = path.lstrip(os.path.sep)
1773 try:
1774 return open(os.path.join(basedir, path), "rb")
1775 except FileNotFoundError:
1776 return None
1778 def index_path(self) -> str:
1779 """Return path to the index file."""
1780 return os.path.join(self.controldir(), INDEX_FILENAME)
1782 def open_index(self) -> "Index":
1783 """Open the index for this repository.
1785 Raises:
1786 NoIndexPresent: If no index is present
1787 Returns: The matching `Index`
1788 """
1789 from .index import Index
1791 if not self.has_index():
1792 raise NoIndexPresent
1794 # Check for manyFiles feature configuration
1795 config = self.get_config_stack()
1796 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1797 skip_hash = False
1798 index_version = None
1800 if many_files:
1801 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1802 try:
1803 index_version_str = config.get(b"index", b"version")
1804 index_version = int(index_version_str)
1805 except KeyError:
1806 index_version = 4 # Default to version 4 for manyFiles
1807 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1808 else:
1809 # Check for explicit index settings
1810 try:
1811 index_version_str = config.get(b"index", b"version")
1812 index_version = int(index_version_str)
1813 except KeyError:
1814 index_version = None
1815 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1817 # Get shared repository permissions for index file
1818 file_mode, _ = self._get_shared_repository_permissions()
1820 return Index(
1821 self.index_path(),
1822 skip_hash=skip_hash,
1823 version=index_version,
1824 file_mode=file_mode,
1825 )
1827 def has_index(self) -> bool:
1828 """Check if an index is present."""
1829 # Bare repos must never have index files; non-bare repos may have a
1830 # missing index file, which is treated as empty.
1831 return not self.bare
    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        # NOTE(review): *progress* is accepted but not forwarded anywhere in
        # this method body — confirm whether it should be passed to fetch().
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                # Non-bare clones check out by default.
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Register this repository as the clone's "origin" remote.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror our branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    Ref(b"refs/remotes/" + origin),
                    self.refs.as_dict(Ref(b"refs/heads")),
                    message=ref_message,
                )
                target.refs.import_refs(
                    Ref(b"refs/tags"),
                    self.refs.as_dict(Ref(b"refs/tags")),
                    message=ref_message,
                )

                head_chain, origin_sha = self.refs.follow(HEADREF)
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[HEADREF] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Close the partially-initialized clone before propagating.
                target.close()
                raise
        except BaseException:
            # Remove the directory we created so a failed clone leaves no trace.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix (``gitdir:``, ``gitdir/i:``,
        ``onbranch:``) to matcher function, used when evaluating
        ``includeIf`` sections of the config.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match this repository's gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match at any depth.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # Trailing slash matches everything below that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match the currently checked-out branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(HEADREF)
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            # Detached HEAD or unreadable refs: no branch to match.
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
2018 def get_worktree_config(self) -> "ConfigFile":
2019 """Get the worktree-specific config.
2021 Returns:
2022 ConfigFile object for the worktree config
2023 """
2024 from .config import ConfigFile
2026 path = os.path.join(self.commondir(), "config.worktree")
2027 try:
2028 # Pass condition matchers for includeIf evaluation
2029 condition_matchers = self._get_config_condition_matchers()
2030 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
2031 except FileNotFoundError:
2032 cf = ConfigFile()
2033 cf.path = path
2034 return cf
2036 def get_config(self) -> "ConfigFile":
2037 """Retrieve the config object.
2039 Returns: `ConfigFile` object for the ``.git/config`` file.
2040 """
2041 from .config import ConfigFile
2043 path = os.path.join(self._commondir, "config")
2044 try:
2045 # Pass condition matchers for includeIf evaluation
2046 condition_matchers = self._get_config_condition_matchers()
2047 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
2048 except FileNotFoundError:
2049 ret = ConfigFile()
2050 ret.path = path
2051 return ret
2053 def get_rebase_state_manager(self) -> "RebaseStateManager":
2054 """Get the appropriate rebase state manager for this repository.
2056 Returns: DiskRebaseStateManager instance
2057 """
2058 import os
2060 from .rebase import DiskRebaseStateManager
2062 path = os.path.join(self.controldir(), "rebase-merge")
2063 return DiskRebaseStateManager(path)
2065 def get_description(self) -> bytes | None:
2066 """Retrieve the description of this repository.
2068 Returns: Description as bytes or None.
2069 """
2070 path = os.path.join(self._controldir, "description")
2071 try:
2072 with GitFile(path, "rb") as f:
2073 return f.read()
2074 except FileNotFoundError:
2075 return None
2077 def __repr__(self) -> str:
2078 """Return string representation of this repository."""
2079 return f"<Repo at {self.path!r}>"
2081 def set_description(self, description: bytes) -> None:
2082 """Set the description for this repository.
2084 Args:
2085 description: Text to set as description for this repository.
2086 """
2087 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: "StackedConfig | None" = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
        shared_repository: str | bool | None = None,
        object_format: str | None = None,
    ) -> "Repo":
        """Shared implementation for creating bare and non-bare repositories.

        Args:
          path: Path of the working copy (or of the control dir when bare)
          controldir: Path of the control directory to populate
          bare: Whether the new repository is bare
          object_store: Object store to use instead of the default on-disk one
          config: Config used to look up ``init.defaultBranch``
          default_branch: Initial branch name; falls back to config, then
            DEFAULT_BRANCH
          symlinks: Whether the working tree supports symlinks
          format: Repository format version to record
          shared_repository: core.sharedRepository value controlling the
            permissions of created files and directories
          object_format: Hash algorithm name for the new repository

        Returns: the newly created repository, opened as `Repo`
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)

        # Determine shared repository permissions early
        file_mode: int | None = None
        dir_mode: int | None = None
        if shared_repository is not None:
            file_mode, dir_mode = parse_shared_repository(shared_repository)

        # Create base directories with appropriate permissions
        for d in BASE_DIRECTORIES:
            dir_path = os.path.join(controldir, *d)
            os.mkdir(dir_path)
            if dir_mode is not None:
                os.chmod(dir_path, dir_mode)

        # Determine hash algorithm
        from .object_format import get_object_format

        hash_alg = get_object_format(object_format)

        if object_store is None:
            object_store = DiskObjectStore.init(
                os.path.join(controldir, OBJECTDIR),
                file_mode=file_mode,
                dir_mode=dir_mode,
                object_format=hash_alg,
            )
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (not-yet-existing) default branch.
        ret.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))
        ret._init_files(
            bare=bare,
            symlinks=symlinks,
            format=format,
            shared_repository=shared_repository,
            object_format=object_format,
        )
        return ret
@classmethod
def init(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new repository.

    Args:
      path: Path in which to create the repository
      mkdir: Whether to create the directory
      config: Configuration object
      default_branch: Default branch name
      symlinks: Whether to support symlinks
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: `Repo` instance
    """
    repo_path = os.fspath(path)
    if isinstance(repo_path, bytes):
        repo_path = os.fsdecode(repo_path)
    if mkdir:
        os.mkdir(repo_path)
    # Non-bare layout: control files live under <path>/<CONTROLDIR>.
    control_dir = os.path.join(repo_path, CONTROLDIR)
    os.mkdir(control_dir)
    _set_filesystem_hidden(control_dir)
    return cls._init_maybe_bare(
        repo_path,
        control_dir,
        False,
        config=config,
        default_branch=default_branch,
        symlinks=symlinks,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )
@classmethod
def _init_new_working_directory(
    cls,
    path: str | bytes | os.PathLike[str],
    main_repo: "Repo",
    identifier: str | None = None,
    mkdir: bool = False,
) -> "Repo":
    """Create a new working directory linked to a repository.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier (defaults to the basename of ``path``)
      mkdir: Whether to create the directory
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    if identifier is None:
        identifier = os.path.basename(path)
    # Ensure we use absolute path for the worktree control directory
    main_controldir = os.path.abspath(main_repo.controldir())
    main_worktreesdir = os.path.join(main_controldir, WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    # The worktree's ".git" is a plain file pointing at its control dir,
    # not a directory.
    gitdirfile = os.path.join(path, CONTROLDIR)
    with open(gitdirfile, "wb") as f:
        f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

    # Get shared repository permissions from main repository
    _, dir_mode = main_repo._get_shared_repository_permissions()

    # Create directories with appropriate permissions; FileExistsError is
    # ignored so re-linking into an existing worktrees layout works.
    try:
        os.mkdir(main_worktreesdir)
        if dir_mode is not None:
            os.chmod(main_worktreesdir, dir_mode)
    except FileExistsError:
        pass
    try:
        os.mkdir(worktree_controldir)
        if dir_mode is not None:
            os.chmod(worktree_controldir, dir_mode)
    except FileExistsError:
        pass
    # Back-pointer from the control dir to the worktree's gitdir file.
    with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdirfile) + b"\n")
    # commondir points back at the main repository's control directory.
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
        f.write(b"../..\n")
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
        f.write(main_repo.head() + b"\n")
    r = cls(os.path.normpath(path))
    # Populate the new worktree's index from HEAD.
    r.get_worktree().reset_index()
    return r
@classmethod
def init_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new bare repository.

    ``path`` should already exist and be an empty directory.

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: a `Repo` instance
    """
    repo_path = os.fspath(path)
    if isinstance(repo_path, bytes):
        repo_path = os.fsdecode(repo_path)
    if mkdir:
        os.mkdir(repo_path)
    # Bare layout: the repository path itself is the control directory.
    return cls._init_maybe_bare(
        repo_path,
        repo_path,
        True,
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )

create = init_bare
def close(self) -> None:
    """Close any files opened by this repository."""
    self.object_store.close()
    # Tear down the filter context (if one was ever created) and drop the
    # reference so a later get_blob_normalizer() starts fresh.
    ctx = self.filter_context
    if ctx is not None:
        ctx.close()
        self.filter_context = None
def __enter__(self) -> "Repo":
    """Enter context manager.

    Returns: this repository, so ``with Repo(...) as r:`` binds ``r``.
    """
    return self
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager and close repository.

    Args:
      exc_type: Exception type, if an exception is propagating
      exc_val: Exception instance, if an exception is propagating
      exc_tb: Traceback, if an exception is propagating
    """
    # Always close; returning None means exceptions are not suppressed.
    self.close()
def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
    """Read .gitattributes file from working tree.

    Returns:
      Dictionary mapping file patterns to attributes
    """
    gitattributes: dict[bytes, dict[bytes, bytes]] = {}
    gitattributes_path = os.path.join(self.path, ".gitattributes")

    # EAFP: open directly instead of os.path.exists() followed by open(),
    # which is racy if the file disappears between check and open.
    # NotADirectoryError matches the old exists() behavior when a path
    # component is a regular file.
    try:
        f = open(gitattributes_path, "rb")
    except (FileNotFoundError, NotADirectoryError):
        return gitattributes
    with f:
        for line in f:
            line = line.strip()
            # Skip blank lines and comments.
            if not line or line.startswith(b"#"):
                continue

            parts = line.split()
            if len(parts) < 2:
                continue

            pattern = parts[0]
            attrs: dict[bytes, bytes] = {}

            for attr in parts[1:]:
                if attr.startswith(b"-"):
                    # "-attr" unsets the attribute
                    attrs[attr[1:]] = b"false"
                elif b"=" in attr:
                    # "attr=value" sets it to a value
                    key, value = attr.split(b"=", 1)
                    attrs[key] = value
                else:
                    # Bare "attr" sets the attribute
                    attrs[attr] = b"true"

            gitattributes[pattern] = attrs

    return gitattributes
def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object.

    Returns:
      FilterBlobNormalizer bound to this repository's current config stack
      and gitattributes; the underlying FilterContext is created once and
      reused across calls so filter processes can be shared.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Get fresh configuration and GitAttributes
    config_stack = self.get_config_stack()
    git_attributes = self.get_gitattributes()

    # Lazily create FilterContext if needed
    if self.filter_context is None:
        filter_registry = FilterRegistry(config_stack, self)
        self.filter_context = FilterContext(filter_registry)
    else:
        # Refresh the context with current config to handle config changes
        self.filter_context.refresh_config(config_stack)

    # Return a new FilterBlobNormalizer with the context
    return FilterBlobNormalizer(
        config_stack, git_attributes, filter_context=self.filter_context
    )
def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Patterns are collected in this order (later entries take precedence in
    matching): the tree's ``.gitattributes``, ``.git/info/attributes``, then
    the working-tree ``.gitattributes``.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    patterns = []

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Read repository .gitattributes from index/tree
    if tree is None:
        try:
            # Try to get from HEAD
            head = self[b"HEAD"]
            # Peel tags to get to the underlying commit
            while isinstance(head, Tag):
                _cls, obj = head.object
                head = self.get_object(obj)
            if not isinstance(head, Commit):
                raise ValueError(
                    f"Expected HEAD to point to a Commit, got {type(head).__name__}. "
                    f"This usually means HEAD points to a {type(head).__name__} object "
                    f"instead of a Commit."
                )
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    attrs_data = BytesIO(attrs_blob.data)
                    for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                        pattern = Pattern(pattern_bytes)
                        patterns.append((pattern, attrs))
        except (KeyError, NotTreeError):
            # Missing object or not a tree: no tree-level attributes.
            pass

    # Read .git/info/attributes
    info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
    if os.path.exists(info_attrs_path):
        with open(info_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    # Read .gitattributes from working directory (if it exists)
    working_attrs_path = os.path.join(self.path, ".gitattributes")
    if os.path.exists(working_attrs_path):
        with open(working_attrs_path, "rb") as f:
            for pattern_bytes, attrs in parse_git_attributes(f):
                pattern = Pattern(pattern_bytes)
                patterns.append((pattern, attrs))

    return GitAttributes(patterns)
2461class MemoryRepo(BaseRepo):
2462 """Repo that stores refs, objects, and named files in memory.
2464 MemoryRepos are always bare: they have no working tree and no index, since
2465 those have a stronger dependency on the filesystem.
2466 """
2468 filter_context: "FilterContext | None"
def __init__(self) -> None:
    """Create a new repository in memory."""
    from .config import ConfigFile
    from .object_format import DEFAULT_OBJECT_FORMAT

    # Reflog entries are kept as plain tuples; _append_reflog is handed to
    # the refs container as its logger callback.
    self._reflog: list[Any] = []
    refs = DictRefsContainer({}, logger=self._append_reflog)
    BaseRepo.__init__(self, MemoryObjectStore(), refs)
    self.bare = True
    self._named_files: dict[str, bytes] = {}
    self._description: bytes | None = None
    self._config = ConfigFile()
    self.filter_context = None
    # MemoryRepo always uses the library's default object format.
    self.object_format = DEFAULT_OBJECT_FORMAT
def _append_reflog(
    self,
    ref: bytes,
    old_sha: bytes | None,
    new_sha: bytes | None,
    committer: bytes | None,
    timestamp: int | None,
    timezone: int | None,
    message: bytes | None,
) -> None:
    """Record a ref update in the in-memory reflog.

    Used as the logger callback of the refs container; each entry is a
    7-tuple mirroring the fields of an on-disk reflog line.
    """
    self._reflog.append(
        (ref, old_sha, new_sha, committer, timestamp, timezone, message)
    )
def set_description(self, description: bytes) -> None:
    """Set the description for this repository.

    Args:
      description: Text to set as description
    """
    # Held in memory only; nothing is persisted to disk.
    self._description = description
def get_description(self) -> bytes | None:
    """Get the description of this repository.

    Returns:
      Repository description as bytes, or None if never set
    """
    return self._description
def _determine_file_mode(self) -> bool:
    """Probe the file-system to determine whether permissions can be trusted.

    Returns: True if permissions can be trusted, False otherwise.
    """
    # Windows filesystems do not reliably preserve POSIX permission bits,
    # so file modes are only trusted off win32.
    return sys.platform != "win32"
def _determine_symlinks(self) -> bool:
    """Probe the file-system to determine whether symlinks can be created.

    Returns: True if symlinks can be created, False otherwise.
    """
    # NOTE(review): the previous docstring was copy-pasted from
    # _determine_file_mode and incorrectly talked about "permissions";
    # this method governs symlink support.
    return sys.platform != "win32"
def _put_named_file(self, path: str, contents: bytes) -> None:
    """Write a file to the control dir with the given name and contents.

    Args:
      path: The path to the file, relative to the control dir.
      contents: A string to write to the file.
    """
    # Backed by a plain dict rather than the filesystem.
    self._named_files[path] = contents
def _del_named_file(self, path: str) -> None:
    """Delete a named control file, silently ignoring missing entries."""
    # pop() with a default replaces the try/except KeyError dance.
    self._named_files.pop(path, None)
def get_named_file(
    self,
    path: str | bytes,
    basedir: str | None = None,
) -> BytesIO | None:
    """Get a file from the control dir with a specific name.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-backed Repo, the object returned need not be
    pointing to a file in that location.

    Args:
      path: The path to the file, relative to the control dir.
      basedir: Optional base directory for the path
    Returns: An open file object, or None if the file does not exist.
    """
    if isinstance(path, bytes):
        key = path.decode()
    else:
        key = path
    try:
        contents = self._named_files[key]
    except KeyError:
        return None
    return BytesIO(contents)
def open_index(self) -> "Index":
    """Fail to open index for this repo, since it is bare.

    MemoryRepo has no working tree and therefore never has an index.

    Raises:
      NoIndexPresent: Raised when no index is present
    """
    raise NoIndexPresent
def _init_config(self, config: "ConfigFile") -> None:
    """Initialize repository configuration for MemoryRepo.

    Simply stores the provided config object in memory; nothing is
    persisted.
    """
    self._config = config
def get_config(self) -> "ConfigFile":
    """Retrieve the config object.

    Returns: the in-memory `ConfigFile` object.
    """
    return self._config
def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Get the appropriate rebase state manager for this repository.

    Returns: MemoryRebaseStateManager instance
    """
    # Imported lazily to avoid a circular import with .rebase.
    from .rebase import MemoryRebaseStateManager

    return MemoryRebaseStateManager(self)
def get_blob_normalizer(self) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object for checkin/checkout operations.

    Returns:
      FilterBlobNormalizer bound to the current config stack and (empty)
      gitattributes; the FilterContext is created lazily and reused.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    # Get fresh configuration and GitAttributes
    config_stack = self.get_config_stack()
    git_attributes = self.get_gitattributes()

    # Lazily create FilterContext if needed
    if self.filter_context is None:
        filter_registry = FilterRegistry(config_stack, self)
        self.filter_context = FilterContext(filter_registry)
    else:
        # Refresh the context with current config to handle config changes
        self.filter_context.refresh_config(config_stack)

    # Return a new FilterBlobNormalizer with the context
    return FilterBlobNormalizer(
        config_stack, git_attributes, filter_context=self.filter_context
    )
def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository."""
    from .attrs import GitAttributes

    # No working tree and no attribute files exist in memory, so the
    # result is always an empty set of patterns.
    return GitAttributes([])
def close(self) -> None:
    """Close any resources opened by this repository."""
    # Tear down the filter context first (if one was ever created) and
    # drop the reference so it cannot be reused after close.
    ctx = self.filter_context
    if ctx is not None:
        ctx.close()
        self.filter_context = None
    # Close object store to release pack files
    self.object_store.close()
def do_commit(
    self,
    message: bytes | None = None,
    committer: bytes | None = None,
    author: bytes | None = None,
    commit_timestamp: float | None = None,
    commit_timezone: int | None = None,
    author_timestamp: float | None = None,
    author_timezone: int | None = None,
    tree: ObjectID | None = None,
    encoding: bytes | None = None,
    ref: Ref | None = HEADREF,
    merge_heads: list[ObjectID] | None = None,
    no_verify: bool = False,
    sign: bool = False,
) -> bytes:
    """Create a new commit.

    This is a simplified implementation for in-memory repositories that
    doesn't support worktree operations or hooks.

    Args:
      message: Commit message (may also be a callable taking (repo, commit)
        and returning the message bytes)
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit timestamp)
      author_timezone: Author timestamp timezone (defaults to commit timezone)
      tree: SHA1 of the tree root to use
      encoding: Encoding
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads
      no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
      sign: GPG Sign the commit (ignored for MemoryRepo)

    Returns:
      New commit SHA1

    Raises:
      ValueError: if ``tree`` is missing/malformed or no message is given
    """
    import time

    from .objects import Commit

    # Unlike the disk-backed implementation, MemoryRepo cannot derive a
    # tree from an index, so the caller must supply one.
    if tree is None:
        raise ValueError("tree must be specified for MemoryRepo")

    c = Commit()
    # Validate the tree id length against the repository's object format.
    if len(tree) != self.object_format.hex_length:
        raise ValueError(
            f"tree must be a {self.object_format.hex_length}-character hex sha string"
        )
    c.tree = tree

    config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = []
    if committer is None:
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        commit_timezone = 0
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    c.author = author
    check_user_identity(author)
    # Author time/zone default to the commit time/zone.
    if author_timestamp is None:
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            pass
    if encoding is not None:
        c.encoding = encoding

    # A callable message is invoked with the repo and the partially-built
    # commit so it can inspect tree/parents before producing text.
    if callable(message):
        message = message(self, c)
        if message is None:
            raise ValueError("Message callback returned None")

    if message is None:
        raise ValueError("No commit message specified")

    c.message = message

    if ref is None:
        # Create a dangling commit
        c.parents = merge_heads
        self.object_store.add_object(c)
    else:
        try:
            # Existing ref: make the old head the first parent and update
            # atomically via set_if_equals.
            old_head = self.refs[ref]
            c.parents = [old_head, *merge_heads]
            self.object_store.add_object(c)
            ok = self.refs.set_if_equals(
                ref,
                old_head,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        except KeyError:
            # Unborn ref: create it only if it still does not exist.
            c.parents = merge_heads
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(
                ref,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        if not ok:
            from .errors import CommitError

            raise CommitError(f"{ref!r} changed during commit")

    return c.id
@classmethod
def init_bare(
    cls,
    objects: Iterable[ShaFile],
    refs: Mapping[Ref, ObjectID],
    format: int | None = None,
    object_format: str | None = None,
) -> "MemoryRepo":
    """Create a new bare repository in memory.

    Args:
      objects: Objects for the new repository,
        as iterable
      refs: Refs as dictionary, mapping names
        to object SHA1s
      format: Repository format version (defaults to 0)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    """
    repo = cls()
    for obj in objects:
        repo.object_store.add_object(obj)
    for refname, sha in refs.items():
        repo.refs.add_if_new(refname, sha)
    repo._init_files(bare=True, format=format, object_format=object_format)
    return repo