Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 37%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32__all__ = [
33 "BASE_DIRECTORIES",
34 "COMMONDIR",
35 "CONTROLDIR",
36 "DEFAULT_BRANCH",
37 "DEFAULT_OFS_DELTA",
38 "GITDIR",
39 "INDEX_FILENAME",
40 "OBJECTDIR",
41 "REFSDIR",
42 "REFSDIR_HEADS",
43 "REFSDIR_TAGS",
44 "WORKTREES",
45 "BaseRepo",
46 "DefaultIdentityNotFound",
47 "InvalidUserIdentity",
48 "MemoryRepo",
49 "ParentsProvider",
50 "Repo",
51 "UnsupportedExtension",
52 "UnsupportedVersion",
53 "check_user_identity",
54 "get_user_identity",
55 "parse_graftpoints",
56 "parse_shared_repository",
57 "read_gitfile",
58 "serialize_graftpoints",
59]
61import os
62import stat
63import sys
64import time
65import warnings
66from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
67from io import BytesIO
68from types import TracebackType
69from typing import (
70 TYPE_CHECKING,
71 Any,
72 BinaryIO,
73 TypeVar,
74)
76if sys.version_info >= (3, 11):
77 from typing import Self
78else:
79 from typing_extensions import Self
81if TYPE_CHECKING:
82 # There are no circular imports here, but we try to defer imports as long
83 # as possible to reduce start-up time for anything that doesn't need
84 # these imports.
85 from .attrs import GitAttributes
86 from .config import ConditionMatcher, Config, ConfigFile, StackedConfig
87 from .diff_tree import RenameDetector
88 from .filters import FilterBlobNormalizer, FilterContext
89 from .index import Index
90 from .notes import Notes
91 from .object_format import ObjectFormat
92 from .object_store import BaseObjectStore, GraphWalker
93 from .pack import UnpackedObject
94 from .rebase import RebaseStateManager
95 from .walk import Walker
96 from .worktree import WorkTree
98from . import reflog
99from .errors import (
100 NoIndexPresent,
101 NotBlobError,
102 NotCommitError,
103 NotGitRepository,
104 NotTagError,
105 NotTreeError,
106 RefFormatError,
107)
108from .file import GitFile
109from .hooks import (
110 CommitMsgShellHook,
111 Hook,
112 PostCommitShellHook,
113 PostReceiveShellHook,
114 PreCommitShellHook,
115 PreReceiveShellHook,
116 UpdateShellHook,
117)
118from .object_store import (
119 DiskObjectStore,
120 MemoryObjectStore,
121 MissingObjectFinder,
122 ObjectStoreGraphWalker,
123 PackBasedObjectStore,
124 PackCapableObjectStore,
125 find_shallow,
126 peel_sha,
127)
128from .objects import (
129 Blob,
130 Commit,
131 ObjectID,
132 RawObjectID,
133 ShaFile,
134 Tag,
135 Tree,
136 check_hexsha,
137 valid_hexsha,
138)
139from .pack import generate_unpacked_objects
140from .refs import (
141 HEADREF,
142 LOCAL_TAG_PREFIX, # noqa: F401
143 SYMREF, # noqa: F401
144 DictRefsContainer,
145 DiskRefsContainer,
146 Ref,
147 RefsContainer,
148 _set_default_branch,
149 _set_head,
150 _set_origin_head,
151 check_ref_format, # noqa: F401
152 extract_branch_name,
153 is_per_worktree_ref,
154 local_branch_name,
155 read_packed_refs, # noqa: F401
156 read_packed_refs_with_peeled, # noqa: F401
157 write_packed_refs, # noqa: F401
158)
# Name of the control directory and of well-known entries used alongside it.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Components of the refs hierarchy.
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"

# Directory paths, each expressed as a list of path components.
# NOTE(review): presumably the directories created for a new repository --
# confirm against the code that consumes this list.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Used by BaseRepo.generate_pack_data() when the caller passes no ofs_delta.
DEFAULT_OFS_DELTA = True

DEFAULT_BRANCH = b"master"

# Type variable bound to ShaFile; used by BaseRepo._get_object().
T = TypeVar("T", bound="ShaFile")
class InvalidUserIdentity(Exception):
    """Raised for a user identity that is not shaped like 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Remember the offending identity.

        Args:
          identity: The rejected identity, as text
        """
        # Kept on the instance so callers can report the rejected value.
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be worked out from the host."""
197# TODO(jelmer): Cache?
198def _get_default_identity() -> tuple[str, str]:
199 import socket
201 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
202 username = os.environ.get(name)
203 if username:
204 break
205 else:
206 username = None
208 try:
209 import pwd
210 except ImportError:
211 fullname = None
212 else:
213 try:
214 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
215 except KeyError:
216 fullname = None
217 else:
218 if getattr(entry, "gecos", None):
219 fullname = entry.pw_gecos.split(",")[0]
220 else:
221 fullname = None
222 if username is None:
223 username = entry.pw_name
224 if not fullname:
225 if username is None:
226 raise DefaultIdentityNotFound("no username found")
227 fullname = username
228 email = os.environ.get("EMAIL")
229 if email is None:
230 if username is None:
231 raise DefaultIdentityNotFound("no username found")
232 email = f"{username}@{socket.gethostname()}"
233 return (fullname, email)
236def get_user_identity(config: "Config", kind: str | None = None) -> bytes:
237 """Determine the identity to use for new commits.
239 If kind is set, this first checks
240 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
242 If those variables are not set, then it will fall back
243 to reading the user.name and user.email settings from
244 the specified configuration.
246 If that also fails, then it will fall back to using
247 the current users' identity as obtained from the host
248 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
250 Args:
251 config: Configuration stack to read from
252 kind: Optional kind to return identity for,
253 usually either "AUTHOR" or "COMMITTER".
255 Returns:
256 A user identity
257 """
258 user: bytes | None = None
259 email: bytes | None = None
260 if kind:
261 user_uc = os.environ.get("GIT_" + kind + "_NAME")
262 if user_uc is not None:
263 user = user_uc.encode("utf-8")
264 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
265 if email_uc is not None:
266 email = email_uc.encode("utf-8")
267 if user is None:
268 try:
269 user = config.get(("user",), "name")
270 except KeyError:
271 user = None
272 if email is None:
273 try:
274 email = config.get(("user",), "email")
275 except KeyError:
276 email = None
277 default_user, default_email = _get_default_identity()
278 if user is None:
279 user = default_user.encode("utf-8")
280 if email is None:
281 email = default_email.encode("utf-8")
282 if email.startswith(b"<") and email.endswith(b">"):
283 email = email[1:-1]
284 return user + b" <" + email + b">"
287def check_user_identity(identity: bytes) -> None:
288 """Verify that a user identity is formatted correctly.
290 Args:
291 identity: User identity bytestring
292 Raises:
293 InvalidUserIdentity: Raised when identity is invalid
294 """
295 try:
296 _fst, snd = identity.split(b" <", 1)
297 except ValueError as exc:
298 raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
299 if b">" not in snd:
300 raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
301 if b"\0" in identity or b"\n" in identity:
302 raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[ObjectID, list[ObjectID]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Blank lines and lines starting with ``#`` are ignored, as git does when
    reading a grafts file.

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    Every SHA is validated with ``check_hexsha``; its error propagates for a
    malformed entry.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[ObjectID, list[ObjectID]] = {}
    for line in graftpoints:
        fields = line.split()
        # An empty field list would otherwise fail with IndexError below.
        if not fields or fields[0].startswith(b"#"):
            continue

        commit = ObjectID(fields[0])
        parents = [ObjectID(p) for p in fields[1:]]

        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[ObjectID, Sequence[ObjectID]]) -> bytes:
    """Render a graft dictionary as newline-separated graft lines.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Lines are joined with ``b"\\n"``; no trailing newline is appended.

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    # Joining [commit, *parents] yields the bare commit when it has no parents.
    return b"\n".join(
        b" ".join([commit, *parents]) for commit, parents in graftpoints.items()
    )
359def _set_filesystem_hidden(path: str) -> None:
360 """Mark path as to be hidden if supported by platform and filesystem.
362 On win32 uses SetFileAttributesW api:
363 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
364 """
365 if sys.platform == "win32":
366 import ctypes
367 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
369 FILE_ATTRIBUTE_HIDDEN = 2
370 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
371 ("SetFileAttributesW", ctypes.windll.kernel32)
372 )
374 if isinstance(path, bytes):
375 path = os.fsdecode(path)
376 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
377 pass # Could raise or log `ctypes.WinError()` here
379 # Could implement other platform specific filesystem hiding here
382def parse_shared_repository(
383 value: str | bytes | bool,
384) -> tuple[int | None, int | None]:
385 """Parse core.sharedRepository configuration value.
387 Args:
388 value: Configuration value (string, bytes, or boolean)
390 Returns:
391 tuple of (file_mask, directory_mask) or (None, None) if not shared
393 The masks are permission bits to apply via chmod.
394 """
395 if isinstance(value, bytes):
396 value = value.decode("utf-8", errors="replace")
398 # Handle boolean values
399 if isinstance(value, bool):
400 if value:
401 # true = group (same as "group")
402 return (0o664, 0o2775)
403 else:
404 # false = umask (use system umask, no adjustment)
405 return (None, None)
407 # Handle string values
408 value_lower = value.lower()
410 if value_lower in ("false", "0", ""):
411 # Use umask (no adjustment)
412 return (None, None)
414 if value_lower in ("true", "1", "group"):
415 # Group writable (with setgid bit)
416 return (0o664, 0o2775)
418 if value_lower in ("all", "world", "everybody", "2"):
419 # World readable/writable (with setgid bit)
420 return (0o666, 0o2777)
422 if value_lower == "umask":
423 # Explicitly use umask
424 return (None, None)
426 # Try to parse as octal
427 if value.startswith("0"):
428 try:
429 mode = int(value, 8)
430 # For directories, add execute bits where read bits are set
431 # and add setgid bit for shared repositories
432 dir_mode = mode | 0o2000 # Add setgid bit
433 if mode & 0o004:
434 dir_mode |= 0o001
435 if mode & 0o040:
436 dir_mode |= 0o010
437 if mode & 0o400:
438 dir_mode |= 0o100
439 return (mode, dir_mode)
440 except ValueError:
441 pass
443 # Default to umask for unrecognized values
444 return (None, None)
447def _enable_relative_worktrees_extension(repo: "Repo") -> None:
448 """Enable the relativeworktrees extension in repository config.
450 This sets core.repositoryformatversion to 1 (if not already) and
451 enables the extensions.relativeworktrees extension.
453 Args:
454 repo: The repository to configure
455 """
456 config = repo.get_config()
458 # Ensure repository format version is at least 1
459 try:
460 version = int(config.get(("core",), "repositoryformatversion"))
461 except KeyError:
462 version = 0
464 if version < 1:
465 config.set(("core",), "repositoryformatversion", "1")
467 # Enable the relativeworktrees extension
468 config.set(("extensions",), "relativeworktrees", True)
469 config.write_to_path()
class ParentsProvider:
    """Provider for commit parent information.

    Parents are resolved, in order, from explicit grafts, the set of shallow
    commits (which have no parents), the store's commit graph and finally
    the commit object itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: dict[ObjectID, list[ObjectID]] | None = None,
        shallows: Iterable[ObjectID] | None = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information; a fresh empty dict is used when omitted
          shallows: Shallow commit SHAs; empty when omitted
        """
        self.store = store
        # Never fall back to a shared default dict: it is kept on the
        # instance and mutating it would leak into every other provider.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: ObjectID, commit: Commit | None = None
    ) -> list[ObjectID]:
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional already-loaded commit matching ``commit_id``

        Returns:
          List of parent SHAs

        Raises:
          ValueError: If the object loaded for ``commit_id`` is not a Commit
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            if not isinstance(obj, Commit):
                raise ValueError(
                    f"Expected Commit object for commit_id {commit_id.decode()}, "
                    f"got {type(obj).__name__}. This usually means a reference "
                    f"points to a {type(obj).__name__} object instead of a Commit."
                )
            commit = obj
        result: list[ObjectID] = commit.parents
        return result
