Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
"""Repository access.

This module contains the base class for git repositories
(BaseRepo) and an implementation which uses a repository on
local disk (Repo).
"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
38from io import BytesIO
39from types import TracebackType
40from typing import (
41 TYPE_CHECKING,
42 Any,
43 BinaryIO,
44 Optional,
45 TypeVar,
46)
48if TYPE_CHECKING:
49 # There are no circular imports here, but we try to defer imports as long
50 # as possible to reduce start-up time for anything that doesn't need
51 # these imports.
52 from .attrs import GitAttributes
53 from .config import ConditionMatcher, ConfigFile, StackedConfig
54 from .diff_tree import RenameDetector
55 from .filters import FilterBlobNormalizer, FilterContext
56 from .index import Index
57 from .notes import Notes
58 from .object_store import BaseObjectStore, GraphWalker
59 from .pack import UnpackedObject
60 from .rebase import RebaseStateManager
61 from .walk import Walker
62 from .worktree import WorkTree
64from . import reflog, replace_me
65from .errors import (
66 NoIndexPresent,
67 NotBlobError,
68 NotCommitError,
69 NotGitRepository,
70 NotTagError,
71 NotTreeError,
72 RefFormatError,
73)
74from .file import GitFile
75from .hooks import (
76 CommitMsgShellHook,
77 Hook,
78 PostCommitShellHook,
79 PostReceiveShellHook,
80 PreCommitShellHook,
81)
82from .object_store import (
83 DiskObjectStore,
84 MemoryObjectStore,
85 MissingObjectFinder,
86 ObjectStoreGraphWalker,
87 PackBasedObjectStore,
88 PackCapableObjectStore,
89 find_shallow,
90 peel_sha,
91)
92from .objects import (
93 Blob,
94 Commit,
95 ObjectID,
96 ShaFile,
97 Tag,
98 Tree,
99 check_hexsha,
100 valid_hexsha,
101)
102from .pack import generate_unpacked_objects
103from .refs import (
104 ANNOTATED_TAG_SUFFIX, # noqa: F401
105 LOCAL_TAG_PREFIX, # noqa: F401
106 SYMREF, # noqa: F401
107 DictRefsContainer,
108 DiskRefsContainer,
109 InfoRefsContainer, # noqa: F401
110 Ref,
111 RefsContainer,
112 _set_default_branch,
113 _set_head,
114 _set_origin_head,
115 check_ref_format, # noqa: F401
116 extract_branch_name,
117 is_per_worktree_ref,
118 local_branch_name,
119 read_packed_refs, # noqa: F401
120 read_packed_refs_with_peeled, # noqa: F401
121 serialize_refs,
122 write_packed_refs, # noqa: F401
123)
# Name of the control directory of a non-bare repository.
CONTROLDIR = ".git"
# Subdirectory of the control dir holding loose and packed objects.
OBJECTDIR = "objects"
# Whether OFS deltas are emitted by default when generating pack data.
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile; used by BaseRepo._get_object().
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# Control files used by linked worktrees.
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside the control dir when a repository is initialized.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch that HEAD points at in a freshly initialized repository.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
            identity: The offending identity string.
        """
        # Keep the bad identity around so callers can report it.
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when the identity has to be derived from the host system and
    no username can be found in the environment or the password database.
    """
162# TODO(jelmer): Cache?
163def _get_default_identity() -> tuple[str, str]:
164 import socket
166 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
167 username = os.environ.get(name)
168 if username:
169 break
170 else:
171 username = None
173 try:
174 import pwd
175 except ImportError:
176 fullname = None
177 else:
178 try:
179 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
180 except KeyError:
181 fullname = None
182 else:
183 if getattr(entry, "gecos", None):
184 fullname = entry.pw_gecos.split(",")[0]
185 else:
186 fullname = None
187 if username is None:
188 username = entry.pw_name
189 if not fullname:
190 if username is None:
191 raise DefaultIdentityNotFound("no username found")
192 fullname = username
193 email = os.environ.get("EMAIL")
194 if email is None:
195 if username is None:
196 raise DefaultIdentityNotFound("no username found")
197 email = f"{username}@{socket.gethostname()}"
198 return (fullname, email)
201def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
202 """Determine the identity to use for new commits.
204 If kind is set, this first checks
205 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
207 If those variables are not set, then it will fall back
208 to reading the user.name and user.email settings from
209 the specified configuration.
211 If that also fails, then it will fall back to using
212 the current users' identity as obtained from the host
213 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
215 Args:
216 config: Configuration stack to read from
217 kind: Optional kind to return identity for,
218 usually either "AUTHOR" or "COMMITTER".
220 Returns:
221 A user identity
222 """
223 user: bytes | None = None
224 email: bytes | None = None
225 if kind:
226 user_uc = os.environ.get("GIT_" + kind + "_NAME")
227 if user_uc is not None:
228 user = user_uc.encode("utf-8")
229 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
230 if email_uc is not None:
231 email = email_uc.encode("utf-8")
232 if user is None:
233 try:
234 user = config.get(("user",), "name")
235 except KeyError:
236 user = None
237 if email is None:
238 try:
239 email = config.get(("user",), "email")
240 except KeyError:
241 email = None
242 default_user, default_email = _get_default_identity()
243 if user is None:
244 user = default_user.encode("utf-8")
245 if email is None:
246 email = default_email.encode("utf-8")
247 if email.startswith(b"<") and email.endswith(b">"):
248 email = email[1:-1]
249 return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
        identity: User identity bytestring
    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        # No " <" separator at all.
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    # The email part must be closed, and the identity may not contain
    # characters that would corrupt the commit object.
    if b">" not in remainder or b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
        graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        # Split off the grafted commit; the remainder (if any) lists parents.
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Reject anything that is not a valid hex sha.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # One line per grafted commit; parents are appended space-separated.
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )
def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On all other platforms this is a no-op.

    Args:
        path: Path to hide; bytes paths are decoded on win32.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a typed foreign-function wrapper around kernel32's
        # SetFileAttributesW so ctypes checks argument conversion.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Failure to hide is deliberately ignored (best-effort behaviour).
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here
347class ParentsProvider:
348 """Provider for commit parent information."""
350 def __init__(
351 self,
352 store: "BaseObjectStore",
353 grafts: dict[bytes, list[bytes]] = {},
354 shallows: Iterable[bytes] = [],
355 ) -> None:
356 """Initialize ParentsProvider.
358 Args:
359 store: Object store to use
360 grafts: Graft information
361 shallows: Shallow commit SHAs
362 """
363 self.store = store
364 self.grafts = grafts
365 self.shallows = set(shallows)
367 # Get commit graph once at initialization for performance
368 self.commit_graph = store.get_commit_graph()
370 def get_parents(
371 self, commit_id: bytes, commit: Commit | None = None
372 ) -> list[bytes]:
373 """Get parents for a commit using the parents provider."""
374 try:
375 return self.grafts[commit_id]
376 except KeyError:
377 pass
378 if commit_id in self.shallows:
379 return []
381 # Try to use commit graph for faster parent lookup
382 if self.commit_graph:
383 parents = self.commit_graph.get_parents(commit_id)
384 if parents is not None:
385 return parents
387 # Fallback to reading the commit object
388 if commit is None:
389 obj = self.store[commit_id]
390 assert isinstance(obj, Commit)
391 commit = obj
392 parents = commit.parents
393 assert isinstance(parents, list)
394 return parents
397class BaseRepo:
398 """Base class for a git repository.
400 This base class is meant to be used for Repository implementations that e.g.
401 work on top of a different transport than a standard filesystem path.
403 Attributes:
404 object_store: Dictionary-like object for accessing
405 the objects
406 refs: Dictionary-like object with the refs in this
407 repository
408 """
    def __init__(
        self, object_store: "PackCapableObjectStore", refs: RefsContainer
    ) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
            object_store: Object store to use
            refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints (commit -> fake parents) and the hook registry start
        # empty; subclasses populate them as needed.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        self.hooks: dict[str, Hook] = {}
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract; disk-backed subclasses probe the actual filesystem.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)
    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour
        # NOTE(review): assumes symlinks work everywhere except win32;
        # subclasses could probe the real filesystem instead.
        return sys.platform != "win32"
443 def _init_files(
444 self, bare: bool, symlinks: bool | None = None, format: int | None = None
445 ) -> None:
446 """Initialize a default set of named files."""
447 from .config import ConfigFile
449 self._put_named_file("description", b"Unnamed repository")
450 f = BytesIO()
451 cf = ConfigFile()
452 if format is None:
453 format = 0
454 if format not in (0, 1):
455 raise ValueError(f"Unsupported repository format version: {format}")
456 cf.set("core", "repositoryformatversion", str(format))
457 if self._determine_file_mode():
458 cf.set("core", "filemode", True)
459 else:
460 cf.set("core", "filemode", False)
462 if symlinks is None and not bare:
463 symlinks = self._determine_symlinks()
465 if symlinks is False:
466 cf.set("core", "symlinks", symlinks)
468 cf.set("core", "bare", bare)
469 cf.set("core", "logallrefupdates", True)
470 cf.write_to_file(f)
471 self._put_named_file("config", f.getvalue())
472 self._put_named_file(os.path.join("info", "exclude"), b"")
    def get_named_file(self, path: str) -> BinaryIO | None:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Abstract; implemented by concrete repository classes.

        Args:
            path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract; implemented by concrete repository classes.

        Args:
            path: The path to the file, relative to the control dir.
            contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)
    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract; implemented by concrete repository classes.

        Args:
            path: The path to the file, relative to the control dir.
        """
        raise NotImplementedError(self._del_named_file)
    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract; only repositories with a working tree have an index.

        Raises:
            NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)
