Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 38%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
# Explicit public API of this module (PEP 8 convention); names not listed
# here are considered internal even if they lack a leading underscore.
__all__ = [
    "BASE_DIRECTORIES",
    "COMMONDIR",
    "CONTROLDIR",
    "DEFAULT_BRANCH",
    "DEFAULT_OFS_DELTA",
    "GITDIR",
    "INDEX_FILENAME",
    "OBJECTDIR",
    "REFSDIR",
    "REFSDIR_HEADS",
    "REFSDIR_TAGS",
    "WORKTREES",
    "BaseRepo",
    "DefaultIdentityNotFound",
    "InvalidUserIdentity",
    "MemoryRepo",
    "ParentsProvider",
    "Repo",
    "UnsupportedExtension",
    "UnsupportedVersion",
    "check_user_identity",
    "get_user_identity",
    "parse_graftpoints",
    "parse_shared_repository",
    "read_gitfile",
    "serialize_graftpoints",
]
61import os
62import stat
63import sys
64import time
65import warnings
66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
67from io import BytesIO
68from types import TracebackType
69from typing import (
70 TYPE_CHECKING,
71 Any,
72 BinaryIO,
73 TypeVar,
74)
76if TYPE_CHECKING:
77 # There are no circular imports here, but we try to defer imports as long
78 # as possible to reduce start-up time for anything that doesn't need
79 # these imports.
80 from .attrs import GitAttributes
81 from .config import ConditionMatcher, ConfigFile, StackedConfig
82 from .diff_tree import RenameDetector
83 from .filters import FilterBlobNormalizer, FilterContext
84 from .index import Index
85 from .notes import Notes
86 from .object_format import ObjectFormat
87 from .object_store import BaseObjectStore, GraphWalker
88 from .pack import UnpackedObject
89 from .rebase import RebaseStateManager
90 from .walk import Walker
91 from .worktree import WorkTree
93from . import reflog
94from .errors import (
95 NoIndexPresent,
96 NotBlobError,
97 NotCommitError,
98 NotGitRepository,
99 NotTagError,
100 NotTreeError,
101 RefFormatError,
102)
103from .file import GitFile
104from .hooks import (
105 CommitMsgShellHook,
106 Hook,
107 PostCommitShellHook,
108 PostReceiveShellHook,
109 PreCommitShellHook,
110 PreReceiveShellHook,
111 UpdateShellHook,
112)
113from .object_store import (
114 DiskObjectStore,
115 MemoryObjectStore,
116 MissingObjectFinder,
117 ObjectStoreGraphWalker,
118 PackBasedObjectStore,
119 PackCapableObjectStore,
120 find_shallow,
121 peel_sha,
122)
123from .objects import (
124 Blob,
125 Commit,
126 ObjectID,
127 RawObjectID,
128 ShaFile,
129 Tag,
130 Tree,
131 check_hexsha,
132 valid_hexsha,
133)
134from .pack import generate_unpacked_objects
135from .refs import (
136 HEADREF,
137 LOCAL_TAG_PREFIX, # noqa: F401
138 SYMREF, # noqa: F401
139 DictRefsContainer,
140 DiskRefsContainer,
141 Ref,
142 RefsContainer,
143 _set_default_branch,
144 _set_head,
145 _set_origin_head,
146 check_ref_format, # noqa: F401
147 extract_branch_name,
148 is_per_worktree_ref,
149 local_branch_name,
150 read_packed_refs, # noqa: F401
151 read_packed_refs_with_peeled, # noqa: F401
152 write_packed_refs, # noqa: F401
153)
# Well-known file/directory names inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
# Default for whether offset deltas may be used when generating packs.
DEFAULT_OFS_DELTA = True

# Type variable used by BaseRepo._get_object to express that the returned
# object is of the requested ShaFile subclass.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Subdirectories created inside a fresh control directory.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
            identity: The malformed identity string.
        """
        # Pass the identity to Exception.__init__ so str(exc) and exc.args
        # carry the offending value (previously str(exc) was empty).
        super().__init__(identity)
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised by ``_get_default_identity`` when no username can be found in
    the environment or the password database.
    """
192# TODO(jelmer): Cache?
193def _get_default_identity() -> tuple[str, str]:
194 import socket
196 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
197 username = os.environ.get(name)
198 if username:
199 break
200 else:
201 username = None
203 try:
204 import pwd
205 except ImportError:
206 fullname = None
207 else:
208 try:
209 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
210 except KeyError:
211 fullname = None
212 else:
213 if getattr(entry, "gecos", None):
214 fullname = entry.pw_gecos.split(",")[0]
215 else:
216 fullname = None
217 if username is None:
218 username = entry.pw_name
219 if not fullname:
220 if username is None:
221 raise DefaultIdentityNotFound("no username found")
222 fullname = username
223 email = os.environ.get("EMAIL")
224 if email is None:
225 if username is None:
226 raise DefaultIdentityNotFound("no username found")
227 email = f"{username}@{socket.gethostname()}"
228 return (fullname, email)
231def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
232 """Determine the identity to use for new commits.
234 If kind is set, this first checks
235 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
237 If those variables are not set, then it will fall back
238 to reading the user.name and user.email settings from
239 the specified configuration.
241 If that also fails, then it will fall back to using
242 the current users' identity as obtained from the host
243 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
245 Args:
246 config: Configuration stack to read from
247 kind: Optional kind to return identity for,
248 usually either "AUTHOR" or "COMMITTER".
250 Returns:
251 A user identity
252 """
253 user: bytes | None = None
254 email: bytes | None = None
255 if kind:
256 user_uc = os.environ.get("GIT_" + kind + "_NAME")
257 if user_uc is not None:
258 user = user_uc.encode("utf-8")
259 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
260 if email_uc is not None:
261 email = email_uc.encode("utf-8")
262 if user is None:
263 try:
264 user = config.get(("user",), "name")
265 except KeyError:
266 user = None
267 if email is None:
268 try:
269 email = config.get(("user",), "email")
270 except KeyError:
271 email = None
272 default_user, default_email = _get_default_identity()
273 if user is None:
274 user = default_user.encode("utf-8")
275 if email is None:
276 email = default_email.encode("utf-8")
277 if email.startswith(b"<") and email.endswith(b">"):
278 email = email[1:-1]
279 return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``b"Name <email>"`` and contains no NUL
    bytes or newlines.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    name_part, sep, rest = identity.partition(b" <")
    well_formed = bool(sep) and b">" in rest
    clean = b"\0" not in identity and b"\n" not in identity
    if not (well_formed and clean):
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Convert a list of graftpoints into a dict.

    Args:
        graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[ObjectID, list[ObjectID]] = {}
    for entry in graftpoints:
        fields = entry.split(None, 1)
        commit = ObjectID(fields[0])
        if len(fields) > 1:
            parents = [ObjectID(part) for part in fields[1].split()]
        else:
            parents = []

        # Every SHA on the line must be well-formed hex.
        for sha in (commit, *parents):
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # A commit without parents serializes as just its own SHA.
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
354def _set_filesystem_hidden(path: str) -> None:
355 """Mark path as to be hidden if supported by platform and filesystem.
357 On win32 uses SetFileAttributesW api:
358 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
359 """
360 if sys.platform == "win32":
361 import ctypes
362 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
364 FILE_ATTRIBUTE_HIDDEN = 2
365 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
366 ("SetFileAttributesW", ctypes.windll.kernel32)
367 )
369 if isinstance(path, bytes):
370 path = os.fsdecode(path)
371 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
372 pass # Could raise or log `ctypes.WinError()` here
374 # Could implement other platform specific filesystem hiding here
377def parse_shared_repository(
378 value: str | bytes | bool,
379) -> tuple[int | None, int | None]:
380 """Parse core.sharedRepository configuration value.
382 Args:
383 value: Configuration value (string, bytes, or boolean)
385 Returns:
386 tuple of (file_mask, directory_mask) or (None, None) if not shared
388 The masks are permission bits to apply via chmod.
389 """
390 if isinstance(value, bytes):
391 value = value.decode("utf-8", errors="replace")
393 # Handle boolean values
394 if isinstance(value, bool):
395 if value:
396 # true = group (same as "group")
397 return (0o664, 0o2775)
398 else:
399 # false = umask (use system umask, no adjustment)
400 return (None, None)
402 # Handle string values
403 value_lower = value.lower()
405 if value_lower in ("false", "0", ""):
406 # Use umask (no adjustment)
407 return (None, None)
409 if value_lower in ("true", "1", "group"):
410 # Group writable (with setgid bit)
411 return (0o664, 0o2775)
413 if value_lower in ("all", "world", "everybody", "2"):
414 # World readable/writable (with setgid bit)
415 return (0o666, 0o2777)
417 if value_lower == "umask":
418 # Explicitly use umask
419 return (None, None)
421 # Try to parse as octal
422 if value.startswith("0"):
423 try:
424 mode = int(value, 8)
425 # For directories, add execute bits where read bits are set
426 # and add setgid bit for shared repositories
427 dir_mode = mode | 0o2000 # Add setgid bit
428 if mode & 0o004:
429 dir_mode |= 0o001
430 if mode & 0o040:
431 dir_mode |= 0o010
432 if mode & 0o400:
433 dir_mode |= 0o100
434 return (mode, dir_mode)
435 except ValueError:
436 pass
438 # Default to umask for unrecognized values
439 return (None, None)
class ParentsProvider:
    """Provider for commit parent information.

    Resolves parents through grafts and shallow boundaries first, then the
    commit-graph (when the store has one), before falling back to reading
    the commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: "dict[ObjectID, list[ObjectID]] | None" = None,
        shallows: "Iterable[ObjectID] | None" = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information (commit -> replacement parent list)
            shallows: Shallow commit SHAs (reported as having no parents)
        """
        self.store = store
        # Fix: the previous ``grafts={}`` / ``shallows=[]`` defaults were
        # mutable default arguments shared by every instance; use None
        # sentinels instead. Caller-supplied dicts are still stored as-is.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: "ObjectID", commit: "Commit | None" = None
    ) -> "list[ObjectID]":
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit
            commit: Optional pre-loaded commit object matching commit_id

        Raises:
            ValueError: If commit_id resolves to a non-commit object.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            if not isinstance(obj, Commit):
                raise ValueError(
                    f"Expected Commit object for commit_id {commit_id.decode()}, "
                    f"got {type(obj).__name__}. This usually means a reference "
                    f"points to a {type(obj).__name__} object instead of a Commit."
                )
            commit = obj
        result: list[ObjectID] = commit.parents
        return result