526class BaseRepo:
527 """Base class for a git repository.
529 This base class is meant to be used for Repository implementations that e.g.
530 work on top of a different transport than a standard filesystem path.
532 Attributes:
533 object_store: Dictionary-like object for accessing
534 the objects
535 refs: Dictionary-like object with the refs in this
536 repository
537 """
539 def __init__(
540 self,
541 object_store: "PackCapableObjectStore",
542 refs: RefsContainer,
543 object_format: "ObjectFormat | None" = None,
544 ) -> None:
545 """Open a repository.
547 This shouldn't be called directly, but rather through one of the
548 base classes, such as MemoryRepo or Repo.
550 Args:
551 object_store: Object store to use
552 refs: Refs container to use
553 object_format: Hash algorithm to use (if None, will use object_store's format)
554 """
555 self.object_store = object_store
556 self.refs = refs
558 self._graftpoints: dict[ObjectID, list[ObjectID]] = {}
559 self.hooks: dict[str, Hook] = {}
560 if object_format is None:
561 self.object_format: ObjectFormat = object_store.object_format
562 else:
563 self.object_format = object_format
565 def _determine_file_mode(self) -> bool:
566 """Probe the file-system to determine whether permissions can be trusted.
568 Returns: True if permissions can be trusted, False otherwise.
569 """
570 raise NotImplementedError(self._determine_file_mode)
572 def _determine_symlinks(self) -> bool:
573 """Probe the filesystem to determine whether symlinks can be created.
575 Returns: True if symlinks can be created, False otherwise.
576 """
577 # For now, just mimic the old behaviour
578 return sys.platform != "win32"
def _init_files(
    self,
    bare: bool,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> None:
    """Write the default set of named files for a new repository.

    Writes ``description``, ``config`` and ``info/exclude`` through
    ``_put_named_file``, sets ``self.object_format`` and finally hands the
    generated config to ``_init_config``.

    Args:
      bare: Value stored as core.bare; symlink support is only probed for
        non-bare repositories
      symlinks: Whether symlinks are supported; probed when None and not bare
      format: Repository format version (0 or 1); defaults to 1 for sha256
        and 0 otherwise
      shared_repository: Optional value stored as core.sharedRepository
      object_format: Object format name; "sha256" requires format version 1

    Raises:
      ValueError: If the format version is unsupported or conflicts with
        the object format
    """
    from .config import ConfigFile

    self._put_named_file("description", b"Unnamed repository")
    config_file = ConfigFile()

    # SHA256 needs format version 1; SHA1 (the default) takes 0 or 1.
    wants_sha256 = object_format == "sha256"
    if format is None:
        format = 1 if wants_sha256 else 0
    elif wants_sha256 and format != 1:
        raise ValueError("SHA256 object format requires repository format version 1")

    if format not in (0, 1):
        raise ValueError(f"Unsupported repository format version: {format}")

    config_file.set("core", "repositoryformatversion", str(format))
    if wants_sha256:
        config_file.set("extensions", "objectformat", "sha256")

    # Set hash algorithm based on object format
    from .object_format import get_object_format

    self.object_format = get_object_format(object_format)

    config_file.set("core", "filemode", bool(self._determine_file_mode()))

    if symlinks is None and not bare:
        symlinks = self._determine_symlinks()
    # core.symlinks is only recorded when symlinks are known to be unusable.
    if symlinks is False:
        config_file.set("core", "symlinks", symlinks)

    # On macOS, set precomposeunicode to true since HFS+/APFS
    # returns filenames in NFD (decomposed) Unicode form
    if sys.platform == "darwin":
        config_file.set("core", "precomposeunicode", True)

    config_file.set("core", "bare", bare)
    config_file.set("core", "logallrefupdates", True)

    if shared_repository is not None:
        config_file.set("core", "sharedRepository", shared_repository)

    serialized = BytesIO()
    config_file.write_to_file(serialized)
    self._put_named_file("config", serialized.getvalue())
    self._put_named_file(os.path.join("info", "exclude"), b"")

    # Allow subclasses to handle config initialization
    self._init_config(config_file)
656 def _init_config(self, config: "ConfigFile") -> None:
657 """Initialize repository configuration.
659 This method can be overridden by subclasses to handle config initialization.
661 Args:
662 config: The ConfigFile object that was just created
663 """
664 # Default implementation does nothing
def get_named_file(self, path: str) -> BinaryIO | None:
    """Open a file from the control dir by name.

    The name is to be read as relative to the control dir of a disk-based
    Repo, but the returned object does not have to be backed by a file at
    that location. Abstract here; always raises NotImplementedError.

    Args:
      path: The path to the file, relative to the control dir.
    Returns: An open file object, or None if the file does not exist.
    """
    raise NotImplementedError(self.get_named_file)
679 def _put_named_file(self, path: str, contents: bytes) -> None:
680 """Write a file to the control dir with the given name and contents.
682 Args:
683 path: The path to the file, relative to the control dir.
684 contents: A string to write to the file.
685 """
686 raise NotImplementedError(self._put_named_file)
688 def _del_named_file(self, path: str) -> None:
689 """Delete a file in the control directory with the given name."""
690 raise NotImplementedError(self._del_named_file)
def open_index(self, config: "Config | None" = None) -> "Index":
    """Open this repository's index.

    Abstract here; always raises NotImplementedError.

    Args:
      config: Configuration to consult for index settings. If None,
        implementations may fall back to ``self.get_config_stack()``.

    Raises:
      NoIndexPresent: If no index is present
    Returns: The matching `Index`
    """
    raise NotImplementedError(self.open_index)
705 def _change_object_format(self, object_format_name: str) -> None:
706 """Change the object format of this repository.
708 This can only be done if the object store is empty (no objects written yet).
710 Args:
711 object_format_name: Name of the new object format (e.g., "sha1", "sha256")
713 Raises:
714 AssertionError: If the object store is not empty
715 """
716 # Check if object store has any objects
717 for _ in self.object_store:
718 raise AssertionError(
719 "Cannot change object format: repository already contains objects"
720 )
722 # Update the object format
723 from .object_format import get_object_format
725 new_format = get_object_format(object_format_name)
726 self.object_format = new_format
727 self.object_store.object_format = new_format
729 # Update config file
730 config = self.get_config()
732 if object_format_name == "sha1":
733 # For SHA-1, explicitly remove objectformat extension if present
734 try:
735 config.remove("extensions", "objectformat")
736 except KeyError:
737 pass
738 else:
739 # For non-SHA-1 formats, set repositoryformatversion to 1 and objectformat extension
740 config.set("core", "repositoryformatversion", "1")
741 config.set("extensions", "objectformat", object_format_name)
743 config.write_to_path()
def fetch(
    self,
    target: "BaseRepo",
    determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]]
    | None = None,
    progress: Callable[..., None] | None = None,
    depth: int | None = None,
) -> dict[Ref, ObjectID]:
    """Copy objects from this repository into another one.

    Args:
      target: The target repository
      determine_wants: Optional function to determine what refs to
        fetch. Defaults to the target object store's
        ``determine_wants_all``.
      progress: Optional progress function
      depth: Optional shallow fetch depth
    Returns: The refs of this (the source) repository
    """
    # Align the target's object format with ours; the target refuses this
    # unless it is still empty.
    if self.object_format != target.object_format:
        target._change_object_format(self.object_format.name)

    wants_fn = (
        determine_wants
        if determine_wants is not None
        else target.object_store.determine_wants_all
    )
    walker = target.get_graph_walker()
    object_count, unpacked = self.fetch_pack_data(
        wants_fn,
        walker,
        progress=progress,
        depth=depth,
    )
    target.object_store.add_pack_data(object_count, unpacked, progress)
    return self.get_refs()
def fetch_pack_data(
    self,
    determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
    graph_walker: "GraphWalker",
    progress: Callable[[bytes], None] | None,
    *,
    get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
    depth: int | None = None,
) -> tuple[int, Iterator["UnpackedObject"]]:
    """Produce the pack data needed for a set of revisions.

    Args:
      determine_wants: Function that takes a dictionary with heads
        and returns the list of heads to fetch.
      graph_walker: Object that can iterate over the list of revisions
        to fetch and has an "ack" method that will be called to acknowledge
        that a revision is present.
      progress: Simple progress function that will be called with
        updated progress strings.
      get_tagged: Function that returns a dict of pointed-to sha ->
        tag sha for including tags.
      depth: Shallow fetch depth
    Returns: count and iterator over pack data; ``(0, <empty iterator>)``
      when ``find_missing_objects`` reports nothing to send
    """
    finder = self.find_missing_objects(
        determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
    )
    if finder is None:
        return 0, iter([])

    remote_has = finder.get_remote_has()
    # Materialise the ids so that their number can be reported up front.
    object_ids = list(finder)
    unpacked = generate_unpacked_objects(
        self.object_store, object_ids, progress=progress, other_haves=remote_has
    )
    return len(object_ids), unpacked