509 def fetch(
510 self,
511 target: "BaseRepo",
512 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]]
513 | None = None,
514 progress: Callable[..., None] | None = None,
515 depth: int | None = None,
516 ) -> dict[bytes, bytes]:
517 """Fetch objects into another repository.
519 Args:
520 target: The target repository
521 determine_wants: Optional function to determine what refs to
522 fetch.
523 progress: Optional progress function
524 depth: Optional shallow fetch depth
525 Returns: The local refs
526 """
527 if determine_wants is None:
528 determine_wants = target.object_store.determine_wants_all
529 count, pack_data = self.fetch_pack_data(
530 determine_wants,
531 target.get_graph_walker(),
532 progress=progress,
533 depth=depth,
534 )
535 target.object_store.add_pack_data(count, pack_data, progress)
536 return self.get_refs()
538 def fetch_pack_data(
539 self,
540 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]],
541 graph_walker: "GraphWalker",
542 progress: Callable[[bytes], None] | None,
543 *,
544 get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
545 depth: int | None = None,
546 ) -> tuple[int, Iterator["UnpackedObject"]]:
547 """Fetch the pack data required for a set of revisions.
549 Args:
550 determine_wants: Function that takes a dictionary with heads
551 and returns the list of heads to fetch.
552 graph_walker: Object that can iterate over the list of revisions
553 to fetch and has an "ack" method that will be called to acknowledge
554 that a revision is present.
555 progress: Simple progress function that will be called with
556 updated progress strings.
557 get_tagged: Function that returns a dict of pointed-to sha ->
558 tag sha for including tags.
559 depth: Shallow fetch depth
560 Returns: count and iterator over pack data
561 """
562 missing_objects = self.find_missing_objects(
563 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
564 )
565 if missing_objects is None:
566 return 0, iter([])
567 remote_has = missing_objects.get_remote_has()
568 object_ids = list(missing_objects)
569 return len(object_ids), generate_unpacked_objects(
570 self.object_store, object_ids, progress=progress, other_haves=remote_has
571 )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to
                acknowledge that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
            when no pack should be sent (shallow short-circuit).
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the client's current shallow boundary before we mutate it.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            # Not a depth-limited fetch; reuse any unshallow set already
            # computed on the walker (empty when there is none).
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        # Parent lookup must respect the shallow boundary so the walk does
        # not descend past cut-off history.
        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
                commit: Commit object

            Returns:
                List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
669 def generate_pack_data(
670 self,
671 have: set[ObjectID],
672 want: set[ObjectID],
673 *,
674 shallow: set[ObjectID] | None = None,
675 progress: Callable[[str], None] | None = None,
676 ofs_delta: bool | None = None,
677 ) -> tuple[int, Iterator["UnpackedObject"]]:
678 """Generate pack data objects for a set of wants/haves.
680 Args:
681 have: List of SHA1s of objects that should not be sent
682 want: List of SHA1s of objects that should be sent
683 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
684 ofs_delta: Whether OFS deltas can be included
685 progress: Optional progress reporting method
686 """
687 if shallow is None:
688 shallow = self.get_shallow()
689 return self.object_store.generate_pack_data(
690 have,
691 want,
692 shallow=shallow,
693 progress=progress,
694 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
695 )
697 def get_graph_walker(
698 self, heads: list[ObjectID] | None = None
699 ) -> ObjectStoreGraphWalker:
700 """Retrieve a graph walker.
702 A graph walker is used by a remote repository (or proxy)
703 to find out which objects are present in this repository.
705 Args:
706 heads: Repository heads to use (optional)
707 Returns: A graph walker object
708 """
709 if heads is None:
710 heads = [
711 sha
712 for sha in self.refs.as_dict(b"refs/heads").values()
713 if sha in self.object_store
714 ]
715 parents_provider = ParentsProvider(self.object_store)
716 return ObjectStoreGraphWalker(
717 heads,
718 parents_provider.get_parents,
719 shallow=self.get_shallow(),
720 update_shallow=self.update_shallow,
721 )
    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        # Delegates to the refs container, which snapshots all refs.
        return self.refs.as_dict()
    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD.

        Raises:
            KeyError: If HEAD is not set or cannot be resolved.
        """
        # TODO: move this method to WorkTree
        return self.refs[b"HEAD"]
735 def _get_object(self, sha: bytes, cls: type[T]) -> T:
736 assert len(sha) in (20, 40)
737 ret = self.get_object(sha)
738 if not isinstance(ret, cls):
739 if cls is Commit:
740 raise NotCommitError(ret.id)
741 elif cls is Blob:
742 raise NotBlobError(ret.id)
743 elif cls is Tree:
744 raise NotTreeError(ret.id)
745 elif cls is Tag:
746 raise NotTagError(ret.id)
747 else:
748 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
749 return ret
    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
            sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
            KeyError: when the object can not be found
        """
        # Simple passthrough to the object store's mapping interface.
        return self.object_store[sha]