496class BaseRepo:
497 """Base class for a git repository.
499 This base class is meant to be used for Repository implementations that e.g.
500 work on top of a different transport than a standard filesystem path.
502 Attributes:
503 object_store: Dictionary-like object for accessing
504 the objects
505 refs: Dictionary-like object with the refs in this
506 repository
507 """
509 def __init__(
510 self,
511 object_store: "PackCapableObjectStore",
512 refs: RefsContainer,
513 object_format: "ObjectFormat | None" = None,
514 ) -> None:
515 """Open a repository.
517 This shouldn't be called directly, but rather through one of the
518 base classes, such as MemoryRepo or Repo.
520 Args:
521 object_store: Object store to use
522 refs: Refs container to use
523 object_format: Hash algorithm to use (if None, will use object_store's format)
524 """
525 self.object_store = object_store
526 self.refs = refs
528 self._graftpoints: dict[ObjectID, list[ObjectID]] = {}
529 self.hooks: dict[str, Hook] = {}
530 if object_format is None:
531 self.object_format: ObjectFormat = object_store.object_format
532 else:
533 self.object_format = object_format
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract hook: concrete subclasses must override this.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)
    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour: assume symlinks work
        # everywhere except Windows. Subclasses may probe for real.
        return sys.platform != "win32"