def find_missing_objects(
    self,
    determine_wants: Callable[[Mapping[Ref, ObjectID], int | None], list[ObjectID]],
    graph_walker: "GraphWalker",
    progress: Callable[[bytes], None] | None,
    *,
    get_tagged: Callable[[], dict[ObjectID, ObjectID]] | None = None,
    depth: int | None = None,
) -> MissingObjectFinder | None:
    """Fetch the missing objects required for a set of revisions.

    Args:
      determine_wants: Function that takes a dictionary with heads
        and returns the list of heads to fetch.
      graph_walker: Object that can iterate over the list of revisions
        to fetch and has an "ack" method that will be called to acknowledge
        that a revision is present.
      progress: Simple progress function that will be called with
        updated progress strings.
      get_tagged: Function that returns a dict of pointed-to sha ->
        tag sha for including tags.
      depth: Shallow fetch depth
    Returns: A MissingObjectFinder, or None when nothing is wanted and
      shallow/unshallow information is pending (no pack is sent then)
    Raises:
      TypeError: If ``determine_wants`` does not return a list
    """
    import logging

    # Filter out refs pointing to missing objects to avoid errors downstream.
    # This makes Dulwich more robust when dealing with broken refs on disk.
    # Previously serialize_refs() did this filtering as a side-effect.
    all_refs = self.get_refs()
    refs: dict[Ref, ObjectID] = {}
    for ref, sha in all_refs.items():
        if sha in self.object_store:
            refs[ref] = sha
        else:
            logging.warning(
                "ref %s points at non-present sha %s",
                ref.decode("utf-8", "replace"),
                sha.decode("ascii"),
            )

    wants = determine_wants(refs, depth)
    if not isinstance(wants, list):
        raise TypeError("determine_wants() did not return a list")

    current_shallow = set(getattr(graph_walker, "shallow", set()))

    # Bind ``unshallow`` on every path. It used to stay unbound when a depth
    # was requested for a graph walker without a ``shallow`` attribute, and
    # the checks below then failed with UnboundLocalError.
    unshallow = getattr(graph_walker, "unshallow", set())

    if depth not in (None, 0):
        assert depth is not None
        shallow, not_shallow = find_shallow(self.object_store, wants, depth)
        # Only update if graph_walker has shallow attribute
        if hasattr(graph_walker, "shallow"):
            graph_walker.shallow.update(shallow - not_shallow)
            new_shallow = graph_walker.shallow - current_shallow
            unshallow = not_shallow & current_shallow
            setattr(graph_walker, "unshallow", unshallow)
            if hasattr(graph_walker, "update_shallow"):
                graph_walker.update_shallow(new_shallow, unshallow)

    if wants == []:
        # TODO(dborowitz): find a way to short-circuit that doesn't change
        # this interface.

        if getattr(graph_walker, "shallow", set()) or unshallow:
            # Do not send a pack in shallow short-circuit path
            return None

        # Return an actual MissingObjectFinder with empty wants
        return MissingObjectFinder(
            self.object_store,
            haves=[],
            wants=[],
        )

    # If the graph walker is set up with an implementation that can
    # ACK/NAK to the wire, it will write data to the client through
    # this call as a side-effect.
    haves = self.object_store.find_common_revisions(graph_walker)

    # Deal with shallow requests separately because the haves do
    # not reflect what objects are missing
    if getattr(graph_walker, "shallow", set()) or unshallow:
        # TODO: filter the haves commits from iter_shas. the specific
        # commits aren't missing.
        haves = []

    parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

    def get_parents(commit: Commit) -> list[ObjectID]:
        """Get parents for a commit using the parents provider.

        Args:
          commit: Commit object

        Returns:
          List of parent commit SHAs
        """
        return parents_provider.get_parents(commit.id, commit)

    return MissingObjectFinder(
        self.object_store,
        haves=haves,
        wants=wants,
        shallow=getattr(graph_walker, "shallow", set()),
        progress=progress,
        get_tagged=get_tagged,
        get_parents=get_parents,
    )
def generate_pack_data(
    self,
    have: set[ObjectID],
    want: set[ObjectID],
    *,
    shallow: set[ObjectID] | None = None,
    progress: Callable[[str], None] | None = None,
    ofs_delta: bool | None = None,
) -> tuple[int, Iterator["UnpackedObject"]]:
    """Generate pack data objects for a set of wants/haves.

    Delegates to the object store's ``generate_pack_data``.

    Args:
      have: Set of SHA1s of objects that should not be sent
      want: Set of SHA1s of objects that should be sent
      shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
      ofs_delta: Whether OFS deltas can be included (defaults to
        DEFAULT_OFS_DELTA)
      progress: Optional progress reporting method
    Returns: Whatever the object store's ``generate_pack_data`` returns
    """
    effective_shallow = self.get_shallow() if shallow is None else shallow
    use_ofs_delta = DEFAULT_OFS_DELTA if ofs_delta is None else ofs_delta
    return self.object_store.generate_pack_data(
        have,
        want,
        shallow=effective_shallow,
        progress=progress,
        ofs_delta=use_ofs_delta,
    )
def get_graph_walker(
    self, heads: list[ObjectID] | None = None
) -> ObjectStoreGraphWalker:
    """Build a graph walker over this repository.

    A graph walker is used by a remote repository (or proxy)
    to find out which objects are present in this repository.

    Args:
      heads: Repository heads to use (optional); defaults to the SHAs
        under ``refs/heads`` that are present in the object store
    Returns: A graph walker object
    """
    if heads is None:
        branch_tips = self.refs.as_dict(Ref(b"refs/heads"))
        # Skip refs whose target is not in the object store.
        heads = [sha for sha in branch_tips.values() if sha in self.object_store]

    provider = ParentsProvider(self.object_store)
    return ObjectStoreGraphWalker(
        heads,
        provider.get_parents,
        shallow=self.get_shallow(),
        update_shallow=self.update_shallow,
    )
def get_refs(self) -> dict[Ref, ObjectID]:
    """Return every ref of this repository.

    Returns: A ``dict`` mapping ref names to SHA1s
    """
    all_refs = self.refs.as_dict()
    return all_refs
def head(self) -> ObjectID:
    """Look up the SHA1 that HEAD points at.

    Returns: The value stored in the refs container under HEADREF
    """
    # TODO: move this method to WorkTree
    head_sha = self.refs[HEADREF]
    return head_sha
991 def _get_object(self, sha: ObjectID | RawObjectID, cls: type[T]) -> T:
992 assert len(sha) in (
993 self.object_format.oid_length,
994 self.object_format.hex_length,
995 )
996 ret = self.get_object(sha)
997 if not isinstance(ret, cls):
998 if cls is Commit:
999 raise NotCommitError(ret.id)
1000 elif cls is Blob:
1001 raise NotBlobError(ret.id)
1002 elif cls is Tree:
1003 raise NotTreeError(ret.id)
1004 elif cls is Tag:
1005 raise NotTagError(ret.id)
1006 else:
1007 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
1008 return ret
def get_object(self, sha: ObjectID | RawObjectID) -> ShaFile:
    """Look up an object in the object store by SHA.

    Args:
      sha: SHA to retrieve
    Returns: A ShaFile object
    Raises:
      KeyError: when the object can not be found
    """
    found = self.object_store[sha]
    return found
def parents_provider(self) -> ParentsProvider:
    """Create a parents provider for this repository.

    Returns:
      A new ParentsProvider over this repository's object store, carrying
      its graftpoints and its current shallow commits
    """
    shallow_commits = self.get_shallow()
    return ParentsProvider(
        self.object_store,
        grafts=self._graftpoints,
        shallows=shallow_commits,
    )
def get_parents(
    self, sha: ObjectID, commit: Commit | None = None
) -> list[ObjectID]:
    """Look up the parents of a commit.

    The lookup goes through ``parents_provider()``, so for a graftpoint the
    graft parents are returned instead of the recorded ones.

    Args:
      sha: SHA of the commit for which to retrieve the parents
      commit: Optional commit matching the sha
    Returns: List of parents
    """
    provider = self.parents_provider()
    return provider.get_parents(sha, commit)
def get_config(self) -> "ConfigFile":
    """Return the repository's config object.

    Abstract here; always raises NotImplementedError.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    """
    raise NotImplementedError(self.get_config)
def get_worktree_config(self) -> "ConfigFile":
    """Return the worktree config object.

    Abstract here; always raises NotImplementedError.
    """
    raise NotImplementedError(self.get_worktree_config)
1059 def get_description(self) -> bytes | None:
1060 """Retrieve the description for this repository.
1062 Returns: Bytes with the description of the repository
1063 as set by the user.
1064 """
1065 raise NotImplementedError(self.get_description)
def set_description(self, description: bytes) -> None:
    """Store a new description for this repository.

    Abstract here; always raises NotImplementedError.

    Args:
      description: Text to set as description for this repository.
    """
    raise NotImplementedError(self.set_description)
def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Return the rebase state manager suited to this repository.

    Returns:
        RebaseStateManager instance.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError(self.get_rebase_state_manager)