762 def parents_provider(self) -> ParentsProvider:
763 """Get a parents provider for this repository.
765 Returns:
766 ParentsProvider instance configured with grafts and shallows
767 """
768 return ParentsProvider(
769 self.object_store,
770 grafts=self._graftpoints,
771 shallows=self.get_shallow(),
772 )
    def get_parents(self, sha: bytes, commit: Commit | None = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
            sha: SHA of the commit for which to retrieve the parents
            commit: Optional commit matching the sha
        Returns: List of parents
        """
        # Constructs a fresh provider per call; see parents_provider().
        return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract; implemented by concrete repository classes.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)
    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract; implemented by concrete repository classes.
        """
        raise NotImplementedError(self.get_worktree_config)
    def get_description(self) -> bytes | None:
        """Retrieve the description for this repository.

        Abstract; implemented by concrete repository classes.

        Returns: Bytes with the description of the repository
            as set by the user.
        """
        raise NotImplementedError(self.get_description)
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Abstract; implemented by concrete repository classes.

        Args:
            description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Abstract; implemented by concrete repository classes.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)
    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Abstract; implemented by concrete repository classes.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Abstract; implemented by concrete repository classes.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)
839 def get_config_stack(self) -> "StackedConfig":
840 """Return a config stack for this repository.
842 This stack accesses the configuration for both this repository
843 itself (.git/config) and the global configuration, which usually
844 lives in ~/.gitconfig.
846 Returns: `Config` instance for this repository
847 """
848 from .config import ConfigFile, StackedConfig
850 local_config = self.get_config()
851 backends: list[ConfigFile] = [local_config]
852 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
853 backends.append(self.get_worktree_config())
855 backends += StackedConfig.default_backends()
856 return StackedConfig(backends, writable=local_config)
858 def get_shallow(self) -> set[ObjectID]:
859 """Get the set of shallow commits.
861 Returns: Set of shallow commits.
862 """
863 f = self.get_named_file("shallow")
864 if f is None:
865 return set()
866 with f:
867 return {line.strip() for line in f}
869 def update_shallow(
870 self, new_shallow: set[bytes] | None, new_unshallow: set[bytes] | None
871 ) -> None:
872 """Update the list of shallow objects.
874 Args:
875 new_shallow: Newly shallow objects
876 new_unshallow: Newly no longer shallow objects
877 """
878 shallow = self.get_shallow()
879 if new_shallow:
880 shallow.update(new_shallow)
881 if new_unshallow:
882 shallow.difference_update(new_unshallow)
883 if shallow:
884 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
885 else:
886 self._del_named_file("shallow")
888 def get_peeled(self, ref: Ref) -> ObjectID:
889 """Get the peeled value of a ref.
891 Args:
892 ref: The refname to peel.
893 Returns: The fully-peeled SHA1 of a tag object, after peeling all
894 intermediate tags; if the original ref does not point to a tag,
895 this will equal the original SHA1.
896 """
897 cached = self.refs.get_peeled(ref)
898 if cached is not None:
899 return cached
900 return peel_sha(self.object_store, self.refs[ref])[1].id
    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
            Notes object for accessing notes
        """
        # Imported lazily to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)
    def get_walker(
        self,
        include: Sequence[bytes] | None = None,
        exclude: Sequence[bytes] | None = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: int | None = None,
        paths: Sequence[bytes] | None = None,
        rename_detector: Optional["RenameDetector"] = None,
        follow: bool = False,
        since: int | None = None,
        until: int | None = None,
        queue_cls: type | None = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
            include: Iterable of SHAs of commits to include along with their
                ancestors. Defaults to [HEAD]
            exclude: Iterable of SHAs of commits to exclude along with their
                ancestors, overriding includes.
            order: ORDER_* constant specifying the order of results.
                Anything other than ORDER_DATE may result in O(n) memory usage.
            reverse: If True, reverse the order of output, requiring O(n)
                memory.
            max_entries: The maximum number of entries to yield, or None for
                no limit.
            paths: Iterable of file or subtree paths to show entries for.
            rename_detector: diff.RenameDetector object for detecting
                renames.
            follow: If True, follow path across renames/copies. Forces a
                default rename_detector.
            since: Timestamp to list commits after.
            until: Timestamp to list commits before.
            queue_cls: A class to use for a queue of commits, supporting the
                iterator protocol. The constructor takes a single argument, the
                Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker, _CommitTimeQueue

        if include is None:
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Route parent lookup through self.get_parents so graftpoints
            # and shallow boundaries are honoured during the walk.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )
    def __getitem__(self, name: ObjectID | Ref) -> "ShaFile":
        """Retrieve a Git object by SHA1 or ref.

        Args:
            name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
            KeyError: when the specified ref or object does not exist
            TypeError: when name is not a bytestring
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # A 20-byte binary or 40-byte hex string may be a direct object id;
        # try the object store first, then fall back to ref resolution.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            # Malformed ref names surface as KeyError, like missing ones.
            raise KeyError(name) from exc