550 def _init_files(
551 self,
552 bare: bool,
553 symlinks: bool | None = None,
554 format: int | None = None,
555 shared_repository: str | bool | None = None,
556 object_format: str | None = None,
557 ) -> None:
558 """Initialize a default set of named files."""
559 from .config import ConfigFile
561 self._put_named_file("description", b"Unnamed repository")
562 f = BytesIO()
563 cf = ConfigFile()
565 # Determine the appropriate format version
566 if object_format == "sha256":
567 # SHA256 requires format version 1
568 if format is None:
569 format = 1
570 elif format != 1:
571 raise ValueError(
572 "SHA256 object format requires repository format version 1"
573 )
574 else:
575 # SHA1 (default) can use format 0 or 1
576 if format is None:
577 format = 0
579 if format not in (0, 1):
580 raise ValueError(f"Unsupported repository format version: {format}")
582 cf.set("core", "repositoryformatversion", str(format))
584 # Set object format extension if using SHA256
585 if object_format == "sha256":
586 cf.set("extensions", "objectformat", "sha256")
588 # Set hash algorithm based on object format
589 from .object_format import get_object_format
591 self.object_format = get_object_format(object_format)
593 if self._determine_file_mode():
594 cf.set("core", "filemode", True)
595 else:
596 cf.set("core", "filemode", False)
598 if symlinks is None and not bare:
599 symlinks = self._determine_symlinks()
601 if symlinks is False:
602 cf.set("core", "symlinks", symlinks)
604 cf.set("core", "bare", bare)
605 cf.set("core", "logallrefupdates", True)
607 # Set shared repository if specified
608 if shared_repository is not None:
609 if isinstance(shared_repository, bool):
610 cf.set("core", "sharedRepository", shared_repository)
611 else:
612 cf.set("core", "sharedRepository", shared_repository)
614 cf.write_to_file(f)
615 self._put_named_file("config", f.getvalue())
616 self._put_named_file(os.path.join("info", "exclude"), b"")
618 # Allow subclasses to handle config initialization
619 self._init_config(cf)
    def _init_config(self, config: "ConfigFile") -> None:
        """Initialize repository configuration.

        This method can be overridden by subclasses to handle config initialization.

        Args:
            config: The ConfigFile object that was just created
        """
        # Default implementation does nothing; this exists purely as a
        # subclass hook invoked at the end of _init_files().
    def get_named_file(self, path: str) -> BinaryIO | None:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract hook: concrete subclasses must override this.

        Args:
            path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract hook: concrete subclasses must override this.

        Args:
            path: The path to the file, relative to the control dir.
            contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)
    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract hook: concrete subclasses must override this.
        """
        raise NotImplementedError(self._del_named_file)
    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract hook: concrete subclasses must override this.

        Raises:
            NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)
666 def _change_object_format(self, object_format_name: str) -> None:
667 """Change the object format of this repository.
669 This can only be done if the object store is empty (no objects written yet).
671 Args:
672 object_format_name: Name of the new object format (e.g., "sha1", "sha256")
674 Raises:
675 AssertionError: If the object store is not empty
676 """
677 # Check if object store has any objects
678 for _ in self.object_store:
679 raise AssertionError(
680 "Cannot change object format: repository already contains objects"
681 )
683 # Update the object format
684 from .object_format import get_object_format
686 new_format = get_object_format(object_format_name)
687 self.object_format = new_format
688 self.object_store.object_format = new_format
690 # Update config file
691 config = self.get_config()
693 if object_format_name == "sha1":
694 # For SHA-1, explicitly remove objectformat extension if present
695 try:
696 config.remove("extensions", "objectformat")
697 except KeyError:
698 pass
699 else:
700 # For non-SHA-1 formats, set repositoryformatversion to 1 and objectformat extension
701 config.set("core", "repositoryformatversion", "1")
702 config.set("extensions", "objectformat", object_format_name)
704 config.write_to_path()
706 def fetch(
707 self,
708 target: "BaseRepo",
709 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]]
710 | None = None,
711 progress: Callable[..., None] | None = None,
712 depth: int | None = None,
713 ) -> dict[Ref, ObjectID]:
714 """Fetch objects into another repository.
716 Args:
717 target: The target repository
718 determine_wants: Optional function to determine what refs to
719 fetch.
720 progress: Optional progress function
721 depth: Optional shallow fetch depth
722 Returns: The local refs
723 """
724 # Fix object format if needed
725 if self.object_format != target.object_format:
726 # Change the target repo's format if it's empty
727 target._change_object_format(self.object_format.name)
729 if determine_wants is None:
730 determine_wants = target.object_store.determine_wants_all
731 count, pack_data = self.fetch_pack_data(
732 determine_wants,
733 target.get_graph_walker(),
734 progress=progress,
735 depth=depth,
736 )
737 target.object_store.add_pack_data(count, pack_data, progress)
738 return self.get_refs()
740 def fetch_pack_data(
741 self,
742 determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
743 graph_walker: "GraphWalker",
744 progress: Callable[[bytes], None] | None,
745 *,
746 get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
747 depth: int | None = None,
748 ) -> tuple[int, Iterator["UnpackedObject"]]:
749 """Fetch the pack data required for a set of revisions.
751 Args:
752 determine_wants: Function that takes a dictionary with heads
753 and returns the list of heads to fetch.
754 graph_walker: Object that can iterate over the list of revisions
755 to fetch and has an "ack" method that will be called to acknowledge
756 that a revision is present.
757 progress: Simple progress function that will be called with
758 updated progress strings.
759 get_tagged: Function that returns a dict of pointed-to sha ->
760 tag sha for including tags.
761 depth: Shallow fetch depth
762 Returns: count and iterator over pack data
763 """
764 missing_objects = self.find_missing_objects(
765 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
766 )
767 if missing_objects is None:
768 return 0, iter([])
769 remote_has = missing_objects.get_remote_has()
770 object_ids = list(missing_objects)
771 return len(object_ids), generate_unpacked_objects(
772 self.object_store, object_ids, progress=progress, other_haves=remote_has
773 )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to acknowledge
                that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        import logging

        # Filter out refs pointing to missing objects to avoid errors downstream.
        # This makes Dulwich more robust when dealing with broken refs on disk.
        # Previously serialize_refs() did this filtering as a side-effect.
        all_refs = self.get_refs()
        refs: dict[Ref, ObjectID] = {}
        for ref, sha in all_refs.items():
            if sha in self.object_store:
                refs[ref] = sha
            else:
                logging.warning(
                    "ref %s points at non-present sha %s",
                    ref.decode("utf-8", "replace"),
                    sha.decode("ascii"),
                )

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before any depth handling below
        # mutates it; used to compute new_shallow/unshallow deltas.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): if depth is set but graph_walker lacks a
            # ``shallow`` attribute, ``unshallow`` is never bound and the
            # reads below would raise NameError — confirm all walkers used
            # here define ``shallow``.
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[ObjectID]:
            """Get parents for a commit using the parents provider.

            Args:
                commit: Commit object

            Returns:
                List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
886 def generate_pack_data(
887 self,
888 have: set[ObjectID],
889 want: set[ObjectID],
890 *,
891 shallow: set[ObjectID] | None = None,
892 progress: Callable[[str], None] | None = None,
893 ofs_delta: bool | None = None,
894 ) -> tuple[int, Iterator["UnpackedObject"]]:
895 """Generate pack data objects for a set of wants/haves.
897 Args:
898 have: List of SHA1s of objects that should not be sent
899 want: List of SHA1s of objects that should be sent
900 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
901 ofs_delta: Whether OFS deltas can be included
902 progress: Optional progress reporting method
903 """
904 if shallow is None:
905 shallow = self.get_shallow()
906 return self.object_store.generate_pack_data(
907 have,
908 want,
909 shallow=shallow,
910 progress=progress,
911 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
912 )
914 def get_graph_walker(
915 self, heads: list[ObjectID] | None = None
916 ) -> ObjectStoreGraphWalker:
917 """Retrieve a graph walker.
919 A graph walker is used by a remote repository (or proxy)
920 to find out which objects are present in this repository.
922 Args:
923 heads: Repository heads to use (optional)
924 Returns: A graph walker object
925 """
926 if heads is None:
927 heads = [
928 sha
929 for sha in self.refs.as_dict(Ref(b"refs/heads")).values()
930 if sha in self.object_store
931 ]
932 parents_provider = ParentsProvider(self.object_store)
933 return ObjectStoreGraphWalker(
934 heads,
935 parents_provider.get_parents,
936 shallow=self.get_shallow(),
937 update_shallow=self.update_shallow,
938 )
    def get_refs(self) -> dict[Ref, ObjectID]:
        """Get dictionary with all refs.

        Delegates to the refs container; no filtering is applied here.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()
    def head(self) -> ObjectID:
        """Return the SHA1 pointed at by HEAD.

        May raise KeyError (dict-style lookup on the refs container) when
        HEAD does not resolve.
        """
        # TODO: move this method to WorkTree
        return self.refs[HEADREF]
952 def _get_object(self, sha: ObjectID | RawObjectID, cls: type[T]) -> T:
953 assert len(sha) in (
954 self.object_format.oid_length,
955 self.object_format.hex_length,
956 )
957 ret = self.get_object(sha)
958 if not isinstance(ret, cls):
959 if cls is Commit:
960 raise NotCommitError(ret.id)
961 elif cls is Blob:
962 raise NotBlobError(ret.id)
963 elif cls is Tree:
964 raise NotTreeError(ret.id)
965 elif cls is Tag:
966 raise NotTagError(ret.id)
967 else:
968 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
969 return ret
    def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Simple delegation to the object store's mapping interface.

        Args:
            sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
            KeyError: when the object can not be found
        """
        return self.object_store[sha]
    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        The provider honours this repository's graftpoints and shallow
        commits in addition to the plain commit ancestry.

        Returns:
            ParentsProvider instance configured with grafts and shallows
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )
    def get_parents(
        self, sha: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
            sha: SHA of the commit for which to retrieve the parents
            commit: Optional commit matching the sha
        Returns: List of parents
        """
        # Note: constructs a fresh ParentsProvider per call.
        return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract hook: concrete subclasses must override this.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)
    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract hook: concrete subclasses must override this.
        """
        raise NotImplementedError(self.get_worktree_config)
    def get_description(self) -> bytes | None:
        """Retrieve the description for this repository.

        Abstract hook: concrete subclasses must override this.

        Returns: Bytes with the description of the repository
            as set by the user.
        """
        raise NotImplementedError(self.get_description)
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Abstract hook: concrete subclasses must override this.

        Args:
            description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Abstract hook: concrete subclasses must override this.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)
    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Abstract hook: concrete subclasses must override this.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Abstract hook: concrete subclasses must override this.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)