def get_blob_normalizer(
    self, config: "Config | None" = None
) -> "FilterBlobNormalizer":
    """Return the blob normalizer used for checkin/checkout operations.

    Args:
        config: Configuration to consult for filter setup. If None,
            implementations may fall back to ``self.get_config_stack()``.

    Returns:
        BlobNormalizer instance.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError(self.get_blob_normalizer)
def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Load the gitattributes that apply to this repository.

    Args:
        tree: Tree SHA to read .gitattributes from (defaults to HEAD).

    Returns:
        GitAttributes object that can be used to match paths.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError(self.get_gitattributes)
def get_config_stack(self) -> "StackedConfig":
    """Build the layered configuration for this repository.

    The stack starts with the repository's own config, adds the worktree
    config when ``extensions.worktreeconfig`` is enabled there, and ends
    with ``StackedConfig.default_backends()`` (the global configuration,
    which usually lives in ~/.gitconfig). Writes go to the repository's
    own config.

    Returns:
        `StackedConfig` instance for this repository.
    """
    from .config import ConfigFile, StackedConfig

    repo_config = self.get_config()
    layers: list[ConfigFile] = [repo_config]
    worktree_config_enabled = repo_config.get_boolean(
        (b"extensions",), b"worktreeconfig", False
    )
    if worktree_config_enabled:
        layers.append(self.get_worktree_config())
    layers.extend(StackedConfig.default_backends())
    return StackedConfig(layers, writable=repo_config)
def get_shallow(self) -> set[ObjectID]:
    """Read the set of shallow commits from the ``shallow`` named file.

    Returns:
        Set of shallow commit IDs, one per line of the file; an empty set
        when the file does not exist.
    """
    shallow_file = self.get_named_file("shallow")
    if shallow_file is None:
        return set()
    commits: set[ObjectID] = set()
    with shallow_file:
        for raw_line in shallow_file:
            commits.add(ObjectID(raw_line.strip()))
    return commits
def update_shallow(
    self, new_shallow: set[ObjectID] | None, new_unshallow: set[ObjectID] | None
) -> None:
    """Update the list of shallow objects.

    The current shallow set is read, ``new_shallow`` is added to it and
    ``new_unshallow`` removed from it. A non-empty result is written back
    to the ``shallow`` named file, one SHA per line in sorted order; an
    empty result deletes the file.

    Args:
        new_shallow: Newly shallow objects
        new_unshallow: Newly no longer shallow objects
    """
    shallow = self.get_shallow()
    if new_shallow:
        shallow.update(new_shallow)
    if new_unshallow:
        shallow.difference_update(new_unshallow)
    if not shallow:
        self._del_named_file("shallow")
        return
    # Sort so the file content does not depend on set iteration order,
    # which varies between processes for bytes keys.
    contents = b"".join(sha + b"\n" for sha in sorted(shallow))
    self._put_named_file("shallow", contents)
def get_peeled(self, ref: Ref) -> ObjectID:
    """Resolve a ref to the object it ultimately points at.

    A peeled value already known to the refs container is returned as-is;
    otherwise the ref's target is peeled through the object store.

    Args:
        ref: The refname to peel.

    Returns:
        The fully-peeled SHA1 of a tag object, after peeling all
        intermediate tags; if the original ref does not point to a tag,
        this will equal the original SHA1.
    """
    peeled = self.refs.get_peeled(ref)
    if peeled is None:
        target = peel_sha(self.object_store, self.refs[ref])[1]
        peeled = target.id
    return peeled
@property
def notes(self) -> "Notes":
    """Notes interface for this repository.

    Returns:
        A new Notes object built from the repository's object store
        and refs.
    """
    from .notes import Notes

    store, refs = self.object_store, self.refs
    return Notes(store, refs)
def get_walker(
    self,
    include: Sequence[ObjectID] | None = None,
    exclude: Sequence[ObjectID] | None = None,
    order: str = "date",
    reverse: bool = False,
    max_entries: int | None = None,
    paths: Sequence[bytes] | None = None,
    rename_detector: "RenameDetector | None" = None,
    follow: bool = False,
    since: int | None = None,
    until: int | None = None,
    queue_cls: type | None = None,
) -> "Walker":
    """Create a commit walker over this repository.

    Args:
        include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
        exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
        order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
        reverse: If True, reverse the order of output, requiring O(n)
            memory.
        max_entries: The maximum number of entries to yield, or None for
            no limit.
        paths: Iterable of file or subtree paths to show entries for.
        rename_detector: diff.RenameDetector object for detecting
            renames.
        follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
        since: Timestamp to list commits after.
        until: Timestamp to list commits before.
        queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument,
            the Walker. Defaults to ``_CommitTimeQueue``.

    Returns:
        A `Walker` object
    """
    from .walk import Walker, _CommitTimeQueue

    def lookup_parents(commit):
        # Route through the repository so graftpoints and shallows apply.
        return self.get_parents(commit.id, commit)

    start_points = [self.head()] if include is None else include
    commit_queue = _CommitTimeQueue if queue_cls is None else queue_cls

    # Every option is forwarded by name rather than through **kwargs,
    # which avoids type issues.
    return Walker(
        self.object_store,
        start_points,
        exclude=exclude,
        order=order,
        reverse=reverse,
        max_entries=max_entries,
        paths=paths,
        rename_detector=rename_detector,
        follow=follow,
        since=since,
        until=until,
        get_parents=lookup_parents,
        queue_cls=commit_queue,
    )
def __getitem__(self, name: ObjectID | Ref | bytes) -> "ShaFile":
    """Fetch a Git object by ref name or by object ID.

    ``HEAD`` and names starting with ``refs/`` are first resolved through
    the refs container. If that fails, or the name is not ref-like, a name
    whose length matches the object format's raw or hex ID length is
    looked up directly in the object store.

    Args:
        name: A Git object SHA1 or a ref name

    Returns:
        A `ShaFile` object, such as a Commit or Blob

    Raises:
        TypeError: when ``name`` is not a bytestring
        KeyError: when the specified ref or object does not exist
    """
    if not isinstance(name, bytes):
        raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")

    looks_like_ref = name == b"HEAD" or name.startswith(b"refs/")
    if looks_like_ref:
        try:
            return self.object_store[self.refs[Ref(name)]]
        except (RefFormatError, KeyError):
            # Fall through to the object ID lookup below.
            pass

    object_format = self.object_store.object_format
    if len(name) in (object_format.oid_length, object_format.hex_length):
        try:
            if len(name) == self.object_store.object_format.hex_length:
                key = ObjectID(name)
            else:
                key = RawObjectID(name)
            return self.object_store[key]
        except (KeyError, ValueError):
            pass

    raise KeyError(name)
def __contains__(self, name: bytes) -> bool:
    """Report whether a Git object or ref with this name exists.

    Names of length 20, or 40 with valid hex, are tested first, then the
    raw and hex ID lengths of ``self.object_format``. A name matching any
    of these is looked up in the object store and, failing that, in the
    refs. Any other name is looked up in the refs only.

    Args:
        name: Git object SHA1/SHA256 or ref name
    """
    name_len = len(name)

    def in_store_or_refs(object_key) -> bool:
        return object_key in self.object_store or Ref(name) in self.refs

    if name_len == 20:
        return in_store_or_refs(RawObjectID(name))
    if name_len == 40 and valid_hexsha(name):
        return in_store_or_refs(ObjectID(name))
    # Check if it's a binary or hex SHA of the repository's object format
    if name_len == self.object_format.oid_length:
        return in_store_or_refs(RawObjectID(name))
    if name_len == self.object_format.hex_length and valid_hexsha(name):
        return in_store_or_refs(ObjectID(name))
    return Ref(name) in self.refs
def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
    """Point a ref at an object.

    Args:
        name: ref name; must start with ``refs/`` or equal ``HEADREF``
        value: Ref value - either a ShaFile object, or a hex sha

    Raises:
        ValueError: when ``name`` is not an accepted ref name
        TypeError: when ``value`` is neither a ShaFile nor bytes
    """
    if not (name.startswith(b"refs/") or name == HEADREF):
        raise ValueError(name)

    target_ref = Ref(name)
    if isinstance(value, ShaFile):
        self.refs[target_ref] = value.id
        return
    if isinstance(value, bytes):
        self.refs[target_ref] = ObjectID(value)
        return
    raise TypeError(value)
def __delitem__(self, name: bytes) -> None:
    """Delete a ref.

    Args:
        name: Name of the ref to remove; must start with ``refs/`` or
            equal ``HEADREF``

    Raises:
        ValueError: when ``name`` is not an accepted ref name
    """
    if not (name.startswith(b"refs/") or name == HEADREF):
        raise ValueError(name)
    del self.refs[Ref(name)]
def _get_user_identity(
    self, config: "StackedConfig", kind: str | None = None
) -> bytes:
    """Determine the identity to use for new commits.

    Deprecated: call ``get_user_identity()`` directly instead.

    Args:
        config: Configuration stack to read the identity from.
        kind: Accepted for compatibility; it is not forwarded to
            ``get_user_identity()``.

    Returns:
        The identity returned by ``get_user_identity(config)``.
    """
    warnings.warn(
        "use get_user_identity() rather than Repo._get_user_identity",
        DeprecationWarning,
        # Attribute the warning to the caller, not to this wrapper.
        stacklevel=2,
    )
    # NOTE(review): ``kind`` is dropped here; confirm whether
    # get_user_identity() accepts it and it should be passed through.
    return get_user_identity(config)
def _add_graftpoints(
    self, updated_graftpoints: dict[ObjectID, list[ObjectID]]
) -> None:
    """Add new graftpoints or replace existing ones.

    Every commit and parent SHA is passed to ``check_hexsha`` before any
    graftpoint is stored.

    Args:
        updated_graftpoints: Dict of commit shas to list of parent shas
    """
    # Simple validation of every SHA involved
    for commit_sha, parent_shas in updated_graftpoints.items():
        for candidate in (commit_sha, *parent_shas):
            check_hexsha(candidate, "Invalid graftpoint")

    self._graftpoints.update(updated_graftpoints)
1346 def _remove_graftpoints(self, to_remove: Sequence[ObjectID] = ()) -> None:
1347 """Remove graftpoints.
1349 Args:
1350 to_remove: List of commit shas
1351 """
1352 for sha in to_remove:
1353 del self._graftpoints[sha]
def _read_heads(self, name: str) -> list[ObjectID]:
    """Read object IDs, one per line, from a named file.

    Args:
        name: Name of the file, as understood by ``get_named_file``.

    Returns:
        The IDs in file order, skipping lines that are blank after
        stripping; an empty list when the file does not exist.
    """
    heads_file = self.get_named_file(name)
    if heads_file is None:
        return []
    heads = []
    with heads_file:
        for raw_line in heads_file.readlines():
            if raw_line.strip():
                heads.append(ObjectID(raw_line.strip()))
    return heads
def get_worktree(self) -> "WorkTree":
    """Return the working tree of this repository.

    Returns:
        WorkTree instance for performing working tree operations

    Raises:
        NotImplementedError: If the repository doesn't support working
            trees; this implementation always raises.
    """
    message = "Working tree operations not supported by this repository type"
    raise NotImplementedError(message)
def read_gitfile(f: BinaryIO) -> str:
    """Extract the target path from a ``.git`` file.

    The whole file is read; it must begin with "gitdir: ". Everything after
    that prefix, minus trailing CR/LF characters, is decoded as UTF-8.

    Args:
        f: File-like object to read from

    Returns:
        A path

    Raises:
        ValueError: when the content does not start with "gitdir: "
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    target = contents.removeprefix(prefix)
    return target.rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
            version: The unsupported repository version
        """
        # Hand the value to Exception so ``args`` and ``str(exc)`` carry it
        # instead of being empty.
        super().__init__(version)
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
            extension: The unsupported repository extension
        """
        # Hand the value to Exception so ``args`` and ``str(exc)`` carry it
        # instead of being empty.
        super().__init__(extension)
        self.extension = extension