995 def __contains__(self, name: bytes) -> bool:
996 """Check if a specific Git object or ref is present.
998 Args:
999 name: Git object SHA1 or ref name
1000 """
1001 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
1002 return name in self.object_store or name in self.refs
1003 else:
1004 return name in self.refs
1006 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
1007 """Set a ref.
1009 Args:
1010 name: ref name
1011 value: Ref value - either a ShaFile object, or a hex sha
1012 """
1013 if name.startswith(b"refs/") or name == b"HEAD":
1014 if isinstance(value, ShaFile):
1015 self.refs[name] = value.id
1016 elif isinstance(value, bytes):
1017 self.refs[name] = value
1018 else:
1019 raise TypeError(value)
1020 else:
1021 raise ValueError(name)
1023 def __delitem__(self, name: bytes) -> None:
1024 """Remove a ref.
1026 Args:
1027 name: Name of the ref to remove
1028 """
1029 if name.startswith(b"refs/") or name == b"HEAD":
1030 del self.refs[name]
1031 else:
1032 raise ValueError(name)
1034 def _get_user_identity(
1035 self, config: "StackedConfig", kind: str | None = None
1036 ) -> bytes:
1037 """Determine the identity to use for new commits."""
1038 warnings.warn(
1039 "use get_user_identity() rather than Repo._get_user_identity",
1040 DeprecationWarning,
1041 )
1042 return get_user_identity(config)
1044 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
1045 """Add or modify graftpoints.
1047 Args:
1048 updated_graftpoints: Dict of commit shas to list of parent shas
1049 """
1050 # Simple validation
1051 for commit, parents in updated_graftpoints.items():
1052 for sha in [commit, *parents]:
1053 check_hexsha(sha, "Invalid graftpoint")
1055 self._graftpoints.update(updated_graftpoints)
1057 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None:
1058 """Remove graftpoints.
1060 Args:
1061 to_remove: List of commit shas
1062 """
1063 for sha in to_remove:
1064 del self._graftpoints[sha]
1066 def _read_heads(self, name: str) -> list[bytes]:
1067 f = self.get_named_file(name)
1068 if f is None:
1069 return []
1070 with f:
1071 return [line.strip() for line in f.readlines() if line.strip()]
1073 def get_worktree(self) -> "WorkTree":
1074 """Get the working tree for this repository.
1076 Returns:
1077 WorkTree instance for performing working tree operations
1079 Raises:
1080 NotImplementedError: If the repository doesn't support working trees
1081 """
1082 raise NotImplementedError(
1083 "Working tree operations not supported by this repository type"
1084 )
1086 @replace_me(remove_in="0.26.0")
1087 def do_commit(
1088 self,
1089 message: bytes | None = None,
1090 committer: bytes | None = None,
1091 author: bytes | None = None,
1092 commit_timestamp: float | None = None,
1093 commit_timezone: int | None = None,
1094 author_timestamp: float | None = None,
1095 author_timezone: int | None = None,
1096 tree: ObjectID | None = None,
1097 encoding: bytes | None = None,
1098 ref: Ref | None = b"HEAD",
1099 merge_heads: list[ObjectID] | None = None,
1100 no_verify: bool = False,
1101 sign: bool = False,
1102 ) -> bytes:
1103 """Create a new commit.
1105 If not specified, committer and author default to
1106 get_user_identity(..., 'COMMITTER')
1107 and get_user_identity(..., 'AUTHOR') respectively.
1109 Args:
1110 message: Commit message (bytes or callable that takes (repo, commit)
1111 and returns bytes)
1112 committer: Committer fullname
1113 author: Author fullname
1114 commit_timestamp: Commit timestamp (defaults to now)
1115 commit_timezone: Commit timestamp timezone (defaults to GMT)
1116 author_timestamp: Author timestamp (defaults to commit
1117 timestamp)
1118 author_timezone: Author timestamp timezone
1119 (defaults to commit timestamp timezone)
1120 tree: SHA1 of the tree root to use (if not specified the
1121 current index will be committed).
1122 encoding: Encoding
1123 ref: Optional ref to commit to (defaults to current branch).
1124 If None, creates a dangling commit without updating any ref.
1125 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1126 no_verify: Skip pre-commit and commit-msg hooks
1127 sign: GPG Sign the commit (bool, defaults to False,
1128 pass True to use default GPG key,
1129 pass a str containing Key ID to use a specific GPG key)
1131 Returns:
1132 New commit SHA1
1133 """
1134 return self.get_worktree().commit(
1135 message=message,
1136 committer=committer,
1137 author=author,
1138 commit_timestamp=commit_timestamp,
1139 commit_timezone=commit_timezone,
1140 author_timestamp=author_timestamp,
1141 author_timezone=author_timezone,
1142 tree=tree,
1143 encoding=encoding,
1144 ref=ref,
1145 merge_heads=merge_heads,
1146 no_verify=no_verify,
1147 sign=sign,
1148 )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: if the file does not start with the "gitdir: " prefix
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the prefix and any trailing newline characters, then decode.
    return contents[len(prefix) :].rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Raised when a repository's format version is not supported."""

    def __init__(self, version: int) -> None:
        """Record the offending repository format version.

        Args:
          version: The unsupported repository version
        """
        # Stored so callers can inspect which version was encountered.
        self.version = version
class UnsupportedExtension(Exception):
    """Raised when a repository declares an extension dulwich cannot handle."""

    def __init__(self, extension: str) -> None:
        """Record the offending repository extension.

        Args:
          extension: The unsupported repository extension
        """
        # Stored so callers can inspect which extension was encountered.
        self.extension = extension
1190class Repo(BaseRepo):
1191 """A git repository backed by local disk.
1193 To open an existing repository, call the constructor with
1194 the path of the repository.
1196 To create a new repository, use the Repo.init class method.
1198 Note that a repository object may hold on to resources such
1199 as file handles for performance reasons; call .close() to free
1200 up those resources.
1202 Attributes:
1203 path: Path to the working copy (if it exists) or repository control
1204 directory (if the repository is bare)
1205 bare: Whether this is a bare repository
1206 """
1208 path: str
1209 bare: bool
1210 object_store: DiskObjectStore
1211 filter_context: Optional["FilterContext"]
    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: if ``root`` does not look like a git repository
          UnsupportedVersion: if core.repositoryformatversion is not 0 or 1
          UnsupportedExtension: if an unknown extensions.* key is configured
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect bareness: a ".git" file or a ".git/objects" dir
            # means non-bare; "objects/" and "refs/" directly under root
            # means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a gitfile pointing at the real control directory
                # (e.g. for linked worktrees or submodules).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: "commondir" contains a (relative) path back to
            # the main repository's control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any other extension is unknown to dulwich; refuse to open.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints: .git/info/grafts plus shallow-clone boundaries.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1335 def get_worktree(self) -> "WorkTree":