1061 def get_config_stack(self) -> "StackedConfig":
1062 """Return a config stack for this repository.
1064 This stack accesses the configuration for both this repository
1065 itself (.git/config) and the global configuration, which usually
1066 lives in ~/.gitconfig.
1068 Returns: `Config` instance for this repository
1069 """
1070 from .config import ConfigFile, StackedConfig
1072 local_config = self.get_config()
1073 backends: list[ConfigFile] = [local_config]
1074 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
1075 backends.append(self.get_worktree_config())
1077 backends += StackedConfig.default_backends()
1078 return StackedConfig(backends, writable=local_config)
1080 def get_shallow(self) -> set[ObjectID]:
1081 """Get the set of shallow commits.
1083 Returns: Set of shallow commits.
1084 """
1085 f = self.get_named_file("shallow")
1086 if f is None:
1087 return set()
1088 with f:
1089 return {ObjectID(line.strip()) for line in f}
1091 def update_shallow(
1092 self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None
1093 ) -> None:
1094 """Update the list of shallow objects.
1096 Args:
1097 new_shallow: Newly shallow objects
1098 new_unshallow: Newly no longer shallow objects
1099 """
1100 shallow = self.get_shallow()
1101 if new_shallow:
1102 shallow.update(new_shallow)
1103 if new_unshallow:
1104 shallow.difference_update(new_unshallow)
1105 if shallow:
1106 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
1107 else:
1108 self._del_named_file("shallow")
1110 def get_peeled(self, ref: Ref) -> ObjectID:
1111 """Get the peeled value of a ref.
1113 Args:
1114 ref: The refname to peel.
1115 Returns: The fully-peeled SHA1 of a tag object, after peeling all
1116 intermediate tags; if the original ref does not point to a tag,
1117 this will equal the original SHA1.
1118 """
1119 cached = self.refs.get_peeled(ref)
1120 if cached is not None:
1121 return cached
1122 return peel_sha(self.object_store, self.refs[ref])[1].id
1124 @property
1125 def notes(self) -> "Notes":
1126 """Access notes functionality for this repository.
1128 Returns:
1129 Notes object for accessing notes
1130 """
1131 from .notes import Notes
1133 return Notes(self.object_store, self.refs)
    def get_walker(
        self,
        include: Sequence[ObjectID] | None = None,
        exclude: Sequence[ObjectID] | None = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: int | None = None,
        paths: Sequence[bytes] | None = None,
        rename_detector: "RenameDetector | None" = None,
        follow: bool = False,
        since: int | None = None,
        until: int | None = None,
        queue_cls: type | None = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
            include: Iterable of SHAs of commits to include along with their
                ancestors. Defaults to [HEAD]
            exclude: Iterable of SHAs of commits to exclude along with their
                ancestors, overriding includes.
            order: ORDER_* constant specifying the order of results.
                Anything other than ORDER_DATE may result in O(n) memory usage.
            reverse: If True, reverse the order of output, requiring O(n)
                memory.
            max_entries: The maximum number of entries to yield, or None for
                no limit.
            paths: Iterable of file or subtree paths to show entries for.
            rename_detector: diff.RenameDetector object for detecting
                renames.
            follow: If True, follow path across renames/copies. Forces a
                default rename_detector.
            since: Timestamp to list commits after.
            until: Timestamp to list commits before.
            queue_cls: A class to use for a queue of commits, supporting the
                iterator protocol. The constructor takes a single argument, the Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker, _CommitTimeQueue

        # Default to walking from the current HEAD.
        if include is None:
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Route parent lookup through self.get_parents so that
            # repository-level parent handling applies during the walk.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )
1196 def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile":
1197 """Retrieve a Git object by SHA1 or ref.
1199 Args:
1200 name: A Git object SHA1 or a ref name
1201 Returns: A `ShaFile` object, such as a Commit or Blob
1202 Raises:
1203 KeyError: when the specified ref or object does not exist
1204 """
1205 if not isinstance(name, bytes):
1206 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
1207 # If it looks like a ref name, only try refs
1208 if name == b"HEAD" or name.startswith(b"refs/"):
1209 try:
1210 return self.object_store[self.refs[Ref(name)]]
1211 except (RefFormatError, KeyError):
1212 pass
1213 # Otherwise, try as object ID if length matches
1214 if len(name) in (
1215 self.object_store.object_format.oid_length,
1216 self.object_store.object_format.hex_length,
1217 ):
1218 try:
1219 return self.object_store[
1220 ObjectID(name)
1221 if len(name) == self.object_store.object_format.hex_length
1222 else RawObjectID(name)
1223 ]
1224 except (KeyError, ValueError):
1225 pass
1226 # If nothing worked, raise KeyError
1227 raise KeyError(name)
1229 def __contains__(self, name: bytes) -> bool:
1230 """Check if a specific Git object or ref is present.
1232 Args:
1233 name: Git object SHA1/SHA256 or ref name
1234 """
1235 if len(name) == 20:
1236 return RawObjectID(name) in self.object_store or Ref(name) in self.refs
1237 elif len(name) == 40 and valid_hexsha(name):
1238 return ObjectID(name) in self.object_store or Ref(name) in self.refs
1239 # Check if it's a binary or hex SHA
1240 if len(name) == self.object_format.oid_length:
1241 return RawObjectID(name) in self.object_store or Ref(name) in self.refs
1242 elif len(name) == self.object_format.hex_length and valid_hexsha(name):
1243 return ObjectID(name) in self.object_store or Ref(name) in self.refs
1244 else:
1245 return Ref(name) in self.refs
1247 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
1248 """Set a ref.
1250 Args:
1251 name: ref name
1252 value: Ref value - either a ShaFile object, or a hex sha
1253 """
1254 if name.startswith(b"refs/") or name == HEADREF:
1255 ref_name = Ref(name)
1256 if isinstance(value, ShaFile):
1257 self.refs[ref_name] = value.id
1258 elif isinstance(value, bytes):
1259 self.refs[ref_name] = ObjectID(value)
1260 else:
1261 raise TypeError(value)
1262 else:
1263 raise ValueError(name)
1265 def __delitem__(self, name: bytes) -> None:
1266 """Remove a ref.
1268 Args:
1269 name: Name of the ref to remove
1270 """
1271 if name.startswith(b"refs/") or name == HEADREF:
1272 del self.refs[Ref(name)]
1273 else:
1274 raise ValueError(name)
1276 def _get_user_identity(
1277 self, config: "StackedConfig", kind: str | None = None
1278 ) -> bytes:
1279 """Determine the identity to use for new commits."""
1280 warnings.warn(
1281 "use get_user_identity() rather than Repo._get_user_identity",
1282 DeprecationWarning,
1283 )
1284 return get_user_identity(config)
1286 def _add_graftpoints(
1287 self, updated_graftpoints: dict[ObjectID, list[ObjectID]]
1288 ) -> None:
1289 """Add or modify graftpoints.
1291 Args:
1292 updated_graftpoints: Dict of commit shas to list of parent shas
1293 """
1294 # Simple validation
1295 for commit, parents in updated_graftpoints.items():
1296 for sha in [commit, *parents]:
1297 check_hexsha(sha, "Invalid graftpoint")
1299 self._graftpoints.update(updated_graftpoints)
1301 def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None:
1302 """Remove graftpoints.
1304 Args:
1305 to_remove: List of commit shas
1306 """
1307 for sha in to_remove:
1308 del self._graftpoints[sha]
1310 def _read_heads(self, name: str) -> list[ObjectID]:
1311 f = self.get_named_file(name)
1312 if f is None:
1313 return []
1314 with f:
1315 return [ObjectID(line.strip()) for line in f.readlines() if line.strip()]
1317 def get_worktree(self) -> "WorkTree":
1318 """Get the working tree for this repository.
1320 Returns:
1321 WorkTree instance for performing working tree operations
1323 Raises:
1324 NotImplementedError: If the repository doesn't support working trees
1325 """
1326 raise NotImplementedError(
1327 "Working tree operations not supported by this repository type"
1328 )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
        f: File-like object to read from
    Returns: The path named on the ``gitdir:`` line
    Raises:
        ValueError: if the content does not start with ``gitdir: ``
    """
    cs = f.read()
    if not cs.startswith(b"gitdir: "):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Only the first line is meaningful (the documented format is a single
    # "gitdir: <path>" line); ignore any trailing content instead of
    # returning it as part of the path.
    first_line = cs.split(b"\n", 1)[0]
    return first_line[len(b"gitdir: ") :].rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
            version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc) is informative instead
        # of empty.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
            extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative instead
        # of empty.
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension
1370class Repo(BaseRepo):
1371 """A git repository backed by local disk.
1373 To open an existing repository, call the constructor with
1374 the path of the repository.
1376 To create a new repository, use the Repo.init class method.
1378 Note that a repository object may hold on to resources such
1379 as file handles for performance reasons; call .close() to free
1380 up those resources.
1382 Attributes:
1383 path: Path to the working copy (if it exists) or repository control
1384 directory (if the repository is bare)
1385 bare: Whether this is a bare repository
1386 """
1388 path: str
1389 bare: bool
1390 object_store: DiskObjectStore
1391 filter_context: "FilterContext | None"
    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
            root: Path to the repository's root.
            object_store: ObjectStore to use; if omitted, we use the
                repository's default object store
            bare: True if this is a bare repository.

        Raises:
            NotGitRepository: if no repository layout is found at ``root``
            UnsupportedVersion: if core.repositoryformatversion is not 0 or 1
            UnsupportedExtension: if an unknown extensions.* key is set
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Autodetect bareness when not given: a ".git" file or a
        # ".git/objects" dir means non-bare; objects/ and refs/ directly
        # under root means bare; otherwise this is not a repository.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a gitfile pointing at the real control
                # directory (used e.g. by linked worktrees/submodules).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" (if present) redirects shared state to the main
        # worktree's control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        # A missing or None core.repositoryformatversion counts as 0.
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (
                b"worktreeconfig",
                b"objectformat",
                b"relativeworktrees",
            ):
                # Any other extension is rejected outright.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            # Get shared repository permissions from config
            try:
                shared_value = config.get(("core",), "sharedRepository")
                file_mode, dir_mode = parse_shared_repository(shared_value)
            except KeyError:
                file_mode, dir_mode = None, None

            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR),
                config,
                file_mode=file_mode,
                dir_mode=dir_mode,
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Determine hash algorithm from config if not already set
        if self.object_format is None:
            from .object_format import DEFAULT_OBJECT_FORMAT, get_object_format

            # extensions.objectFormat is only honoured for format v1 repos.
            if format_version == 1:
                try:
                    object_format = config.get((b"extensions",), b"objectformat")
                    self.object_format = get_object_format(
                        object_format.decode("ascii")
                    )
                except KeyError:
                    self.object_format = DEFAULT_OBJECT_FORMAT
            else:
                self.object_format = DEFAULT_OBJECT_FORMAT

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        # NOTE(review): the shallow file is also fed through
        # parse_graftpoints here, apparently treating shallow commits as
        # parentless grafts — confirm this is intentional.
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        # Wire up the standard shell hooks rooted at the control dir.
        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["pre-receive"] = PreReceiveShellHook(self.controldir())
        self.hooks["update"] = UpdateShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1546 def get_worktree(self) -> "WorkTree":
1547 """Get the working tree for this repository.
1549 Returns:
1550 WorkTree instance for performing working tree operations
1551 """
1552 from .worktree import WorkTree
1554 return WorkTree(self, self.path)
    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: bytes | None,
        timestamp: int | None,
        timezone: int | None,
        message: bytes,
    ) -> None:
        """Append one entry to the on-disk reflog for ``ref``.

        Missing ``committer``/``timestamp``/``timezone`` values are filled
        in from the config stack, the current time, and 0 respectively.
        Honours core.sharedRepository permissions on any directories and
        on the log file itself.
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)

        # Get shared repository permissions
        file_mode, dir_mode = self._get_shared_repository_permissions()

        # Create directory with appropriate permissions
        parent_dir = os.path.dirname(path)
        # Create directory tree, setting permissions on each level if needed
        parts = []
        current = parent_dir
        # Walk upwards collecting the missing ancestors, then create them
        # top-down so each level can be chmod'ed as it appears.
        while current and not os.path.exists(current):
            parts.append(current)
            current = os.path.dirname(current)
        parts.reverse()
        for part in parts:
            os.mkdir(part)
            if dir_mode is not None:
                os.chmod(part, dir_mode)
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        # Validate even caller-supplied identities before writing.
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )

        # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
        # Always chmod to ensure correct permissions even if file already existed
        if file_mode is not None:
            os.chmod(path, file_mode)