1415class Repo(BaseRepo):
1416 """A git repository backed by local disk.
1418 To open an existing repository, call the constructor with
1419 the path of the repository.
1421 To create a new repository, use the Repo.init class method.
1423 Note that a repository object may hold on to resources such
1424 as file handles for performance reasons; call .close() to free
1425 up those resources.
1427 Attributes:
1428 path: Path to the working copy (if it exists) or repository control
1429 directory (if the repository is bare)
1430 bare: Whether this is a bare repository
1431 """
1433 path: str
1434 bare: bool
1435 object_store: DiskObjectStore
1436 filter_context: "FilterContext | None"
def __init__(
    self,
    root: str | bytes | os.PathLike[str],
    object_store: PackBasedObjectStore | None = None,
    bare: bool | None = None,
) -> None:
    """Open a repository on disk.

    Args:
        root: Path to the repository's root.
        object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
        bare: True if this is a bare repository. If None, the layout
            under ``root`` is inspected to decide.

    Raises:
        NotGitRepository: If ``bare`` is None and ``root`` does not look
            like a repository.
        UnsupportedVersion: If ``core.repositoryformatversion`` is
            neither 0 nor 1.
        UnsupportedExtension: If the ``extensions`` section contains an
            entry that is not handled here.
    """
    root = os.fspath(root)
    if isinstance(root, bytes):
        root = os.fsdecode(root)
    hidden_path = os.path.join(root, CONTROLDIR)
    if bare is None:
        # Autodetect: CONTROLDIR being a file, or a directory containing
        # OBJECTDIR, means non-bare; OBJECTDIR and REFSDIR directly under
        # root means bare.
        if os.path.isfile(hidden_path) or os.path.isdir(
            os.path.join(hidden_path, OBJECTDIR)
        ):
            bare = False
        elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
            os.path.join(root, REFSDIR)
        ):
            bare = True
        else:
            raise NotGitRepository(
                "No git repository was found at {path}".format(**dict(path=root))
            )

    self.bare = bare
    if bare is False:
        if os.path.isfile(hidden_path):
            # CONTROLDIR is a "gitdir: ..." file pointing at the real
            # control directory, resolved relative to root.
            with open(hidden_path, "rb") as f:
                path = read_gitfile(f)
            self._controldir = os.path.join(root, path)
        else:
            self._controldir = hidden_path
    else:
        self._controldir = root
    # A COMMONDIR file in the control dir names the common directory,
    # relative to the control dir; without it both are the same.
    commondir = self.get_named_file(COMMONDIR)
    if commondir is not None:
        with commondir:
            self._commondir = os.path.join(
                self.controldir(),
                os.fsdecode(commondir.read().rstrip(b"\r\n")),
            )
    else:
        self._commondir = self._controldir
    self.path = root

    # Initialize refs early so they're available for config condition matchers
    self.refs = DiskRefsContainer(
        self.commondir(), self._controldir, logger=self._write_reflog
    )

    # Initialize worktrees container
    from .worktree import WorkTreeContainer

    self.worktrees = WorkTreeContainer(self)

    config = self.get_config()
    try:
        repository_format_version = config.get("core", "repositoryformatversion")
        format_version = (
            0
            if repository_format_version is None
            else int(repository_format_version)
        )
    except KeyError:
        # No version recorded in the config: treat as version 0.
        format_version = 0

    if format_version not in (0, 1):
        raise UnsupportedVersion(format_version)

    # Track extensions we encounter
    has_reftable_extension = False
    for extension, value in config.items((b"extensions",)):
        if extension.lower() == b"refstorage":
            # Only the "reftable" ref storage is accepted.
            if value == b"reftable":
                has_reftable_extension = True
            else:
                raise UnsupportedExtension(f"refStorage = {value.decode()}")
        elif extension.lower() not in (
            b"worktreeconfig",
            b"objectformat",
            b"relativeworktrees",
        ):
            raise UnsupportedExtension(extension.decode("utf-8"))

    if object_store is None:
        # Get shared repository permissions from config
        try:
            shared_value = config.get(("core",), "sharedRepository")
            file_mode, dir_mode = parse_shared_repository(shared_value)
        except KeyError:
            file_mode, dir_mode = None, None

        object_store = DiskObjectStore.from_config(
            os.path.join(self.commondir(), OBJECTDIR),
            config,
            file_mode=file_mode,
            dir_mode=dir_mode,
        )

    # Use reftable if extension is configured
    if has_reftable_extension:
        from .reftable import ReftableRefsContainer

        self.refs = ReftableRefsContainer(self.commondir())
        # Update worktrees container after refs change
        self.worktrees = WorkTreeContainer(self)
    BaseRepo.__init__(self, object_store, self.refs)

    # Determine hash algorithm from config if not already set
    if self.object_format is None:
        from .object_format import DEFAULT_OBJECT_FORMAT, get_object_format

        # extensions.objectformat is only consulted for format version 1.
        if format_version == 1:
            try:
                object_format = config.get((b"extensions",), b"objectformat")
                self.object_format = get_object_format(
                    object_format.decode("ascii")
                )
            except KeyError:
                self.object_format = DEFAULT_OBJECT_FORMAT
        else:
            self.object_format = DEFAULT_OBJECT_FORMAT

    self._graftpoints = {}
    graft_file = self.get_named_file(
        os.path.join("info", "grafts"), basedir=self.commondir()
    )
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))
    # The shallow file is run through the same graftpoint parser and merged
    # into the same mapping.
    graft_file = self.get_named_file("shallow", basedir=self.commondir())
    if graft_file:
        with graft_file:
            self._graftpoints.update(parse_graftpoints(graft_file))

    # Register the shell hooks; only pre-commit is also given self.path.
    self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
    self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
    self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
    self.hooks["pre-receive"] = PreReceiveShellHook(self.controldir())
    self.hooks["update"] = UpdateShellHook(self.controldir())
    self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

    # Initialize filter context as None, will be created lazily
    self.filter_context = None
def get_worktree(self) -> "WorkTree":
    """Return the working tree of this repository.

    Returns:
        WorkTree instance bound to this repository and ``self.path``.
    """
    from .worktree import WorkTree

    worktree_root = self.path
    return WorkTree(self, worktree_root)
def _write_reflog(
    self,
    ref: bytes,
    old_sha: bytes,
    new_sha: bytes,
    committer: bytes | None,
    timestamp: int | None,
    timezone: int | None,
    message: bytes,
) -> None:
    """Append one entry to the reflog of ``ref``.

    Missing parent directories of the log file are created first. When
    shared-repository permissions are configured they are applied to each
    created directory and to the log file.

    Args:
        ref: Name of the ref whose log is written.
        old_sha: Previous value of the ref.
        new_sha: New value of the ref.
        committer: Identity to record; looked up from the config stack
            when None.
        timestamp: Time of the change; the current time when None.
        timezone: Timezone offset to record; 0 when None.
        message: Reflog message.
    """
    from .reflog import format_reflog_line

    log_path = self._reflog_path(ref)

    # Get shared repository permissions
    file_mode, dir_mode = self._get_shared_repository_permissions()

    # Walk upwards collecting the directories that do not exist yet, then
    # create them top-down so each level can get its permissions.
    missing_dirs = []
    candidate = os.path.dirname(log_path)
    while candidate and not os.path.exists(candidate):
        missing_dirs.append(candidate)
        candidate = os.path.dirname(candidate)
    for directory in reversed(missing_dirs):
        os.mkdir(directory)
        if dir_mode is not None:
            os.chmod(directory, dir_mode)

    if committer is None:
        committer = get_user_identity(self.get_config_stack())
    check_user_identity(committer)
    if timestamp is None:
        timestamp = int(time.time())
    if timezone is None:
        timezone = 0  # FIXME
    with open(log_path, "ab") as log_file:
        entry = format_reflog_line(
            old_sha, new_sha, committer, timestamp, timezone, message
        )
        log_file.write(entry + b"\n")

    # open() respects the umask, so chmod is needed to set the actual mode.
    # It is applied every time, even if the file already existed.
    if file_mode is not None:
        os.chmod(log_path, file_mode)
1652 def _reflog_path(self, ref: bytes) -> str:
1653 if ref.startswith((b"main-worktree/", b"worktrees/")):
1654 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1656 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1657 return os.path.join(base, "logs", os.fsdecode(ref))
def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
    """Iterate over the reflog entries of a reference.

    A reference without a reflog file yields nothing.

    Args:
        ref: Reference name (e.g. b'HEAD', b'refs/heads/master')

    Yields:
        reflog.Entry objects in chronological order (oldest first)
    """
    from .reflog import read_reflog

    log_path = self._reflog_path(ref)
    try:
        with open(log_path, "rb") as log_file:
            yield from read_reflog(log_file)
    except FileNotFoundError:
        # No log on disk for this ref.
        return
@classmethod
def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
    """Iterate parent directories to discover a repository.

    Return a Repo object for the first parent directory that looks like a
    Git repository.

    Args:
        start: The directory to start discovery from (defaults to '.')

    Raises:
        NotGitRepository: when neither ``start`` nor any of its parents
            can be opened as a repository.
    """
    path = os.path.abspath(start)
    while True:
        try:
            return cls(path)
        except NotGitRepository:
            new_path, _tail = os.path.split(path)
            if new_path == path:  # Root reached
                break
            path = new_path
    # os.fsdecode accepts str, bytes and PathLike, and uses the filesystem
    # encoding, so a non-UTF-8 bytes path cannot turn this error into a
    # UnicodeDecodeError.
    raise NotGitRepository(
        f"No git repository was found at {os.fsdecode(start)}"
    )
def controldir(self) -> str:
    """Return the control directory path recorded when the repo was opened."""
    control_path = self._controldir
    return control_path
def commondir(self) -> str:
    """Return the common directory path.

    In a main working tree this is the same as controldir(). In a linked
    working tree it is the control directory of the main working tree.
    """
    common_path = self._commondir
    return common_path
1715 def _determine_file_mode(self) -> bool:
1716 """Probe the file-system to determine whether permissions can be trusted.
1718 Returns: True if permissions can be trusted, False otherwise.
1719 """
1720 fname = os.path.join(self.path, ".probe-permissions")
1721 with open(fname, "w") as f:
1722 f.write("")
1724 st1 = os.lstat(fname)
1725 try:
1726 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1727 except PermissionError:
1728 return False
1729 st2 = os.lstat(fname)
1731 os.unlink(fname)
1733 mode_differs = st1.st_mode != st2.st_mode
1734 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1736 return mode_differs and st2_has_exec
1738 def _determine_symlinks(self) -> bool:
1739 """Probe the filesystem to determine whether symlinks can be created.
1741 Returns: True if symlinks can be created, False otherwise.
1742 """
1743 # TODO(jelmer): Actually probe disk / look at filesystem
1744 return sys.platform != "win32"
1746 def _get_shared_repository_permissions(
1747 self,
1748 ) -> tuple[int | None, int | None]:
1749 """Get shared repository file and directory permissions from config.
1751 Returns:
1752 tuple of (file_mask, directory_mask) or (None, None) if not shared
1753 """
1754 try:
1755 config = self.get_config()
1756 value = config.get(("core",), "sharedRepository")
1757 return parse_shared_repository(value)
1758 except KeyError:
1759 return (None, None)
def _put_named_file(self, path: str, contents: bytes) -> None:
    """Write ``contents`` to a file inside the control dir.

    Leading path separators are stripped from ``path``. When
    shared-repository permissions are configured, the file mode is passed
    to ``GitFile`` as its ``mask``.

    Args:
        path: The path to the file, relative to the control dir.
        contents: Bytes to write to the file.
    """
    relative_path = path.lstrip(os.path.sep)

    # Get shared repository permissions
    file_mode, _dir_mode = self._get_shared_repository_permissions()

    destination = os.path.join(self.controldir(), relative_path)
    if file_mode is None:
        handle = GitFile(destination, "wb")
    else:
        handle = GitFile(destination, "wb", mask=file_mode)
    with handle as f:
        f.write(contents)
1783 def _del_named_file(self, path: str) -> None:
1784 try:
1785 os.unlink(os.path.join(self.controldir(), path))
1786 except FileNotFoundError:
1787 return
def get_named_file(
    self,
    path: str | bytes,
    basedir: str | None = None,
) -> BinaryIO | None:
    """Open a named file from the control dir for binary reading.

    Although the filename should be interpreted as a filename relative to
    the control dir in a disk-based Repo, the object returned need not be
    pointing to a file in that location.

    A bytes ``path`` is decoded as UTF-8, and leading path separators are
    stripped before joining it onto the base directory.

    Args:
        path: The path to the file, relative to the control dir.
        basedir: Optional argument that specifies an alternative to the
            control dir.

    Returns:
        An open file object, or None if the file does not exist.
    """
    # TODO(dborowitz): sanitize filenames, since this is used directly by
    # the dumb web serving code.
    root = self.controldir() if basedir is None else basedir
    relative_path = path.decode("utf-8") if isinstance(path, bytes) else path
    full_path = os.path.join(root, relative_path.lstrip(os.path.sep))
    try:
        return open(full_path, "rb")
    except FileNotFoundError:
        return None
def index_path(self) -> str:
    """Return the path of the index file inside the control dir."""
    control_path = self.controldir()
    return os.path.join(control_path, INDEX_FILENAME)