1336 """Get the working tree for this repository.
1338 Returns:
1339 WorkTree instance for performing working tree operations
1340 """
1341 from .worktree import WorkTree
1343 return WorkTree(self, self.path)
1345 def _write_reflog(
1346 self,
1347 ref: bytes,
1348 old_sha: bytes,
1349 new_sha: bytes,
1350 committer: bytes | None,
1351 timestamp: int | None,
1352 timezone: int | None,
1353 message: bytes,
1354 ) -> None:
1355 from .reflog import format_reflog_line
1357 path = self._reflog_path(ref)
1358 try:
1359 os.makedirs(os.path.dirname(path))
1360 except FileExistsError:
1361 pass
1362 if committer is None:
1363 config = self.get_config_stack()
1364 committer = get_user_identity(config)
1365 check_user_identity(committer)
1366 if timestamp is None:
1367 timestamp = int(time.time())
1368 if timezone is None:
1369 timezone = 0 # FIXME
1370 with open(path, "ab") as f:
1371 f.write(
1372 format_reflog_line(
1373 old_sha, new_sha, committer, timestamp, timezone, message
1374 )
1375 + b"\n"
1376 )
1378 def _reflog_path(self, ref: bytes) -> str:
1379 if ref.startswith((b"main-worktree/", b"worktrees/")):
1380 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1382 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1383 return os.path.join(base, "logs", os.fsdecode(ref))
1385 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1386 """Read reflog entries for a reference.
1388 Args:
1389 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1391 Yields:
1392 reflog.Entry objects in chronological order (oldest first)
1393 """
1394 from .reflog import read_reflog
1396 path = self._reflog_path(ref)
1397 try:
1398 with open(path, "rb") as f:
1399 yield from read_reflog(f)
1400 except FileNotFoundError:
1401 return
1403 @classmethod
1404 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
1405 """Iterate parent directories to discover a repository.
1407 Return a Repo object for the first parent directory that looks like a
1408 Git repository.
1410 Args:
1411 start: The directory to start discovery from (defaults to '.')
1412 """
1413 path = os.path.abspath(start)
1414 while True:
1415 try:
1416 return cls(path)
1417 except NotGitRepository:
1418 new_path, _tail = os.path.split(path)
1419 if new_path == path: # Root reached
1420 break
1421 path = new_path
1422 start_str = os.fspath(start)
1423 if isinstance(start_str, bytes):
1424 start_str = start_str.decode("utf-8")
1425 raise NotGitRepository(f"No git repository was found at {start_str}")
1427 def controldir(self) -> str:
1428 """Return the path of the control directory."""
1429 return self._controldir
1431 def commondir(self) -> str:
1432 """Return the path of the common directory.
1434 For a main working tree, it is identical to controldir().
1436 For a linked working tree, it is the control directory of the
1437 main working tree.
1438 """
1439 return self._commondir
1441 def _determine_file_mode(self) -> bool:
1442 """Probe the file-system to determine whether permissions can be trusted.
1444 Returns: True if permissions can be trusted, False otherwise.
1445 """
1446 fname = os.path.join(self.path, ".probe-permissions")
1447 with open(fname, "w") as f:
1448 f.write("")
1450 st1 = os.lstat(fname)
1451 try:
1452 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1453 except PermissionError:
1454 return False
1455 st2 = os.lstat(fname)
1457 os.unlink(fname)
1459 mode_differs = st1.st_mode != st2.st_mode
1460 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1462 return mode_differs and st2_has_exec
1464 def _determine_symlinks(self) -> bool:
1465 """Probe the filesystem to determine whether symlinks can be created.
1467 Returns: True if symlinks can be created, False otherwise.
1468 """
1469 # TODO(jelmer): Actually probe disk / look at filesystem
1470 return sys.platform != "win32"
1472 def _put_named_file(self, path: str, contents: bytes) -> None:
1473 """Write a file to the control dir with the given name and contents.
1475 Args:
1476 path: The path to the file, relative to the control dir.
1477 contents: A string to write to the file.
1478 """
1479 path = path.lstrip(os.path.sep)
1480 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1481 f.write(contents)
1483 def _del_named_file(self, path: str) -> None:
1484 try:
1485 os.unlink(os.path.join(self.controldir(), path))
1486 except FileNotFoundError:
1487 return
1489 def get_named_file(
1490 self,
1491 path: str | bytes,
1492 basedir: str | None = None,
1493 ) -> BinaryIO | None:
1494 """Get a file from the control dir with a specific name.
1496 Although the filename should be interpreted as a filename relative to
1497 the control dir in a disk-based Repo, the object returned need not be
1498 pointing to a file in that location.
1500 Args:
1501 path: The path to the file, relative to the control dir.
1502 basedir: Optional argument that specifies an alternative to the
1503 control dir.
1504 Returns: An open file object, or None if the file does not exist.
1505 """
1506 # TODO(dborowitz): sanitize filenames, since this is used directly by
1507 # the dumb web serving code.
1508 if basedir is None:
1509 basedir = self.controldir()
1510 if isinstance(path, bytes):
1511 path = path.decode("utf-8")
1512 path = path.lstrip(os.path.sep)
1513 try:
1514 return open(os.path.join(basedir, path), "rb")
1515 except FileNotFoundError:
1516 return None
1518 def index_path(self) -> str:
1519 """Return path to the index file."""
1520 return os.path.join(self.controldir(), INDEX_FILENAME)
1522 def open_index(self) -> "Index":
1523 """Open the index for this repository.
1525 Raises:
1526 NoIndexPresent: If no index is present
1527 Returns: The matching `Index`
1528 """
1529 from .index import Index
1531 if not self.has_index():
1532 raise NoIndexPresent
1534 # Check for manyFiles feature configuration
1535 config = self.get_config_stack()
1536 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1537 skip_hash = False
1538 index_version = None
1540 if many_files:
1541 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1542 try:
1543 index_version_str = config.get(b"index", b"version")
1544 index_version = int(index_version_str)
1545 except KeyError:
1546 index_version = 4 # Default to version 4 for manyFiles
1547 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1548 else:
1549 # Check for explicit index settings
1550 try:
1551 index_version_str = config.get(b"index", b"version")
1552 index_version = int(index_version_str)
1553 except KeyError:
1554 index_version = None
1555 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1557 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
1559 def has_index(self) -> bool:
1560 """Check if an index is present."""
1561 # Bare repos must never have index files; non-bare repos may have a
1562 # missing index file, which is treated as empty.
1563 return not self.bare
1565 @replace_me(remove_in="0.26.0")
1566 def stage(
1567 self,
1568 fs_paths: str
1569 | bytes
1570 | os.PathLike[str]
1571 | Iterable[str | bytes | os.PathLike[str]],
1572 ) -> None:
1573 """Stage a set of paths.
1575 Args:
1576 fs_paths: List of paths, relative to the repository path
1577 """
1578 return self.get_worktree().stage(fs_paths)
1580 @replace_me(remove_in="0.26.0")
1581 def unstage(self, fs_paths: Sequence[str]) -> None:
1582 """Unstage specific file in the index.
1584 Args:
1585 fs_paths: a list of files to unstage,
1586 relative to the repository path.
1587 """
1588 return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
          ValueError: if both ``bare`` and ``checkout`` are requested
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Non-bare clones check out HEAD by default.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the "origin" remote in the clone.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Copy branches under refs/remotes/<origin>/ and tags as-is.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Release the partially-initialized clone's resources before
                # re-raising.
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                # Remove the directory we created so a failed clone leaves
                # nothing behind.
                shutil.rmtree(target_path)
            raise
        return target