1607 def _reflog_path(self, ref: bytes) -> str:
1608 if ref.startswith((b"main-worktree/", b"worktrees/")):
1609 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1611 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1612 return os.path.join(base, "logs", os.fsdecode(ref))
1614 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1615 """Read reflog entries for a reference.
1617 Args:
1618 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1620 Yields:
1621 reflog.Entry objects in chronological order (oldest first)
1622 """
1623 from .reflog import read_reflog
1625 path = self._reflog_path(ref)
1626 try:
1627 with open(path, "rb") as f:
1628 yield from read_reflog(f)
1629 except FileNotFoundError:
1630 return
1632 @classmethod
1633 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
1634 """Iterate parent directories to discover a repository.
1636 Return a Repo object for the first parent directory that looks like a
1637 Git repository.
1639 Args:
1640 start: The directory to start discovery from (defaults to '.')
1641 """
1642 path = os.path.abspath(start)
1643 while True:
1644 try:
1645 return cls(path)
1646 except NotGitRepository:
1647 new_path, _tail = os.path.split(path)
1648 if new_path == path: # Root reached
1649 break
1650 path = new_path
1651 start_str = os.fspath(start)
1652 if isinstance(start_str, bytes):
1653 start_str = start_str.decode("utf-8")
1654 raise NotGitRepository(f"No git repository was found at {start_str}")
1656 def controldir(self) -> str:
1657 """Return the path of the control directory."""
1658 return self._controldir
1660 def commondir(self) -> str:
1661 """Return the path of the common directory.
1663 For a main working tree, it is identical to controldir().
1665 For a linked working tree, it is the control directory of the
1666 main working tree.
1667 """
1668 return self._commondir
1670 def _determine_file_mode(self) -> bool:
1671 """Probe the file-system to determine whether permissions can be trusted.
1673 Returns: True if permissions can be trusted, False otherwise.
1674 """
1675 fname = os.path.join(self.path, ".probe-permissions")
1676 with open(fname, "w") as f:
1677 f.write("")
1679 st1 = os.lstat(fname)
1680 try:
1681 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1682 except PermissionError:
1683 return False
1684 st2 = os.lstat(fname)
1686 os.unlink(fname)
1688 mode_differs = st1.st_mode != st2.st_mode
1689 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1691 return mode_differs and st2_has_exec
1693 def _determine_symlinks(self) -> bool:
1694 """Probe the filesystem to determine whether symlinks can be created.
1696 Returns: True if symlinks can be created, False otherwise.
1697 """
1698 # TODO(jelmer): Actually probe disk / look at filesystem
1699 return sys.platform != "win32"
1701 def _get_shared_repository_permissions(
1702 self,
1703 ) -> tuple[int | None, int | None]:
1704 """Get shared repository file and directory permissions from config.
1706 Returns:
1707 tuple of (file_mask, directory_mask) or (None, None) if not shared
1708 """
1709 try:
1710 config = self.get_config()
1711 value = config.get(("core",), "sharedRepository")
1712 return parse_shared_repository(value)
1713 except KeyError:
1714 return (None, None)
1716 def _put_named_file(self, path: str, contents: bytes) -> None:
1717 """Write a file to the control dir with the given name and contents.
1719 Args:
1720 path: The path to the file, relative to the control dir.
1721 contents: A string to write to the file.
1722 """
1723 path = path.lstrip(os.path.sep)
1725 # Get shared repository permissions
1726 file_mode, _ = self._get_shared_repository_permissions()
1728 # Create file with appropriate permissions
1729 if file_mode is not None:
1730 with GitFile(
1731 os.path.join(self.controldir(), path), "wb", mask=file_mode
1732 ) as f:
1733 f.write(contents)
1734 else:
1735 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1736 f.write(contents)
1738 def _del_named_file(self, path: str) -> None:
1739 try:
1740 os.unlink(os.path.join(self.controldir(), path))
1741 except FileNotFoundError:
1742 return
1744 def get_named_file(
1745 self,
1746 path: str | bytes,
1747 basedir: str | None = None,
1748 ) -> BinaryIO | None:
1749 """Get a file from the control dir with a specific name.
1751 Although the filename should be interpreted as a filename relative to
1752 the control dir in a disk-based Repo, the object returned need not be
1753 pointing to a file in that location.
1755 Args:
1756 path: The path to the file, relative to the control dir.
1757 basedir: Optional argument that specifies an alternative to the
1758 control dir.
1759 Returns: An open file object, or None if the file does not exist.
1760 """
1761 # TODO(dborowitz): sanitize filenames, since this is used directly by
1762 # the dumb web serving code.
1763 if basedir is None:
1764 basedir = self.controldir()
1765 if isinstance(path, bytes):
1766 path = path.decode("utf-8")
1767 path = path.lstrip(os.path.sep)
1768 try:
1769 return open(os.path.join(basedir, path), "rb")
1770 except FileNotFoundError:
1771 return None
1773 def index_path(self) -> str:
1774 """Return path to the index file."""
1775 return os.path.join(self.controldir(), INDEX_FILENAME)
1777 def open_index(self) -> "Index":
1778 """Open the index for this repository.
1780 Raises:
1781 NoIndexPresent: If no index is present
1782 Returns: The matching `Index`
1783 """
1784 from .index import Index
1786 if not self.has_index():
1787 raise NoIndexPresent
1789 # Check for manyFiles feature configuration
1790 config = self.get_config_stack()
1791 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1792 skip_hash = False
1793 index_version = None
1795 if many_files:
1796 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1797 try:
1798 index_version_str = config.get(b"index", b"version")
1799 index_version = int(index_version_str)
1800 except KeyError:
1801 index_version = 4 # Default to version 4 for manyFiles
1802 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1803 else:
1804 # Check for explicit index settings
1805 try:
1806 index_version_str = config.get(b"index", b"version")
1807 index_version = int(index_version_str)
1808 except KeyError:
1809 index_version = None
1810 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1812 # Get shared repository permissions for index file
1813 file_mode, _ = self._get_shared_repository_permissions()
1815 return Index(
1816 self.index_path(),
1817 skip_hash=skip_hash,
1818 version=index_version,
1819 file_mode=file_mode,
1820 )
1822 def has_index(self) -> bool:
1823 """Check if an index is present."""
1824 # Bare repos must never have index files; non-bare repos may have a
1825 # missing index file, which is treated as empty.
1826 return not self.bare
    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
            target_path: Target path
            mkdir: Create the target directory
            bare: Whether to create a bare repository
            checkout: Whether or not to check-out HEAD after cloning
            origin: Base name for refs in target repository
                cloned from this repository
            branch: Optional branch or tag to be used as HEAD in the new repository
                instead of this repository's HEAD.
            progress: Optional progress function
            depth: Depth at which to fetch
            symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
            ValueError: if both ``bare`` and ``checkout`` are requested
        """
        # NOTE(review): ``progress`` is accepted but never used in this
        # body — confirm whether it should be forwarded to fetch().
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        # Outer try: on any failure, remove the directory we created.
        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            # Inner try: on any failure after init, close the target repo
            # before the outer handler deletes its directory.
            try:
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Copy branches into refs/remotes/<origin>/ and tags as-is.
                target.refs.import_refs(
                    Ref(b"refs/remotes/" + origin),
                    self.refs.as_dict(Ref(b"refs/heads")),
                    message=ref_message,
                )
                target.refs.import_refs(
                    Ref(b"refs/tags"),
                    self.refs.as_dict(Ref(b"refs/tags")),
                    message=ref_message,
                )

                head_chain, origin_sha = self.refs.follow(HEADREF)
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[HEADREF] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Supports the ``gitdir:``, ``gitdir/i:`` (case-insensitive) and
        ``onbranch:`` prefixes.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
                pattern: Pattern to match against
                case_sensitive: Whether to match case-sensitively

            Returns:
                True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere below any directory.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything underneath.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
                pattern: Pattern to match against

            Returns:
                True if current branch matches pattern; False on detached
                HEAD or unborn branches
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(HEADREF)
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
2013 def get_worktree_config(self) -> "ConfigFile":
2014 """Get the worktree-specific config.
2016 Returns:
2017 ConfigFile object for the worktree config
2018 """
2019 from .config import ConfigFile
2021 path = os.path.join(self.commondir(), "config.worktree")
2022 try:
2023 # Pass condition matchers for includeIf evaluation
2024 condition_matchers = self._get_config_condition_matchers()
2025 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
2026 except FileNotFoundError:
2027 cf = ConfigFile()
2028 cf.path = path
2029 return cf
2031 def get_config(self) -> "ConfigFile":
2032 """Retrieve the config object.
2034 Returns: `ConfigFile` object for the ``.git/config`` file.
2035 """
2036 from .config import ConfigFile
2038 path = os.path.join(self._commondir, "config")
2039 try:
2040 # Pass condition matchers for includeIf evaluation
2041 condition_matchers = self._get_config_condition_matchers()
2042 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
2043 except FileNotFoundError:
2044 ret = ConfigFile()
2045 ret.path = path
2046 return ret
2048 def get_rebase_state_manager(self) -> "RebaseStateManager":
2049 """Get the appropriate rebase state manager for this repository.
2051 Returns: DiskRebaseStateManager instance
2052 """
2053 import os
2055 from .rebase import DiskRebaseStateManager
2057 path = os.path.join(self.controldir(), "rebase-merge")
2058 return DiskRebaseStateManager(path)
2060 def get_description(self) -> bytes | None:
2061 """Retrieve the description of this repository.
2063 Returns: Description as bytes or None.
2064 """
2065 path = os.path.join(self._controldir, "description")
2066 try:
2067 with GitFile(path, "rb") as f:
2068 return f.read()
2069 except FileNotFoundError:
2070 return None
2072 def __repr__(self) -> str:
2073 """Return string representation of this repository."""
2074 return f"<Repo at {self.path!r}>"
2076 def set_description(self, description: bytes) -> None:
2077 """Set the description for this repository.
2079 Args:
2080 description: Text to set as description for this repository.
2081 """
2082 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: "StackedConfig | None" = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
        shared_repository: str | bool | None = None,
        object_format: str | None = None,
    ) -> "Repo":
        """Shared implementation behind init() and init_bare().

        Creates the base directory layout under ``controldir``, sets up the
        object store, points HEAD at the default branch and writes the
        standard metadata files.

        Args:
            path: Repository root (working tree for non-bare repositories)
            controldir: Directory for control files (same as ``path`` when bare)
            bare: Whether to create a bare repository
            object_store: Optional pre-built object store
            config: Config used to look up ``init.defaultBranch``
            default_branch: Branch name for HEAD; overrides config lookup
            symlinks: Symlink support setting, forwarded to _init_files
            format: Repository format version, forwarded to _init_files
            shared_repository: core.sharedRepository setting (permissions)
            object_format: Object format name (e.g. "sha1"/"sha256")
        Returns: The newly created `Repo`
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)

        # Determine shared repository permissions early
        file_mode: int | None = None
        dir_mode: int | None = None
        if shared_repository is not None:
            file_mode, dir_mode = parse_shared_repository(shared_repository)

        # Create base directories with appropriate permissions
        for d in BASE_DIRECTORIES:
            dir_path = os.path.join(controldir, *d)
            os.mkdir(dir_path)
            if dir_mode is not None:
                os.chmod(dir_path, dir_mode)

        # Determine hash algorithm
        from .object_format import get_object_format

        hash_alg = get_object_format(object_format)

        if object_store is None:
            object_store = DiskObjectStore.init(
                os.path.join(controldir, OBJECTDIR),
                file_mode=file_mode,
                dir_mode=dir_mode,
                object_format=hash_alg,
            )
        ret = cls(path, bare=bare, object_store=object_store)
        # Resolve the default branch: explicit argument, then
        # init.defaultBranch from config, then the built-in default.
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))
        ret._init_files(
            bare=bare,
            symlinks=symlinks,
            format=format,
            shared_repository=shared_repository,
            object_format=object_format,
        )
        return ret