def open_index(self, config: "Config | None" = None) -> "Index":
    """Open the index for this repository.

    ``feature.manyFiles`` changes the defaults: when enabled, the index
    version defaults to 4 and ``index.skipHash`` defaults to true. An
    explicit ``index.version`` or ``index.skipHash`` setting is used
    either way.

    Args:
        config: Configuration to consult for index settings. If None,
            falls back to ``self.get_config_stack()``.

    Raises:
        NoIndexPresent: If no index is present

    Returns:
        The matching `Index`
    """
    from .index import Index, make_path_normalizer

    if not self.has_index():
        raise NoIndexPresent

    if config is None:
        config = self.get_config_stack()

    many_files = config.get_boolean(b"feature", b"manyFiles", False)
    try:
        index_version = int(config.get(b"index", b"version"))
    except KeyError:
        # No explicit version: 4 for manyFiles, otherwise none is passed.
        index_version = 4 if many_files else None
    skip_hash = config.get_boolean(b"index", b"skipHash", bool(many_files))

    # Get shared repository permissions for index file
    file_mode, _dir_mode = self._get_shared_repository_permissions()

    return Index(
        self.index_path(),
        skip_hash=skip_hash,
        version=index_version,
        file_mode=file_mode,
        path_normalizer=make_path_normalizer(config),
    )
def has_index(self) -> bool:
    """Report whether this repository has an index.

    Bare repos must never have index files; non-bare repos may have a
    missing index file, which is treated as empty. The answer therefore
    depends only on ``self.bare``.
    """
    is_bare = self.bare
    return not is_bare
def clone(
    self,
    target_path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = True,
    bare: bool = False,
    origin: bytes = b"origin",
    checkout: bool | None = None,
    branch: bytes | None = None,
    progress: Callable[[str], None] | None = None,
    depth: int | None = None,
    symlinks: bool | None = None,
) -> "Repo":
    """Clone this repository.

    On any failure the partially created target repository is closed and,
    when ``mkdir`` is true, the target directory is removed again before
    the exception is re-raised.

    Args:
        target_path: Target path
        mkdir: Create the target directory
        bare: Whether to create a bare repository
        checkout: Whether or not to check-out HEAD after cloning;
            defaults to True for a non-bare clone
        origin: Base name for refs in target repository
            cloned from this repository
        branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
        progress: Optional progress function (not referenced in this
            method's body)
        depth: Depth at which to fetch
        symlinks: Symlinks setting (default to autodetect)

    Returns:
        Created repository as `Repo`

    Raises:
        ValueError: If ``checkout`` is requested together with ``bare``.
    """
    encoded_path = os.fsencode(self.path)

    if mkdir:
        os.mkdir(target_path)

    try:
        if not bare:
            target = Repo.init(target_path, symlinks=symlinks)
            if checkout is None:
                checkout = True
        else:
            if checkout:
                raise ValueError("checkout and bare are incompatible")
            target = Repo.init_bare(target_path)

        try:
            # Record this repository as the remote named ``origin``.
            target_config = target.get_config()
            target_config.set((b"remote", origin), b"url", encoded_path)
            target_config.set(
                (b"remote", origin),
                b"fetch",
                b"+refs/heads/*:refs/remotes/" + origin + b"/*",
            )
            target_config.write_to_path()

            ref_message = b"clone: from " + encoded_path
            self.fetch(target, depth=depth)
            # Branches are imported under refs/remotes/<origin>, tags
            # under refs/tags.
            target.refs.import_refs(
                Ref(b"refs/remotes/" + origin),
                self.refs.as_dict(Ref(b"refs/heads")),
                message=ref_message,
            )
            target.refs.import_refs(
                Ref(b"refs/tags"),
                self.refs.as_dict(Ref(b"refs/tags")),
                message=ref_message,
            )

            head_chain, origin_sha = self.refs.follow(HEADREF)
            origin_head = head_chain[-1] if head_chain else None
            if origin_sha and not origin_head:
                # set detached HEAD
                target.refs[HEADREF] = origin_sha
            else:
                _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index(config=target.get_config_stack())
        except BaseException:
            # Release the target's resources before propagating the error.
            target.close()
            raise
    except BaseException:
        # Only remove the directory if this call created it.
        if mkdir:
            import shutil

            shutil.rmtree(target_path)
        raise
    return target
def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
    """Build the matchers used to evaluate includeIf conditions.

    Returns a dict mapping a condition prefix (``onbranch:``, ``gitdir:``,
    ``gitdir/i:``) to the function that evaluates patterns of that kind
    against this repository.
    """
    from pathlib import Path

    from .config import ConditionMatcher, match_glob_pattern

    def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
        """Match the control directory against a gitdir pattern.

        Args:
            pattern: Pattern to match against
            case_sensitive: Whether to match case-sensitively

        Returns:
            True if gitdir matches pattern
        """
        # Patterns relative to the config file (starting with ./) cannot
        # be handled without config directory context.
        if pattern.startswith("./"):
            return False

        # Normalize repository path
        try:
            resolved_gitdir = str(Path(self._controldir).resolve())
        except (OSError, ValueError):
            return False

        # Expand ~ in pattern, then normalize following Git's rules
        normalized = os.path.expanduser(pattern).replace("\\", "/")
        is_anchored = normalized.startswith(("~/", "./", "/", "**"))
        # A second character of ":" marks a Windows absolute path.
        has_drive_letter = len(normalized) >= 2 and normalized[1] == ":"
        if not is_anchored and not has_drive_letter:
            normalized = "**/" + normalized
        if normalized.endswith("/"):
            normalized = normalized + "**"

        # Use the existing _match_gitdir_pattern function
        from .config import _match_gitdir_pattern

        pattern_bytes = normalized.encode("utf-8", errors="replace")
        gitdir_bytes = resolved_gitdir.encode("utf-8", errors="replace")

        return _match_gitdir_pattern(
            gitdir_bytes, pattern_bytes, ignorecase=not case_sensitive
        )

    def match_onbranch(pattern: str) -> bool:
        """Match the currently checked-out branch against a pattern.

        Args:
            pattern: Pattern to match against

        Returns:
            True if current branch matches pattern
        """
        try:
            # Follow HEAD and take the final resolved ref
            ref_chain, _sha = self.refs.follow(HEADREF)
            resolved_head = ref_chain[-1]
        except KeyError:
            return False
        if not resolved_head or not resolved_head.startswith(b"refs/heads/"):
            return False
        # Extract branch name from ref
        branch_name = extract_branch_name(resolved_head).decode(
            "utf-8", errors="replace"
        )
        return match_glob_pattern(branch_name, pattern)

    def match_gitdir_exact_case(pattern):
        return match_gitdir(pattern, True)

    def match_gitdir_ignore_case(pattern):
        return match_gitdir(pattern, False)

    matchers: dict[str, ConditionMatcher] = {
        "onbranch:": match_onbranch,
        "gitdir:": match_gitdir_exact_case,
        "gitdir/i:": match_gitdir_ignore_case,
    }

    return matchers
def get_worktree_config(self) -> "ConfigFile":
    """Load the worktree-specific config.

    The file read is ``config.worktree`` in the common dir. If it does
    not exist, an empty config bound to that path is returned.

    Returns:
        ConfigFile object for the worktree config
    """
    from .config import ConfigFile

    # NOTE(review): git documents per-worktree config as living under
    # $GIT_DIR; this reads from the common dir - confirm for linked
    # worktrees.
    config_path = os.path.join(self.commondir(), "config.worktree")
    try:
        # Pass condition matchers for includeIf evaluation
        matchers = self._get_config_condition_matchers()
        return ConfigFile.from_path(config_path, condition_matchers=matchers)
    except FileNotFoundError:
        empty_config = ConfigFile()
        empty_config.path = config_path
        return empty_config
def get_config(self) -> "ConfigFile":
    """Load the repository config.

    The file read is ``config`` in the common dir. If it does not exist,
    an empty config bound to that path is returned.

    Returns: `ConfigFile` object for the ``.git/config`` file.
    """
    from .config import ConfigFile

    config_path = os.path.join(self._commondir, "config")
    try:
        # Pass condition matchers for includeIf evaluation
        matchers = self._get_config_condition_matchers()
        return ConfigFile.from_path(config_path, condition_matchers=matchers)
    except FileNotFoundError:
        empty_config = ConfigFile()
        empty_config.path = config_path
        return empty_config
def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Get the appropriate rebase state manager for this repository.

    The manager is pointed at the ``rebase-merge`` directory inside the
    control dir.

    Returns: DiskRebaseStateManager instance
    """
    # ``os`` is already imported at module level and used by the other
    # methods, so no function-local import is needed.
    from .rebase import DiskRebaseStateManager

    path = os.path.join(self.controldir(), "rebase-merge")
    return DiskRebaseStateManager(path)
def get_description(self) -> bytes | None:
    """Read the ``description`` file from the control directory.

    Returns: The raw file contents as bytes, or None when the file does
      not exist.
    """
    description_path = os.path.join(self._controldir, "description")
    try:
        with GitFile(description_path, "rb") as description_file:
            contents = description_file.read()
    except FileNotFoundError:
        return None
    return contents
def __repr__(self) -> str:
    """Return ``<Repo at PATH>`` where PATH is the repr of ``self.path``."""
    return "<Repo at {!r}>".format(self.path)
def set_description(self, description: bytes) -> None:
    """Store a new description for this repository.

    Args:
      description: Bytes to write as the named file ``description``.
    """
    target_name = "description"
    self._put_named_file(target_name, description)
@classmethod
def _init_maybe_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    controldir: str | bytes | os.PathLike[str],
    bare: bool,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Populate a control directory and return the repository opened on it.

    Shared implementation behind ``init`` and ``init_bare``.

    Args:
      path: Path passed to the class constructor for the new repository
      controldir: Directory in which the base directories are created
      bare: Forwarded to the constructor and to ``_init_files``
      object_store: Object store to use; a DiskObjectStore is initialised
        under ``controldir`` when None
      config: Configuration consulted for ``init.defaultBranch`` when
        ``default_branch`` is None; ``StackedConfig.default()`` if None
      default_branch: Branch name HEAD should point at
      symlinks: Forwarded to ``_init_files``
      format: Forwarded to ``_init_files``
      shared_repository: Shared repository setting; when given it is parsed
        into file and directory modes
      object_format: Object format name, resolved with ``get_object_format``
    Returns: `Repo` instance
    """
    # os.fsdecode() applies os.fspath() and decodes bytes, leaving str as is.
    path = os.fsdecode(path)
    controldir = os.fsdecode(controldir)

    # Work out the permissions first so the directories created below get them.
    file_mode: int | None
    dir_mode: int | None
    if shared_repository is None:
        file_mode = dir_mode = None
    else:
        file_mode, dir_mode = parse_shared_repository(shared_repository)

    for components in BASE_DIRECTORIES:
        base_dir = os.path.join(controldir, *components)
        os.mkdir(base_dir)
        if dir_mode is not None:
            os.chmod(base_dir, dir_mode)

    from .object_format import get_object_format

    hash_alg = get_object_format(object_format)

    if object_store is None:
        object_store = DiskObjectStore.init(
            os.path.join(controldir, OBJECTDIR),
            file_mode=file_mode,
            dir_mode=dir_mode,
            object_format=hash_alg,
        )
    repo = cls(path, bare=bare, object_store=object_store)

    if default_branch is None:
        if config is None:
            from .config import StackedConfig

            config = StackedConfig.default()
        try:
            default_branch = config.get("init", "defaultBranch")
        except KeyError:
            default_branch = DEFAULT_BRANCH
    repo.refs.set_symbolic_ref(HEADREF, local_branch_name(default_branch))

    repo._init_files(
        bare=bare,
        symlinks=symlinks,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )
    return repo