1685 @replace_me(remove_in="0.26.0")
1686 def reset_index(self, tree: bytes | None = None) -> None:
1687 """Reset the index back to a specific tree.
1689 Args:
1690 tree: Tree SHA to reset to, None for current HEAD tree.
1691 """
1692 return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns are anchored anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything below that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            # Detached HEAD or unresolvable HEAD never matches.
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1782 def get_worktree_config(self) -> "ConfigFile":
1783 """Get the worktree-specific config.
1785 Returns:
1786 ConfigFile object for the worktree config
1787 """
1788 from .config import ConfigFile
1790 path = os.path.join(self.commondir(), "config.worktree")
1791 try:
1792 # Pass condition matchers for includeIf evaluation
1793 condition_matchers = self._get_config_condition_matchers()
1794 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1795 except FileNotFoundError:
1796 cf = ConfigFile()
1797 cf.path = path
1798 return cf
1800 def get_config(self) -> "ConfigFile":
1801 """Retrieve the config object.
1803 Returns: `ConfigFile` object for the ``.git/config`` file.
1804 """
1805 from .config import ConfigFile
1807 path = os.path.join(self._commondir, "config")
1808 try:
1809 # Pass condition matchers for includeIf evaluation
1810 condition_matchers = self._get_config_condition_matchers()
1811 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1812 except FileNotFoundError:
1813 ret = ConfigFile()
1814 ret.path = path
1815 return ret
1817 def get_rebase_state_manager(self) -> "RebaseStateManager":
1818 """Get the appropriate rebase state manager for this repository.
1820 Returns: DiskRebaseStateManager instance
1821 """
1822 import os
1824 from .rebase import DiskRebaseStateManager
1826 path = os.path.join(self.controldir(), "rebase-merge")
1827 return DiskRebaseStateManager(path)
1829 def get_description(self) -> bytes | None:
1830 """Retrieve the description of this repository.
1832 Returns: Description as bytes or None.
1833 """
1834 path = os.path.join(self._controldir, "description")
1835 try:
1836 with GitFile(path, "rb") as f:
1837 return f.read()
1838 except FileNotFoundError:
1839 return None
1841 def __repr__(self) -> str:
1842 """Return string representation of this repository."""
1843 return f"<Repo at {self.path!r}>"
1845 def set_description(self, description: bytes) -> None:
1846 """Set the description for this repository.
1848 Args:
1849 description: Text to set as description for this repository.
1850 """
1851 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: Optional["StackedConfig"] = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
    ) -> "Repo":
        """Shared initialization for bare and non-bare repositories.

        Creates the standard control-dir subdirectories, an object store,
        the initial HEAD symref and the repository's initial files.

        Args:
          path: Path to the repository root (== controldir for bare repos)
          controldir: Path of the control directory to populate
          bare: Whether the new repository is bare
          object_store: Object store to use; defaults to a fresh
            DiskObjectStore under the control dir
          config: Configuration used to look up init.defaultBranch when
            ``default_branch`` is not given
          default_branch: Branch HEAD should point at; falls back to the
            init.defaultBranch config setting, then DEFAULT_BRANCH
          symlinks: Whether to support symlinks
          format: Repository format version

        Returns: `Repo` instance for the newly initialized repository
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (possibly unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", local_branch_name(default_branch))
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
1889 @classmethod
1890 def init(
1891 cls,
1892 path: str | bytes | os.PathLike[str],
1893 *,
1894 mkdir: bool = False,
1895 config: Optional["StackedConfig"] = None,
1896 default_branch: bytes | None = None,
1897 symlinks: bool | None = None,
1898 format: int | None = None,
1899 ) -> "Repo":
1900 """Create a new repository.
1902 Args:
1903 path: Path in which to create the repository
1904 mkdir: Whether to create the directory
1905 config: Configuration object
1906 default_branch: Default branch name
1907 symlinks: Whether to support symlinks
1908 format: Repository format version (defaults to 0)
1909 Returns: `Repo` instance
1910 """
1911 path = os.fspath(path)
1912 if isinstance(path, bytes):
1913 path = os.fsdecode(path)
1914 if mkdir:
1915 os.mkdir(path)
1916 controldir = os.path.join(path, CONTROLDIR)
1917 os.mkdir(controldir)
1918 _set_filesystem_hidden(controldir)
1919 return cls._init_maybe_bare(
1920 path,
1921 controldir,
1922 False,
1923 config=config,
1924 default_branch=default_branch,
1925 symlinks=symlinks,
1926 format=format,
1927 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The new working tree's ".git" is a gitfile pointing at its private
        # control dir under the main repo's "worktrees" directory.
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-pointer from the control dir to the working tree's gitfile.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # COMMONDIR is a relative path back to the main control directory.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the worktree's HEAD at the main repository's current head.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the new worktree's index from its HEAD.
        r.get_worktree().reset_index()
        return r
1978 @classmethod
1979 def init_bare(
1980 cls,
1981 path: str | bytes | os.PathLike[str],
1982 *,
1983 mkdir: bool = False,
1984 object_store: PackBasedObjectStore | None = None,
1985 config: Optional["StackedConfig"] = None,
1986 default_branch: bytes | None = None,
1987 format: int | None = None,
1988 ) -> "Repo":
1989 """Create a new bare repository.
1991 ``path`` should already exist and be an empty directory.
1993 Args:
1994 path: Path to create bare repository in
1995 mkdir: Whether to create the directory
1996 object_store: Object store to use
1997 config: Configuration object
1998 default_branch: Default branch name
1999 format: Repository format version (defaults to 0)
2000 Returns: a `Repo` instance
2001 """
2002 path = os.fspath(path)
2003 if isinstance(path, bytes):
2004 path = os.fsdecode(path)
2005 if mkdir:
2006 os.mkdir(path)
2007 return cls._init_maybe_bare(
2008 path,
2009 path,
2010 True,
2011 object_store=object_store,
2012 config=config,
2013 default_branch=default_branch,
2014 format=format,
2015 )
    # Alias kept for backwards compatibility with older dulwich APIs.
    create = init_bare
2019 def close(self) -> None:
2020 """Close any files opened by this repository."""
2021 self.object_store.close()
2022 # Clean up filter context if it was created
2023 if self.filter_context is not None:
2024 self.filter_context.close()
2025 self.filter_context = None
2027 def __enter__(self) -> "Repo":
2028 """Enter context manager."""
2029 return self
2031 def __exit__(
2032 self,
2033 exc_type: type[BaseException] | None,
2034 exc_val: BaseException | None,
2035 exc_tb: TracebackType | None,
2036 ) -> None:
2037 """Exit context manager and close repository."""
2038 self.close()
2040 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2041 """Read .gitattributes file from working tree.
2043 Returns:
2044 Dictionary mapping file patterns to attributes
2045 """
2046 gitattributes = {}
2047 gitattributes_path = os.path.join(self.path, ".gitattributes")
2049 if os.path.exists(gitattributes_path):
2050 with open(gitattributes_path, "rb") as f:
2051 for line in f:
2052 line = line.strip()
2053 if not line or line.startswith(b"#"):
2054 continue
2056 parts = line.split()
2057 if len(parts) < 2:
2058 continue
2060 pattern = parts[0]
2061 attrs = {}
2063 for attr in parts[1:]:
2064 if attr.startswith(b"-"):
2065 # Unset attribute
2066 attrs[attr[1:]] = b"false"
2067 elif b"=" in attr:
2068 # Set to value
2069 key, value = attr.split(b"=", 1)
2070 attrs[key] = value
2071 else:
2072 # Set attribute
2073 attrs[attr] = b"true"
2075 gitattributes[pattern] = attrs
2077 return gitattributes
2079 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2080 """Return a BlobNormalizer object."""
2081 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2083 # Get fresh configuration and GitAttributes
2084 config_stack = self.get_config_stack()
2085 git_attributes = self.get_gitattributes()
2087 # Lazily create FilterContext if needed
2088 if self.filter_context is None:
2089 filter_registry = FilterRegistry(config_stack, self)
2090 self.filter_context = FilterContext(filter_registry)
2091 else:
2092 # Refresh the context with current config to handle config changes
2093 self.filter_context.refresh_config(config_stack)
2095 # Return a new FilterBlobNormalizer with the context
2096 return FilterBlobNormalizer(
2097 config_stack, git_attributes, filter_context=self.filter_context
2098 )
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Sources are layered in this order: the given tree (or HEAD's tree),
        then .git/info/attributes, then the working-tree .gitattributes.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2167 @replace_me(remove_in="0.26.0")
2168 def _sparse_checkout_file_path(self) -> str:
2169 """Return the path of the sparse-checkout file in this repo's control dir."""
2170 return self.get_worktree()._sparse_checkout_file_path()
2172 @replace_me(remove_in="0.26.0")
2173 def configure_for_cone_mode(self) -> None:
2174 """Ensure the repository is configured for cone-mode sparse-checkout."""
2175 return self.get_worktree().configure_for_cone_mode()
2177 @replace_me(remove_in="0.26.0")
2178 def infer_cone_mode(self) -> bool:
2179 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2180 return self.get_worktree().infer_cone_mode()
2182 @replace_me(remove_in="0.26.0")
2183 def get_sparse_checkout_patterns(self) -> list[str]:
2184 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2186 Returns:
2187 A list of patterns. Returns an empty list if the file is missing.
2188 """
2189 return self.get_worktree().get_sparse_checkout_patterns()
2191 @replace_me(remove_in="0.26.0")
2192 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
2193 """Write the given sparse-checkout patterns into info/sparse-checkout.
2195 Creates the info/ directory if it does not exist.
2197 Args:
2198 patterns: A list of gitignore-style patterns to store.
2199 """
2200 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2202 @replace_me(remove_in="0.26.0")
2203 def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
2204 """Write the given cone-mode directory patterns into info/sparse-checkout.
2206 For each directory to include, add an inclusion line that "undoes" the prior
2207 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2208 Never add the same line twice.
2209 """
2210 return self.get_worktree().set_cone_mode_patterns(dirs)
2213class MemoryRepo(BaseRepo):
2214 """Repo that stores refs, objects, and named files in memory.
2216 MemoryRepos are always bare: they have no working tree and no index, since
2217 those have a stronger dependency on the filesystem.
2218 """
2220 filter_context: Optional["FilterContext"]
2222 def __init__(self) -> None:
2223 """Create a new repository in memory."""
2224 from .config import ConfigFile
2226 self._reflog: list[Any] = []
2227 refs_container = DictRefsContainer({}, logger=self._append_reflog)
2228 BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
2229 self._named_files: dict[str, bytes] = {}
2230 self.bare = True
2231 self._config = ConfigFile()
2232 self._description: bytes | None = None
2233 self.filter_context = None
2235 def _append_reflog(
2236 self,
2237 ref: bytes,
2238 old_sha: bytes | None,
2239 new_sha: bytes | None,
2240 committer: bytes | None,
2241 timestamp: int | None,
2242 timezone: int | None,
2243 message: bytes | None,
2244 ) -> None:
2245 self._reflog.append(
2246 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2247 )
2249 def set_description(self, description: bytes) -> None:
2250 """Set the description for this repository.
2252 Args:
2253 description: Text to set as description
2254 """
2255 self._description = description
2257 def get_description(self) -> bytes | None:
2258 """Get the description of this repository.
2260 Returns:
2261 Repository description as bytes
2262 """
2263 return self._description
2265 def _determine_file_mode(self) -> bool:
2266 """Probe the file-system to determine whether permissions can be trusted.
2268 Returns: True if permissions can be trusted, False otherwise.
2269 """
2270 return sys.platform != "win32"
2272 def _determine_symlinks(self) -> bool:
2273 """Probe the file-system to determine whether permissions can be trusted.
2275 Returns: True if permissions can be trusted, False otherwise.
2276 """
2277 return sys.platform != "win32"
2279 def _put_named_file(self, path: str, contents: bytes) -> None:
2280 """Write a file to the control dir with the given name and contents.
2282 Args:
2283 path: The path to the file, relative to the control dir.
2284 contents: A string to write to the file.
2285 """
2286 self._named_files[path] = contents
2288 def _del_named_file(self, path: str) -> None:
2289 try:
2290 del self._named_files[path]
2291 except KeyError:
2292 pass
2294 def get_named_file(
2295 self,
2296 path: str | bytes,
2297 basedir: str | None = None,
2298 ) -> BytesIO | None:
2299 """Get a file from the control dir with a specific name.
2301 Although the filename should be interpreted as a filename relative to
2302 the control dir in a disk-baked Repo, the object returned need not be
2303 pointing to a file in that location.
2305 Args:
2306 path: The path to the file, relative to the control dir.
2307 basedir: Optional base directory for the path
2308 Returns: An open file object, or None if the file does not exist.
2309 """
2310 path_str = path.decode() if isinstance(path, bytes) else path
2311 contents = self._named_files.get(path_str, None)
2312 if contents is None:
2313 return None
2314 return BytesIO(contents)
2316 def open_index(self) -> "Index":
2317 """Fail to open index for this repo, since it is bare.
2319 Raises:
2320 NoIndexPresent: Raised when no index is present
2321 """
2322 raise NoIndexPresent
2324 def get_config(self) -> "ConfigFile":
2325 """Retrieve the config object.
2327 Returns: `ConfigFile` object.
2328 """
2329 return self._config
2331 def get_rebase_state_manager(self) -> "RebaseStateManager":
2332 """Get the appropriate rebase state manager for this repository.
2334 Returns: MemoryRebaseStateManager instance
2335 """
2336 from .rebase import MemoryRebaseStateManager
2338 return MemoryRebaseStateManager(self)
2340 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2341 """Return a BlobNormalizer object for checkin/checkout operations."""
2342 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2344 # Get fresh configuration and GitAttributes
2345 config_stack = self.get_config_stack()
2346 git_attributes = self.get_gitattributes()
2348 # Lazily create FilterContext if needed
2349 if self.filter_context is None:
2350 filter_registry = FilterRegistry(config_stack, self)
2351 self.filter_context = FilterContext(filter_registry)
2352 else:
2353 # Refresh the context with current config to handle config changes
2354 self.filter_context.refresh_config(config_stack)
2356 # Return a new FilterBlobNormalizer with the context
2357 return FilterBlobNormalizer(
2358 config_stack, git_attributes, filter_context=self.filter_context
2359 )
2361 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
2362 """Read gitattributes for the repository."""
2363 from .attrs import GitAttributes
2365 # Memory repos don't have working trees or gitattributes files
2366 # Return empty GitAttributes
2367 return GitAttributes([])
2369 def close(self) -> None:
2370 """Close any resources opened by this repository."""
2371 # Clean up filter context if it was created
2372 if self.filter_context is not None:
2373 self.filter_context.close()
2374 self.filter_context = None
2376 def do_commit(
2377 self,
2378 message: bytes | None = None,
2379 committer: bytes | None = None,
2380 author: bytes | None = None,
2381 commit_timestamp: float | None = None,
2382 commit_timezone: int | None = None,
2383 author_timestamp: float | None = None,
2384 author_timezone: int | None = None,
2385 tree: ObjectID | None = None,
2386 encoding: bytes | None = None,
2387 ref: Ref | None = b"HEAD",
2388 merge_heads: list[ObjectID] | None = None,
2389 no_verify: bool = False,
2390 sign: bool = False,
2391 ) -> bytes:
2392 """Create a new commit.
2394 This is a simplified implementation for in-memory repositories that
2395 doesn't support worktree operations or hooks.
2397 Args:
2398 message: Commit message
2399 committer: Committer fullname
2400 author: Author fullname
2401 commit_timestamp: Commit timestamp (defaults to now)
2402 commit_timezone: Commit timestamp timezone (defaults to GMT)
2403 author_timestamp: Author timestamp (defaults to commit timestamp)
2404 author_timezone: Author timestamp timezone (defaults to commit timezone)
2405 tree: SHA1 of the tree root to use
2406 encoding: Encoding
2407 ref: Optional ref to commit to (defaults to current branch).
2408 If None, creates a dangling commit without updating any ref.
2409 merge_heads: Merge heads
2410 no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
2411 sign: GPG Sign the commit (ignored for MemoryRepo)
2413 Returns:
2414 New commit SHA1
2415 """
2416 import time
2418 from .objects import Commit
2420 if tree is None:
2421 raise ValueError("tree must be specified for MemoryRepo")
2423 c = Commit()
2424 if len(tree) != 40:
2425 raise ValueError("tree must be a 40-byte hex sha string")
2426 c.tree = tree
2428 config = self.get_config_stack()
2429 if merge_heads is None:
2430 merge_heads = []
2431 if committer is None:
2432 committer = get_user_identity(config, kind="COMMITTER")
2433 check_user_identity(committer)
2434 c.committer = committer
2435 if commit_timestamp is None:
2436 commit_timestamp = time.time()
2437 c.commit_time = int(commit_timestamp)
2438 if commit_timezone is None:
2439 commit_timezone = 0
2440 c.commit_timezone = commit_timezone
2441 if author is None:
2442 author = get_user_identity(config, kind="AUTHOR")
2443 c.author = author
2444 check_user_identity(author)
2445 if author_timestamp is None:
2446 author_timestamp = commit_timestamp
2447 c.author_time = int(author_timestamp)
2448 if author_timezone is None:
2449 author_timezone = commit_timezone
2450 c.author_timezone = author_timezone
2451 if encoding is None:
2452 try:
2453 encoding = config.get(("i18n",), "commitEncoding")
2454 except KeyError:
2455 pass
2456 if encoding is not None:
2457 c.encoding = encoding
2459 # Handle message (for MemoryRepo, we don't support callable messages)
2460 if callable(message):
2461 message = message(self, c)
2462 if message is None:
2463 raise ValueError("Message callback returned None")
2465 if message is None:
2466 raise ValueError("No commit message specified")
2468 c.message = message
2470 if ref is None:
2471 # Create a dangling commit
2472 c.parents = merge_heads
2473 self.object_store.add_object(c)
2474 else:
2475 try:
2476 old_head = self.refs[ref]
2477 c.parents = [old_head, *merge_heads]
2478 self.object_store.add_object(c)
2479 ok = self.refs.set_if_equals(
2480 ref,
2481 old_head,
2482 c.id,
2483 message=b"commit: " + message,
2484 committer=committer,
2485 timestamp=int(commit_timestamp),
2486 timezone=commit_timezone,
2487 )
2488 except KeyError:
2489 c.parents = merge_heads
2490 self.object_store.add_object(c)
2491 ok = self.refs.add_if_new(
2492 ref,
2493 c.id,
2494 message=b"commit: " + message,
2495 committer=committer,
2496 timestamp=int(commit_timestamp),
2497 timezone=commit_timezone,
2498 )
2499 if not ok:
2500 from .errors import CommitError
2502 raise CommitError(f"{ref!r} changed during commit")
2504 return c.id
2506 @classmethod
2507 def init_bare(
2508 cls,
2509 objects: Iterable[ShaFile],
2510 refs: Mapping[bytes, bytes],
2511 format: int | None = None,
2512 ) -> "MemoryRepo":
2513 """Create a new bare repository in memory.
2515 Args:
2516 objects: Objects for the new repository,
2517 as iterable
2518 refs: Refs as dictionary, mapping names
2519 to object SHA1s
2520 format: Repository format version (defaults to 0)
2521 """
2522 ret = cls()
2523 for obj in objects:
2524 ret.object_store.add_object(obj)
2525 for refname, sha in refs.items():
2526 ret.refs.add_if_new(refname, sha)
2527 ret._init_files(bare=True, format=format)
2528 return ret