2150 @classmethod
2151 def init(
2152 cls,
2153 path: str | bytes | os.PathLike[str],
2154 *,
2155 mkdir: bool = False,
2156 config: "StackedConfig | None" = None,
2157 default_branch: bytes | None = None,
2158 symlinks: bool | None = None,
2159 format: int | None = None,
2160 shared_repository: str | bool | None = None,
2161 object_format: str | None = None,
2162 ) -> "Repo":
2163 """Create a new repository.
2165 Args:
2166 path: Path in which to create the repository
2167 mkdir: Whether to create the directory
2168 config: Configuration object
2169 default_branch: Default branch name
2170 symlinks: Whether to support symlinks
2171 format: Repository format version (defaults to 0)
2172 shared_repository: Shared repository setting (group, all, umask, or octal)
2173 object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
2174 Returns: `Repo` instance
2175 """
2176 path = os.fspath(path)
2177 if isinstance(path, bytes):
2178 path = os.fsdecode(path)
2179 if mkdir:
2180 os.mkdir(path)
2181 controldir = os.path.join(path, CONTROLDIR)
2182 os.mkdir(controldir)
2183 _set_filesystem_hidden(controldir)
2184 return cls._init_maybe_bare(
2185 path,
2186 controldir,
2187 False,
2188 config=config,
2189 default_branch=default_branch,
2190 symlinks=symlinks,
2191 format=format,
2192 shared_repository=shared_repository,
2193 object_format=object_format,
2194 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            # Default to the directory name as the worktree identifier.
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's ".git" is a plain file pointing at the per-worktree
        # control directory inside the main repository.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

        # Get shared repository permissions from main repository
        _, dir_mode = main_repo._get_shared_repository_permissions()

        # Create directories with appropriate permissions
        try:
            os.mkdir(main_worktreesdir)
            if dir_mode is not None:
                os.chmod(main_worktreesdir, dir_mode)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
            if dir_mode is not None:
                os.chmod(worktree_controldir, dir_mode)
        except FileExistsError:
            pass
        # Back-pointer from the per-worktree control dir to the gitdir file.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # "../.." resolves from worktrees/<id> back to the main control dir.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # The new worktree starts at the same HEAD as the main repository.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the index from HEAD so the new worktree starts clean.
        r.get_worktree().reset_index()
        return r
2254 @classmethod
2255 def init_bare(
2256 cls,
2257 path: str | bytes | os.PathLike[str],
2258 *,
2259 mkdir: bool = False,
2260 object_store: PackBasedObjectStore | None = None,
2261 config: "StackedConfig | None" = None,
2262 default_branch: bytes | None = None,
2263 format: int | None = None,
2264 shared_repository: str | bool | None = None,
2265 object_format: str | None = None,
2266 ) -> "Repo":
2267 """Create a new bare repository.
2269 ``path`` should already exist and be an empty directory.
2271 Args:
2272 path: Path to create bare repository in
2273 mkdir: Whether to create the directory
2274 object_store: Object store to use
2275 config: Configuration object
2276 default_branch: Default branch name
2277 format: Repository format version (defaults to 0)
2278 shared_repository: Shared repository setting (group, all, umask, or octal)
2279 object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
2280 Returns: a `Repo` instance
2281 """
2282 path = os.fspath(path)
2283 if isinstance(path, bytes):
2284 path = os.fsdecode(path)
2285 if mkdir:
2286 os.mkdir(path)
2287 return cls._init_maybe_bare(
2288 path,
2289 path,
2290 True,
2291 object_store=object_store,
2292 config=config,
2293 default_branch=default_branch,
2294 format=format,
2295 shared_repository=shared_repository,
2296 object_format=object_format,
2297 )
2299 create = init_bare
2301 def close(self) -> None:
2302 """Close any files opened by this repository."""
2303 self.object_store.close()
2304 # Clean up filter context if it was created
2305 if self.filter_context is not None:
2306 self.filter_context.close()
2307 self.filter_context = None
2309 def __enter__(self) -> "Repo":
2310 """Enter context manager."""
2311 return self
2313 def __exit__(
2314 self,
2315 exc_type: type[BaseException] | None,
2316 exc_val: BaseException | None,
2317 exc_tb: TracebackType | None,
2318 ) -> None:
2319 """Exit context manager and close repository."""
2320 self.close()
2322 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2323 """Read .gitattributes file from working tree.
2325 Returns:
2326 Dictionary mapping file patterns to attributes
2327 """
2328 gitattributes = {}
2329 gitattributes_path = os.path.join(self.path, ".gitattributes")
2331 if os.path.exists(gitattributes_path):
2332 with open(gitattributes_path, "rb") as f:
2333 for line in f:
2334 line = line.strip()
2335 if not line or line.startswith(b"#"):
2336 continue
2338 parts = line.split()
2339 if len(parts) < 2:
2340 continue
2342 pattern = parts[0]
2343 attrs = {}
2345 for attr in parts[1:]:
2346 if attr.startswith(b"-"):
2347 # Unset attribute
2348 attrs[attr[1:]] = b"false"
2349 elif b"=" in attr:
2350 # Set to value
2351 key, value = attr.split(b"=", 1)
2352 attrs[key] = value
2353 else:
2354 # Set attribute
2355 attrs[attr] = b"true"
2357 gitattributes[pattern] = attrs
2359 return gitattributes
2361 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2362 """Return a BlobNormalizer object."""
2363 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2365 # Get fresh configuration and GitAttributes
2366 config_stack = self.get_config_stack()
2367 git_attributes = self.get_gitattributes()
2369 # Lazily create FilterContext if needed
2370 if self.filter_context is None:
2371 filter_registry = FilterRegistry(config_stack, self)
2372 self.filter_context = FilterContext(filter_registry)
2373 else:
2374 # Refresh the context with current config to handle config changes
2375 self.filter_context.refresh_config(config_stack)
2377 # Return a new FilterBlobNormalizer with the context
2378 return FilterBlobNormalizer(
2379 config_stack, git_attributes, filter_context=self.filter_context
2380 )
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected from the HEAD (or given) tree's
        ``.gitattributes``, then ``.git/info/attributes``, then the working
        directory's ``.gitattributes``, in that order.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                # Peel tags to get to the underlying commit
                while isinstance(head, Tag):
                    _cls, obj = head.object
                    head = self.get_object(obj)
                if not isinstance(head, Commit):
                    raise ValueError(
                        f"Expected HEAD to point to a Commit, got {type(head).__name__}. "
                        f"This usually means HEAD points to a {type(head).__name__} object "
                        f"instead of a Commit."
                    )
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing or non-tree object: no tree-level patterns.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        # NOTE(review): effective precedence among the three sources depends
        # on GitAttributes' matching order over this list — confirm there.
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    # Lazily-created filter context; None until first use.
    filter_context: "FilterContext | None"

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile
        from .object_format import DEFAULT_OBJECT_FORMAT

        # The reflog must exist before the refs container can log to it.
        self._reflog: list[Any] = []
        BaseRepo.__init__(
            self,
            MemoryObjectStore(),
            DictRefsContainer({}, logger=self._append_reflog),
        )
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description: bytes | None = None
        self.filter_context = None
        # MemoryRepo defaults to the library-wide default object format.
        self.object_format = DEFAULT_OBJECT_FORMAT
2481 def _append_reflog(
2482 self,
2483 ref: bytes,
2484 old_sha: bytes | None,
2485 new_sha: bytes | None,
2486 committer: bytes | None,
2487 timestamp: int | None,
2488 timezone: int | None,
2489 message: bytes | None,
2490 ) -> None:
2491 self._reflog.append(
2492 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2493 )
2495 def set_description(self, description: bytes) -> None:
2496 """Set the description for this repository.
2498 Args:
2499 description: Text to set as description
2500 """
2501 self._description = description
2503 def get_description(self) -> bytes | None:
2504 """Get the description of this repository.
2506 Returns:
2507 Repository description as bytes
2508 """
2509 return self._description
2511 def _determine_file_mode(self) -> bool:
2512 """Probe the file-system to determine whether permissions can be trusted.
2514 Returns: True if permissions can be trusted, False otherwise.
2515 """
2516 return sys.platform != "win32"
    def _determine_symlinks(self) -> bool:
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        return sys.platform != "win32"
2525 def _put_named_file(self, path: str, contents: bytes) -> None:
2526 """Write a file to the control dir with the given name and contents.
2528 Args:
2529 path: The path to the file, relative to the control dir.
2530 contents: A string to write to the file.
2531 """
2532 self._named_files[path] = contents
2534 def _del_named_file(self, path: str) -> None:
2535 try:
2536 del self._named_files[path]
2537 except KeyError:
2538 pass
2540 def get_named_file(
2541 self,
2542 path: str | bytes,
2543 basedir: str | None = None,
2544 ) -> BytesIO | None:
2545 """Get a file from the control dir with a specific name.
2547 Although the filename should be interpreted as a filename relative to
2548 the control dir in a disk-baked Repo, the object returned need not be
2549 pointing to a file in that location.
2551 Args:
2552 path: The path to the file, relative to the control dir.
2553 basedir: Optional base directory for the path
2554 Returns: An open file object, or None if the file does not exist.
2555 """
2556 path_str = path.decode() if isinstance(path, bytes) else path
2557 contents = self._named_files.get(path_str, None)
2558 if contents is None:
2559 return None
2560 return BytesIO(contents)
2562 def open_index(self) -> "Index":
2563 """Fail to open index for this repo, since it is bare.
2565 Raises:
2566 NoIndexPresent: Raised when no index is present
2567 """
2568 raise NoIndexPresent
2570 def _init_config(self, config: "ConfigFile") -> None:
2571 """Initialize repository configuration for MemoryRepo."""
2572 self._config = config
2574 def get_config(self) -> "ConfigFile":
2575 """Retrieve the config object.
2577 Returns: `ConfigFile` object.
2578 """
2579 return self._config
2581 def get_rebase_state_manager(self) -> "RebaseStateManager":
2582 """Get the appropriate rebase state manager for this repository.
2584 Returns: MemoryRebaseStateManager instance
2585 """
2586 from .rebase import MemoryRebaseStateManager
2588 return MemoryRebaseStateManager(self)
2590 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2591 """Return a BlobNormalizer object for checkin/checkout operations."""
2592 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2594 # Get fresh configuration and GitAttributes
2595 config_stack = self.get_config_stack()
2596 git_attributes = self.get_gitattributes()
2598 # Lazily create FilterContext if needed
2599 if self.filter_context is None:
2600 filter_registry = FilterRegistry(config_stack, self)
2601 self.filter_context = FilterContext(filter_registry)
2602 else:
2603 # Refresh the context with current config to handle config changes
2604 self.filter_context.refresh_config(config_stack)
2606 # Return a new FilterBlobNormalizer with the context
2607 return FilterBlobNormalizer(
2608 config_stack, git_attributes, filter_context=self.filter_context
2609 )
2611 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
2612 """Read gitattributes for the repository."""
2613 from .attrs import GitAttributes
2615 # Memory repos don't have working trees or gitattributes files
2616 # Return empty GitAttributes
2617 return GitAttributes([])
2619 def close(self) -> None:
2620 """Close any resources opened by this repository."""
2621 # Clean up filter context if it was created
2622 if self.filter_context is not None:
2623 self.filter_context.close()
2624 self.filter_context = None
2625 # Close object store to release pack files
2626 self.object_store.close()
    def do_commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = HEADREF,
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message (bytes, or a callable taking (repo, commit)
            and returning bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: If no tree or no message is given, or the tree SHA has
            the wrong length for this repository's object format.
        """
        import time

        from .objects import Commit

        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        # Validate the tree SHA length against the repo's object format
        # (40 hex chars for sha1, 64 for sha256).
        if len(tree) != self.object_format.hex_length:
            raise ValueError(
                f"tree must be a {self.object_format.hex_length}-character hex sha string"
            )
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author time defaults to the commit time.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # Message may be a callable taking (repo, commit) and returning
        # the final message bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                # Existing ref: new commit's first parent is the old head.
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                # Compare-and-swap so a concurrent ref update is detected.
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Unborn ref (e.g. first commit on a branch): no parents
                # beyond the merge heads.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id
2760 @classmethod
2761 def init_bare(
2762 cls,
2763 objects: Iterable[ShaFile],
2764 refs: Mapping[Ref, ObjectID],
2765 format: int | None = None,
2766 object_format: str | None = None,
2767 ) -> "MemoryRepo":
2768 """Create a new bare repository in memory.
2770 Args:
2771 objects: Objects for the new repository,
2772 as iterable
2773 refs: Refs as dictionary, mapping names
2774 to object SHA1s
2775 format: Repository format version (defaults to 0)
2776 object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
2777 """
2778 ret = cls()
2779 for obj in objects:
2780 ret.object_store.add_object(obj)
2781 for refname, sha in refs.items():
2782 ret.refs.add_if_new(refname, sha)
2783 ret._init_files(bare=True, format=format, object_format=object_format)
2784 return ret