@classmethod
def init(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    symlinks: bool | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new non-bare repository.

    The control directory is created as ``CONTROLDIR`` inside ``path`` and
    passed to ``_set_filesystem_hidden`` before being populated.

    Args:
      path: Path in which to create the repository
      mkdir: Whether to create the directory
      config: Configuration object
      default_branch: Default branch name
      symlinks: Whether to support symlinks
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: `Repo` instance
    """
    # os.fsdecode() applies os.fspath() and decodes bytes, leaving str as is.
    path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)

    git_dir = os.path.join(path, CONTROLDIR)
    os.mkdir(git_dir)
    _set_filesystem_hidden(git_dir)

    return cls._init_maybe_bare(
        path,
        git_dir,
        False,
        config=config,
        default_branch=default_branch,
        symlinks=symlinks,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )
@classmethod
def _init_new_working_directory(
    cls,
    path: str | bytes | os.PathLike[str],
    main_repo: "Repo",
    identifier: str | None = None,
    mkdir: bool = False,
    relative_paths: bool = False,
) -> "Repo":
    """Create a new working directory linked to a repository.

    Writes a ``.git`` file in ``path`` that points at a per-worktree control
    directory under the main repository's ``WORKTREES`` directory, fills
    that control directory with its ``GITDIR``, ``COMMONDIR`` and ``HEAD``
    files, then opens the new working tree and resets its index.

    Args:
      path: Path in which to create the working tree.
      main_repo: Main repository to reference
      identifier: Worktree identifier. Defaults to the last component of
        ``path``; a trailing path separator is ignored.
      mkdir: Whether to create the directory
      relative_paths: Whether to use relative paths for gitdir references
    Returns: `Repo` instance
    """
    path = os.fspath(path)
    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if mkdir:
        os.mkdir(path)
    if identifier is None:
        # os.path.basename() returns "" for a path that ends in a separator
        # (e.g. "wt/"), which would make the worktree control directory
        # collapse onto the shared worktrees directory itself. Taking the
        # basename of the absolute path avoids that.
        identifier = os.path.basename(os.path.abspath(path))
    # Ensure we use absolute path for the worktree control directory
    main_controldir = os.path.abspath(main_repo.controldir())
    main_worktreesdir = os.path.join(main_controldir, WORKTREES)
    worktree_controldir = os.path.join(main_worktreesdir, identifier)
    gitdirfile_abs = os.path.abspath(os.path.join(path, CONTROLDIR))

    # Write gitdir reference in .git file (can be relative)
    # Import helper from worktree module to avoid duplication
    from .worktree import _compute_gitdir_path

    gitdir_ref = _compute_gitdir_path(
        main_repo,
        worktree_controldir,
        os.path.dirname(gitdirfile_abs),
        relative_paths,
    )

    with open(gitdirfile_abs, "wb") as f:
        f.write(b"gitdir: " + os.fsencode(gitdir_ref) + b"\n")

    # Get shared repository permissions from main repository
    _, dir_mode = main_repo._get_shared_repository_permissions()

    # Create directories with appropriate permissions. The mode is only
    # applied to a directory this call actually created.
    try:
        os.mkdir(main_worktreesdir)
        if dir_mode is not None:
            os.chmod(main_worktreesdir, dir_mode)
    except FileExistsError:
        pass
    try:
        os.mkdir(worktree_controldir)
        if dir_mode is not None:
            os.chmod(worktree_controldir, dir_mode)
    except FileExistsError:
        pass

    # Write gitdir path in control directory (can be relative)
    gitdir_path = _compute_gitdir_path(
        main_repo, gitdirfile_abs, worktree_controldir, relative_paths
    )

    with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
        f.write(os.fsencode(gitdir_path) + b"\n")
    with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
        f.write(b"../..\n")
    # The new worktree's HEAD file holds whatever main_repo.head() returns.
    with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
        f.write(main_repo.head() + b"\n")
    r = cls(os.path.normpath(path))
    r.get_worktree().reset_index(config=r.get_config_stack())
    return r
@classmethod
def init_bare(
    cls,
    path: str | bytes | os.PathLike[str],
    *,
    mkdir: bool = False,
    object_store: PackBasedObjectStore | None = None,
    config: "StackedConfig | None" = None,
    default_branch: bytes | None = None,
    format: int | None = None,
    shared_repository: str | bool | None = None,
    object_format: str | None = None,
) -> "Repo":
    """Create a new bare repository.

    ``path`` serves as the control directory itself. Unless ``mkdir`` is
    true, ``path`` should already exist and be an empty directory.

    Args:
      path: Path to create bare repository in
      mkdir: Whether to create the directory
      object_store: Object store to use
      config: Configuration object
      default_branch: Default branch name
      format: Repository format version (defaults to 0)
      shared_repository: Shared repository setting (group, all, umask, or octal)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: a `Repo` instance
    """
    # os.fsdecode() applies os.fspath() and decodes bytes, leaving str as is.
    repo_dir = os.fsdecode(path)
    if mkdir:
        os.mkdir(repo_dir)
    # A bare repository has no separate control directory, so the same
    # path is passed for both arguments.
    return cls._init_maybe_bare(
        repo_dir,
        repo_dir,
        True,
        object_store=object_store,
        config=config,
        default_branch=default_branch,
        format=format,
        shared_repository=shared_repository,
        object_format=object_format,
    )

create = init_bare
def close(self) -> None:
    """Close the object store, then the filter context if one exists."""
    self.object_store.close()

    context = self.filter_context
    if context is None:
        return
    # A filter context is only present once it has been created on demand.
    context.close()
    self.filter_context = None
def __enter__(self) -> Self:
    """Enter the context manager.

    Returns: this repository itself, so it can be bound with ``as``.
    """
    return self
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit the context manager and close the repository.

    The exception arguments are not inspected, and nothing is returned, so
    an exception raised inside the ``with`` body is not suppressed.

    Args:
      exc_type: Exception type, if any
      exc_val: Exception value, if any
      exc_tb: Traceback, if any
    """
    self.close()
def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
    """Parse the ``.gitattributes`` file at the top of the working tree.

    Blank lines, lines starting with ``#`` and lines holding only a pattern
    are skipped. A later line with the same pattern replaces an earlier one.

    Returns:
      Dictionary mapping each pattern to its attributes. ``-name`` yields
      ``b"false"``, ``name=value`` yields the value and a bare ``name``
      yields ``b"true"``. Empty when the file does not exist.
    """
    parsed = {}
    attributes_file = os.path.join(self.path, ".gitattributes")
    if not os.path.exists(attributes_file):
        return parsed

    with open(attributes_file, "rb") as stream:
        for raw_line in stream:
            stripped = raw_line.strip()
            if not stripped or stripped.startswith(b"#"):
                continue

            pattern, *tokens = stripped.split()
            if not tokens:
                # A pattern without any attribute carries no information.
                continue

            settings = {}
            for token in tokens:
                if token.startswith(b"-"):
                    # Unset attribute
                    settings[token[1:]] = b"false"
                    continue
                name, separator, value = token.partition(b"=")
                # With "=" the attribute takes the value, otherwise it is set.
                settings[name] = value if separator else b"true"

            parsed[pattern] = settings

    return parsed
def get_blob_normalizer(
    self, config: "Config | None" = None
) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object.

    The repository keeps a single filter context: it is created on the
    first call and refreshed with the given configuration on later calls.

    Args:
      config: Configuration to consult for filter setup. If None,
        falls back to ``self.get_config_stack()``.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    effective_config = self.get_config_stack() if config is None else config
    attributes = self.get_gitattributes()

    if self.filter_context is not None:
        # Refresh the context with current config to handle config changes
        self.filter_context.refresh_config(effective_config)
    else:
        self.filter_context = FilterContext(
            FilterRegistry(effective_config, self)
        )

    return FilterBlobNormalizer(
        effective_config, attributes, filter_context=self.filter_context
    )
def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Patterns are collected, in this order, from the ``.gitattributes`` blob
    of the tree, from ``info/attributes`` in the control directory and from
    ``.gitattributes`` in the working directory.

    Args:
      tree: Tree SHA to read .gitattributes from (defaults to HEAD)

    Returns:
      GitAttributes object that can be used to match paths

    Raises:
      ValueError: If ``tree`` is None and HEAD, after peeling tags, is not
        a Commit.
    """
    from .attrs import (
        GitAttributes,
        Pattern,
        parse_git_attributes,
    )

    patterns = []

    def collect(stream) -> None:
        # Append every (Pattern, attrs) pair parsed from an open stream.
        patterns.extend(
            (Pattern(raw_pattern), parsed_attrs)
            for raw_pattern, parsed_attrs in parse_git_attributes(stream)
        )

    def collect_file(attrs_path: str) -> None:
        # Read an attributes file from disk when it exists.
        if os.path.exists(attrs_path):
            with open(attrs_path, "rb") as attrs_file:
                collect(attrs_file)

    # Read system gitattributes (TODO: implement this)
    # Read global gitattributes (TODO: implement this)

    # Read repository .gitattributes from index/tree
    if tree is None:
        try:
            # Try to get from HEAD
            head = self[b"HEAD"]
            # Peel tags to get to the underlying commit
            while isinstance(head, Tag):
                _, target_sha = head.object
                head = self.get_object(target_sha)
            if not isinstance(head, Commit):
                raise ValueError(
                    f"Expected HEAD to point to a Commit, got {type(head).__name__}. "
                    f"This usually means HEAD points to a {type(head).__name__} object "
                    f"instead of a Commit."
                )
            tree = head.tree
        except KeyError:
            # No HEAD, no attributes from tree
            pass

    if tree is not None:
        try:
            tree_obj = self[tree]
            assert isinstance(tree_obj, Tree)
            if b".gitattributes" in tree_obj:
                _, attrs_sha = tree_obj[b".gitattributes"]
                attrs_blob = self[attrs_sha]
                if isinstance(attrs_blob, Blob):
                    collect(BytesIO(attrs_blob.data))
        except (KeyError, NotTreeError):
            pass

    # Read .git/info/attributes
    collect_file(os.path.join(self.controldir(), "info", "attributes"))

    # Read .gitattributes from working directory (if it exists)
    collect_file(os.path.join(self.path, ".gitattributes"))

    return GitAttributes(patterns)
2532class MemoryRepo(BaseRepo):
2533 """Repo that stores refs, objects, and named files in memory.
2535 MemoryRepos are always bare: they have no working tree and no index, since
2536 those have a stronger dependency on the filesystem.
2537 """
2539 filter_context: "FilterContext | None"
def __init__(self) -> None:
    """Create a new, empty repository in memory.

    The repository gets a MemoryObjectStore, a DictRefsContainer and an
    empty ConfigFile. It is marked bare and has no description.
    """
    from .config import ConfigFile
    from .object_format import DEFAULT_OBJECT_FORMAT

    # Entries are added by _append_reflog, which is handed to the refs
    # container as its logger.
    self._reflog: list[Any] = []
    refs_container = DictRefsContainer({}, logger=self._append_reflog)
    BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
    # Backing store for the named-file accessors, keyed by path.
    self._named_files: dict[str, bytes] = {}
    self.bare = True
    self._config = ConfigFile()
    self._description: bytes | None = None
    # Created on demand by get_blob_normalizer().
    self.filter_context = None
    # MemoryRepo defaults to default object format
    self.object_format = DEFAULT_OBJECT_FORMAT
2557 def _append_reflog(
2558 self,
2559 ref: bytes,
2560 old_sha: bytes | None,
2561 new_sha: bytes | None,
2562 committer: bytes | None,
2563 timestamp: int | None,
2564 timezone: int | None,
2565 message: bytes | None,
2566 ) -> None:
2567 self._reflog.append(
2568 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2569 )
def set_description(self, description: bytes) -> None:
    """Set the description for this repository.

    The value is kept on the instance only; nothing is written to disk.

    Args:
      description: Bytes to set as description
    """
    self._description = description
def get_description(self) -> bytes | None:
    """Get the description of this repository.

    Returns:
      The stored description as bytes, or None if none has been stored.
    """
    return self._description
def _determine_file_mode(self) -> bool:
    """Decide whether file permissions can be trusted.

    No file system is probed here; the answer depends on the platform only.

    Returns: True unless ``sys.platform`` is ``"win32"``.
    """
    return sys.platform != "win32"
def _determine_symlinks(self) -> bool:
    """Decide the symlink setting for this repository.

    No file system is probed here; the answer depends on the platform only.
    NOTE(review): presumably this reports whether symlinks are supported;
    confirm against the callers.

    Returns: True unless ``sys.platform`` is ``"win32"``.
    """
    return sys.platform != "win32"
def _put_named_file(self, path: str, contents: bytes) -> None:
    """Store a named file with the given name and contents.

    The file is kept in the in-memory ``_named_files`` mapping; an existing
    entry with the same path is replaced.

    Args:
      path: The path to the file, relative to the control dir.
      contents: The bytes to store for the file.
    """
    self._named_files[path] = contents
def _del_named_file(self, path: str) -> None:
    """Remove a named file; a path that is not stored is ignored.

    Args:
      path: The path to the file, relative to the control dir.
    """
    self._named_files.pop(path, None)
2616 def get_named_file(
2617 self,
2618 path: str | bytes,
2619 basedir: str | None = None,
2620 ) -> BytesIO | None:
2621 """Get a file from the control dir with a specific name.
2623 Although the filename should be interpreted as a filename relative to
2624 the control dir in a disk-baked Repo, the object returned need not be
2625 pointing to a file in that location.
2627 Args:
2628 path: The path to the file, relative to the control dir.
2629 basedir: Optional base directory for the path
2630 Returns: An open file object, or None if the file does not exist.
2631 """
2632 path_str = path.decode() if isinstance(path, bytes) else path
2633 contents = self._named_files.get(path_str, None)
2634 if contents is None:
2635 return None
2636 return BytesIO(contents)
def open_index(self, config: "Config | None" = None) -> "Index":
    """Fail to open index for this repo, since it is bare.

    This method never returns; it always raises.

    Args:
      config: Unused; kept for signature compatibility with ``BaseRepo``.

    Raises:
      NoIndexPresent: Always, because there is no index to open
    """
    raise NoIndexPresent
def _init_config(self, config: "ConfigFile") -> None:
    """Initialize repository configuration for MemoryRepo.

    Args:
      config: Config object that replaces the one currently held; it is
        what ``get_config`` returns afterwards.
    """
    self._config = config
def get_config(self) -> "ConfigFile":
    """Retrieve the config object.

    Returns: The `ConfigFile` object held in memory by this repository.
    """
    return self._config
def get_rebase_state_manager(self) -> "RebaseStateManager":
    """Get the appropriate rebase state manager for this repository.

    Returns: MemoryRebaseStateManager instance created for this repository
    """
    # Imported here rather than at module level.
    from .rebase import MemoryRebaseStateManager

    return MemoryRebaseStateManager(self)
def get_blob_normalizer(
    self, config: "Config | None" = None
) -> "FilterBlobNormalizer":
    """Return a BlobNormalizer object for checkin/checkout operations.

    The repository keeps a single filter context: it is created on the
    first call and refreshed with the given configuration on later calls.

    Args:
      config: Configuration to consult for filter setup. If None,
        falls back to ``self.get_config_stack()``.
    """
    from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

    effective_config = self.get_config_stack() if config is None else config
    attributes = self.get_gitattributes()

    if self.filter_context is not None:
        self.filter_context.refresh_config(effective_config)
    else:
        self.filter_context = FilterContext(
            FilterRegistry(effective_config, self)
        )

    return FilterBlobNormalizer(
        effective_config, attributes, filter_context=self.filter_context
    )
def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
    """Read gitattributes for the repository.

    Args:
      tree: Ignored by this implementation.

    Returns:
      A GitAttributes object built from an empty pattern list.
    """
    from .attrs import GitAttributes

    # Memory repos don't have working trees or gitattributes files
    # Return empty GitAttributes
    return GitAttributes([])
def close(self) -> None:
    """Close the filter context if one exists, then the object store."""
    context = self.filter_context
    if context is not None:
        # Clean up filter context if it was created
        context.close()
        self.filter_context = None
    # Close object store to release pack files
    self.object_store.close()
def do_commit(
    self,
    message: bytes | None = None,
    committer: bytes | None = None,
    author: bytes | None = None,
    commit_timestamp: float | None = None,
    commit_timezone: int | None = None,
    author_timestamp: float | None = None,
    author_timezone: int | None = None,
    tree: ObjectID | None = None,
    encoding: bytes | None = None,
    ref: Ref | None = HEADREF,
    merge_heads: list[ObjectID] | None = None,
    no_verify: bool = False,
    sign: bool = False,
    config: "Config | None" = None,
) -> bytes:
    """Create a new commit.

    This is a simplified implementation for in-memory repositories that
    doesn't support worktree operations or hooks.

    Args:
      message: Commit message. A callable is also accepted; it is called
        with this repository and the commit being built and must return
        the message.
      committer: Committer fullname
      author: Author fullname
      commit_timestamp: Commit timestamp (defaults to now)
      commit_timezone: Commit timestamp timezone (defaults to GMT)
      author_timestamp: Author timestamp (defaults to commit timestamp)
      author_timezone: Author timestamp timezone (defaults to commit timezone)
      tree: SHA1 of the tree root to use
      encoding: Encoding (defaults to ``i18n.commitEncoding`` from the
        configuration, when that is set)
      ref: Optional ref to commit to (defaults to current branch).
        If None, creates a dangling commit without updating any ref.
      merge_heads: Merge heads
      no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
      sign: GPG Sign the commit (ignored for MemoryRepo)
      config: Configuration to consult for committer/author identity. If
        None, falls back to ``self.get_config_stack()``.

    Returns:
      New commit SHA1

    Raises:
      ValueError: If ``tree`` is None or does not have the hex length of
        the repository's object format, if no message is given, or if a
        message callable returns None.
      CommitError: If the ref update reports failure.
    """
    import time

    from .objects import Commit

    if tree is None:
        raise ValueError("tree must be specified for MemoryRepo")

    c = Commit()
    if len(tree) != self.object_format.hex_length:
        raise ValueError(
            f"tree must be a {self.object_format.hex_length}-character hex sha string"
        )
    c.tree = tree

    if config is None:
        config = self.get_config_stack()
    if merge_heads is None:
        merge_heads = []
    if committer is None:
        committer = get_user_identity(config, kind="COMMITTER")
    check_user_identity(committer)
    c.committer = committer
    if commit_timestamp is None:
        commit_timestamp = time.time()
    c.commit_time = int(commit_timestamp)
    if commit_timezone is None:
        commit_timezone = 0
    c.commit_timezone = commit_timezone
    if author is None:
        author = get_user_identity(config, kind="AUTHOR")
    c.author = author
    check_user_identity(author)
    if author_timestamp is None:
        author_timestamp = commit_timestamp
    c.author_time = int(author_timestamp)
    if author_timezone is None:
        author_timezone = commit_timezone
    c.author_timezone = author_timezone
    if encoding is None:
        try:
            encoding = config.get(("i18n",), "commitEncoding")
        except KeyError:
            # No configured encoding: leave the commit's encoding unset.
            pass
    if encoding is not None:
        c.encoding = encoding

    # A callable message is invoked with (repo, commit) to produce the
    # message; everything set on the commit so far is visible to it.
    if callable(message):
        message = message(self, c)
        if message is None:
            raise ValueError("Message callback returned None")

    if message is None:
        raise ValueError("No commit message specified")

    c.message = message

    if ref is None:
        # Create a dangling commit
        c.parents = merge_heads
        self.object_store.add_object(c)
    else:
        try:
            # Ref already has a value: that value becomes the first parent
            # and the ref is only moved if it still holds it.
            old_head = self.refs[ref]
            c.parents = [old_head, *merge_heads]
            self.object_store.add_object(c)
            ok = self.refs.set_if_equals(
                ref,
                old_head,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        except KeyError:
            # A KeyError raised anywhere in the block above lands here and
            # is handled as "ref has no value yet": the commit gets only
            # the merge heads as parents and the ref is created.
            c.parents = merge_heads
            self.object_store.add_object(c)
            ok = self.refs.add_if_new(
                ref,
                c.id,
                message=b"commit: " + message,
                committer=committer,
                timestamp=int(commit_timestamp),
                timezone=commit_timezone,
            )
        if not ok:
            from .errors import CommitError

            raise CommitError(f"{ref!r} changed during commit")

    return c.id
@classmethod
def init_bare(
    cls,
    objects: Iterable[ShaFile],
    refs: Mapping[Ref, ObjectID],
    format: int | None = None,
    object_format: str | None = None,
) -> "MemoryRepo":
    """Create a new bare repository in memory.

    Args:
      objects: Objects for the new repository, as iterable; each one is
        added to the object store
      refs: Refs as dictionary, mapping names to object SHA1s; each ref is
        added only if it does not exist yet
      format: Repository format version (defaults to 0)
      object_format: Object format to use ("sha1" or "sha256", defaults to "sha1")
    Returns: the new `MemoryRepo` instance
    """
    repo = cls()
    store = repo.object_store
    for shafile in objects:
        store.add_object(shafile)
    for name, object_id in refs.items():
        repo.refs.add_if_new(name, object_id)
    repo._init_files(bare=True, format=format, object_format=object_format)
    return repo