Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
38from io import BytesIO
39from types import TracebackType
40from typing import (
41 TYPE_CHECKING,
42 Any,
43 BinaryIO,
44 Callable,
45 Optional,
46 TypeVar,
47 Union,
48)
50if TYPE_CHECKING:
51 # There are no circular imports here, but we try to defer imports as long
52 # as possible to reduce start-up time for anything that doesn't need
53 # these imports.
54 from .attrs import GitAttributes
55 from .config import ConditionMatcher, ConfigFile, StackedConfig
56 from .diff_tree import RenameDetector
57 from .filters import FilterBlobNormalizer, FilterContext
58 from .index import Index
59 from .notes import Notes
60 from .object_store import BaseObjectStore, GraphWalker
61 from .pack import UnpackedObject
62 from .rebase import RebaseStateManager
63 from .walk import Walker
64 from .worktree import WorkTree
66from . import reflog, replace_me
67from .errors import (
68 NoIndexPresent,
69 NotBlobError,
70 NotCommitError,
71 NotGitRepository,
72 NotTagError,
73 NotTreeError,
74 RefFormatError,
75)
76from .file import GitFile
77from .hooks import (
78 CommitMsgShellHook,
79 Hook,
80 PostCommitShellHook,
81 PostReceiveShellHook,
82 PreCommitShellHook,
83)
84from .object_store import (
85 DiskObjectStore,
86 MemoryObjectStore,
87 MissingObjectFinder,
88 ObjectStoreGraphWalker,
89 PackBasedObjectStore,
90 PackCapableObjectStore,
91 find_shallow,
92 peel_sha,
93)
94from .objects import (
95 Blob,
96 Commit,
97 ObjectID,
98 ShaFile,
99 Tag,
100 Tree,
101 check_hexsha,
102 valid_hexsha,
103)
104from .pack import generate_unpacked_objects
105from .refs import (
106 ANNOTATED_TAG_SUFFIX, # noqa: F401
107 LOCAL_BRANCH_PREFIX,
108 LOCAL_TAG_PREFIX, # noqa: F401
109 SYMREF, # noqa: F401
110 DictRefsContainer,
111 DiskRefsContainer,
112 InfoRefsContainer, # noqa: F401
113 Ref,
114 RefsContainer,
115 _set_default_branch,
116 _set_head,
117 _set_origin_head,
118 check_ref_format, # noqa: F401
119 is_per_worktree_ref,
120 read_packed_refs, # noqa: F401
121 read_packed_refs_with_peeled, # noqa: F401
122 serialize_refs,
123 write_packed_refs, # noqa: F401
124)
# Name of the control directory in a non-bare repository.
CONTROLDIR = ".git"
# Directory under the control dir that holds loose and packed objects.
OBJECTDIR = "objects"
# Default for whether OFS deltas may be used when generating pack data.
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile, used by typed object retrieval helpers.
T = TypeVar("T", bound="ShaFile")
# Well-known file/directory names inside the control dir.
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside a freshly initialized control dir.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch that HEAD points at in a newly created repository.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """Raised when a commit identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the malformed identity string.

        Args:
            identity: The identity value that failed validation.
        """
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be derived from the host."""
163# TODO(jelmer): Cache?
164def _get_default_identity() -> tuple[str, str]:
165 import socket
167 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
168 username = os.environ.get(name)
169 if username:
170 break
171 else:
172 username = None
174 try:
175 import pwd
176 except ImportError:
177 fullname = None
178 else:
179 try:
180 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
181 except KeyError:
182 fullname = None
183 else:
184 if getattr(entry, "gecos", None):
185 fullname = entry.pw_gecos.split(",")[0]
186 else:
187 fullname = None
188 if username is None:
189 username = entry.pw_name
190 if not fullname:
191 if username is None:
192 raise DefaultIdentityNotFound("no username found")
193 fullname = username
194 email = os.environ.get("EMAIL")
195 if email is None:
196 if username is None:
197 raise DefaultIdentityNotFound("no username found")
198 email = f"{username}@{socket.gethostname()}"
199 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
        config: Configuration stack to read from
        kind: Optional kind to return identity for,
            usually either "AUTHOR" or "COMMITTER".

    Returns:
        A user identity of the form ``name <email>``
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Fix: only consult the host system when something is actually
        # missing.  Previously _get_default_identity() ran unconditionally,
        # so a fully configured identity could still raise
        # DefaultIdentityNotFound on hosts with no derivable identity.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Strip a redundant pair of angle brackets around the email address.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    A valid identity looks like ``name <email>`` and contains neither
    NUL bytes nor newlines.

    Args:
        identity: User identity bytestring

    Raises:
        InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, remainder = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    if b">" not in remainder or b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Every sha on the line must be a valid hex sha.
        for sha in (commit, *parents):
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a newline-separated bytestring.

    The graft dictionary maps:
        <commit sha1>: [<parent sha1>*]

    Each output line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )
def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    Args:
        path: Path to mark hidden; a no-op on non-Windows platforms.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        # FILE_ATTRIBUTE_HIDDEN constant from the Windows file API.
        FILE_ATTRIBUTE_HIDDEN = 2
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Hiding is best-effort: a zero (failure) return value is ignored.
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Provider for commit parent information.

    Parents are resolved, in order, from graft overrides, the shallow
    set, the commit graph (when available) and finally the commit object
    itself.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict[bytes, list[bytes]]] = None,
        shallows: Optional[Iterable[bytes]] = None,
    ) -> None:
        """Initialize ParentsProvider.

        Args:
            store: Object store to use
            grafts: Graft information (commit sha -> replacement parents)
            shallows: Shallow commit SHAs

        Note:
            The defaults used to be the mutable literals ``{}`` and ``[]``,
            which are shared across calls; ``None`` sentinels avoid any
            accidental state sharing between instances.
        """
        self.store = store
        self.grafts = {} if grafts is None else grafts
        self.shallows = set() if shallows is None else set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
            commit_id: SHA of the commit to look up
            commit: Optional pre-loaded commit object matching commit_id

        Returns:
            List of parent SHAs; grafts override recorded parents and
            shallow commits report no parents.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        parents = commit.parents
        assert isinstance(parents, list)
        return parents
398class BaseRepo:
399 """Base class for a git repository.
401 This base class is meant to be used for Repository implementations that e.g.
402 work on top of a different transport than a standard filesystem path.
404 Attributes:
405 object_store: Dictionary-like object for accessing
406 the objects
407 refs: Dictionary-like object with the refs in this
408 repository
409 """
    def __init__(
        self, object_store: "PackCapableObjectStore", refs: RefsContainer
    ) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
            object_store: Object store to use
            refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoint overrides (commit sha -> parent shas); empty by default.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        # Installed hooks, keyed by hook name (e.g. "pre-commit").
        self.hooks: dict[str, Hook] = {}
429 def _determine_file_mode(self) -> bool:
430 """Probe the file-system to determine whether permissions can be trusted.
432 Returns: True if permissions can be trusted, False otherwise.
433 """
434 raise NotImplementedError(self._determine_file_mode)
436 def _determine_symlinks(self) -> bool:
437 """Probe the filesystem to determine whether symlinks can be created.
439 Returns: True if symlinks can be created, False otherwise.
440 """
441 # For now, just mimic the old behaviour
442 return sys.platform != "win32"
444 def _init_files(
445 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
446 ) -> None:
447 """Initialize a default set of named files."""
448 from .config import ConfigFile
450 self._put_named_file("description", b"Unnamed repository")
451 f = BytesIO()
452 cf = ConfigFile()
453 if format is None:
454 format = 0
455 if format not in (0, 1):
456 raise ValueError(f"Unsupported repository format version: {format}")
457 cf.set("core", "repositoryformatversion", str(format))
458 if self._determine_file_mode():
459 cf.set("core", "filemode", True)
460 else:
461 cf.set("core", "filemode", False)
463 if symlinks is None and not bare:
464 symlinks = self._determine_symlinks()
466 if symlinks is False:
467 cf.set("core", "symlinks", symlinks)
469 cf.set("core", "bare", bare)
470 cf.set("core", "logallrefupdates", True)
471 cf.write_to_file(f)
472 self._put_named_file("config", f.getvalue())
473 self._put_named_file(os.path.join("info", "exclude"), b"")
475 def get_named_file(self, path: str) -> Optional[BinaryIO]:
476 """Get a file from the control dir with a specific name.
478 Although the filename should be interpreted as a filename relative to
479 the control dir in a disk-based Repo, the object returned need not be
480 pointing to a file in that location.
482 Args:
483 path: The path to the file, relative to the control dir.
484 Returns: An open file object, or None if the file does not exist.
485 """
486 raise NotImplementedError(self.get_named_file)
488 def _put_named_file(self, path: str, contents: bytes) -> None:
489 """Write a file to the control dir with the given name and contents.
491 Args:
492 path: The path to the file, relative to the control dir.
493 contents: A string to write to the file.
494 """
495 raise NotImplementedError(self._put_named_file)
497 def _del_named_file(self, path: str) -> None:
498 """Delete a file in the control directory with the given name."""
499 raise NotImplementedError(self._del_named_file)
501 def open_index(self) -> "Index":
502 """Open the index for this repository.
504 Raises:
505 NoIndexPresent: If no index is present
506 Returns: The matching `Index`
507 """
508 raise NotImplementedError(self.open_index)
510 def fetch(
511 self,
512 target: "BaseRepo",
513 determine_wants: Optional[
514 Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]]
515 ] = None,
516 progress: Optional[Callable[..., None]] = None,
517 depth: Optional[int] = None,
518 ) -> dict[bytes, bytes]:
519 """Fetch objects into another repository.
521 Args:
522 target: The target repository
523 determine_wants: Optional function to determine what refs to
524 fetch.
525 progress: Optional progress function
526 depth: Optional shallow fetch depth
527 Returns: The local refs
528 """
529 if determine_wants is None:
530 determine_wants = target.object_store.determine_wants_all
531 count, pack_data = self.fetch_pack_data(
532 determine_wants,
533 target.get_graph_walker(),
534 progress=progress,
535 depth=depth,
536 )
537 target.object_store.add_pack_data(count, pack_data, progress)
538 return self.get_refs()
540 def fetch_pack_data(
541 self,
542 determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]],
543 graph_walker: "GraphWalker",
544 progress: Optional[Callable[[bytes], None]],
545 *,
546 get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None,
547 depth: Optional[int] = None,
548 ) -> tuple[int, Iterator["UnpackedObject"]]:
549 """Fetch the pack data required for a set of revisions.
551 Args:
552 determine_wants: Function that takes a dictionary with heads
553 and returns the list of heads to fetch.
554 graph_walker: Object that can iterate over the list of revisions
555 to fetch and has an "ack" method that will be called to acknowledge
556 that a revision is present.
557 progress: Simple progress function that will be called with
558 updated progress strings.
559 get_tagged: Function that returns a dict of pointed-to sha ->
560 tag sha for including tags.
561 depth: Shallow fetch depth
562 Returns: count and iterator over pack data
563 """
564 missing_objects = self.find_missing_objects(
565 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
566 )
567 if missing_objects is None:
568 return 0, iter([])
569 remote_has = missing_objects.get_remote_has()
570 object_ids = list(missing_objects)
571 return len(object_ids), generate_unpacked_objects(
572 self.object_store, object_ids, progress=progress, other_haves=remote_has
573 )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Optional[Callable[[bytes], None]],
        *,
        get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
            determine_wants: Function that takes a dictionary with heads
                and returns the list of heads to fetch.
            graph_walker: Object that can iterate over the list of revisions
                to fetch and has an "ack" method that will be called to
                acknowledge that a revision is present.
            progress: Simple progress function that will be called with
                updated progress strings.
            get_tagged: Function that returns a dict of pointed-to sha ->
                tag sha for including tags.
            depth: Shallow fetch depth

        Returns: iterator over objects, with __len__ implemented, or None
            when a shallow short-circuit means no pack should be sent.
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before it is mutated below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            # Determine which commits become shallow (and which stop being
            # shallow) at the requested depth.
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
            new_shallow = graph_walker.shallow - current_shallow
            unshallow = not_shallow & current_shallow
            setattr(graph_walker, "unshallow", unshallow)
            if hasattr(graph_walker, "update_shallow"):
                graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
                commit: Commit object

            Returns:
                List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
671 def generate_pack_data(
672 self,
673 have: set[ObjectID],
674 want: set[ObjectID],
675 progress: Optional[Callable[[str], None]] = None,
676 ofs_delta: Optional[bool] = None,
677 ) -> tuple[int, Iterator["UnpackedObject"]]:
678 """Generate pack data objects for a set of wants/haves.
680 Args:
681 have: List of SHA1s of objects that should not be sent
682 want: List of SHA1s of objects that should be sent
683 ofs_delta: Whether OFS deltas can be included
684 progress: Optional progress reporting method
685 """
686 return self.object_store.generate_pack_data(
687 have,
688 want,
689 shallow=self.get_shallow(),
690 progress=progress,
691 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
692 )
694 def get_graph_walker(
695 self, heads: Optional[list[ObjectID]] = None
696 ) -> ObjectStoreGraphWalker:
697 """Retrieve a graph walker.
699 A graph walker is used by a remote repository (or proxy)
700 to find out which objects are present in this repository.
702 Args:
703 heads: Repository heads to use (optional)
704 Returns: A graph walker object
705 """
706 if heads is None:
707 heads = [
708 sha
709 for sha in self.refs.as_dict(b"refs/heads").values()
710 if sha in self.object_store
711 ]
712 parents_provider = ParentsProvider(self.object_store)
713 return ObjectStoreGraphWalker(
714 heads,
715 parents_provider.get_parents,
716 shallow=self.get_shallow(),
717 update_shallow=self.update_shallow,
718 )
720 def get_refs(self) -> dict[bytes, bytes]:
721 """Get dictionary with all refs.
723 Returns: A ``dict`` mapping ref names to SHA1s
724 """
725 return self.refs.as_dict()
727 def head(self) -> bytes:
728 """Return the SHA1 pointed at by HEAD."""
729 # TODO: move this method to WorkTree
730 return self.refs[b"HEAD"]
732 def _get_object(self, sha: bytes, cls: type[T]) -> T:
733 assert len(sha) in (20, 40)
734 ret = self.get_object(sha)
735 if not isinstance(ret, cls):
736 if cls is Commit:
737 raise NotCommitError(ret.id)
738 elif cls is Blob:
739 raise NotBlobError(ret.id)
740 elif cls is Tree:
741 raise NotTreeError(ret.id)
742 elif cls is Tag:
743 raise NotTagError(ret.id)
744 else:
745 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
746 return ret
748 def get_object(self, sha: bytes) -> ShaFile:
749 """Retrieve the object with the specified SHA.
751 Args:
752 sha: SHA to retrieve
753 Returns: A ShaFile object
754 Raises:
755 KeyError: when the object can not be found
756 """
757 return self.object_store[sha]
759 def parents_provider(self) -> ParentsProvider:
760 """Get a parents provider for this repository.
762 Returns:
763 ParentsProvider instance configured with grafts and shallows
764 """
765 return ParentsProvider(
766 self.object_store,
767 grafts=self._graftpoints,
768 shallows=self.get_shallow(),
769 )
771 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
772 """Retrieve the parents of a specific commit.
774 If the specific commit is a graftpoint, the graft parents
775 will be returned instead.
777 Args:
778 sha: SHA of the commit for which to retrieve the parents
779 commit: Optional commit matching the sha
780 Returns: List of parents
781 """
782 return self.parents_provider().get_parents(sha, commit)
784 def get_config(self) -> "ConfigFile":
785 """Retrieve the config object.
787 Returns: `ConfigFile` object for the ``.git/config`` file.
788 """
789 raise NotImplementedError(self.get_config)
791 def get_worktree_config(self) -> "ConfigFile":
792 """Retrieve the worktree config object."""
793 raise NotImplementedError(self.get_worktree_config)
795 def get_description(self) -> Optional[bytes]:
796 """Retrieve the description for this repository.
798 Returns: Bytes with the description of the repository
799 as set by the user.
800 """
801 raise NotImplementedError(self.get_description)
803 def set_description(self, description: bytes) -> None:
804 """Set the description for this repository.
806 Args:
807 description: Text to set as description for this repository.
808 """
809 raise NotImplementedError(self.set_description)
811 def get_rebase_state_manager(self) -> "RebaseStateManager":
812 """Get the appropriate rebase state manager for this repository.
814 Returns: RebaseStateManager instance
815 """
816 raise NotImplementedError(self.get_rebase_state_manager)
818 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
819 """Return a BlobNormalizer object for checkin/checkout operations.
821 Returns: BlobNormalizer instance
822 """
823 raise NotImplementedError(self.get_blob_normalizer)
825 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
826 """Read gitattributes for the repository.
828 Args:
829 tree: Tree SHA to read .gitattributes from (defaults to HEAD)
831 Returns:
832 GitAttributes object that can be used to match paths
833 """
834 raise NotImplementedError(self.get_gitattributes)
836 def get_config_stack(self) -> "StackedConfig":
837 """Return a config stack for this repository.
839 This stack accesses the configuration for both this repository
840 itself (.git/config) and the global configuration, which usually
841 lives in ~/.gitconfig.
843 Returns: `Config` instance for this repository
844 """
845 from .config import ConfigFile, StackedConfig
847 local_config = self.get_config()
848 backends: list[ConfigFile] = [local_config]
849 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
850 backends.append(self.get_worktree_config())
852 backends += StackedConfig.default_backends()
853 return StackedConfig(backends, writable=local_config)
855 def get_shallow(self) -> set[ObjectID]:
856 """Get the set of shallow commits.
858 Returns: Set of shallow commits.
859 """
860 f = self.get_named_file("shallow")
861 if f is None:
862 return set()
863 with f:
864 return {line.strip() for line in f}
866 def update_shallow(
867 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]]
868 ) -> None:
869 """Update the list of shallow objects.
871 Args:
872 new_shallow: Newly shallow objects
873 new_unshallow: Newly no longer shallow objects
874 """
875 shallow = self.get_shallow()
876 if new_shallow:
877 shallow.update(new_shallow)
878 if new_unshallow:
879 shallow.difference_update(new_unshallow)
880 if shallow:
881 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
882 else:
883 self._del_named_file("shallow")
885 def get_peeled(self, ref: Ref) -> ObjectID:
886 """Get the peeled value of a ref.
888 Args:
889 ref: The refname to peel.
890 Returns: The fully-peeled SHA1 of a tag object, after peeling all
891 intermediate tags; if the original ref does not point to a tag,
892 this will equal the original SHA1.
893 """
894 cached = self.refs.get_peeled(ref)
895 if cached is not None:
896 return cached
897 return peel_sha(self.object_store, self.refs[ref])[1].id
899 @property
900 def notes(self) -> "Notes":
901 """Access notes functionality for this repository.
903 Returns:
904 Notes object for accessing notes
905 """
906 from .notes import Notes
908 return Notes(self.object_store, self.refs)
910 def get_walker(
911 self,
912 include: Optional[Sequence[bytes]] = None,
913 exclude: Optional[Sequence[bytes]] = None,
914 order: str = "date",
915 reverse: bool = False,
916 max_entries: Optional[int] = None,
917 paths: Optional[Sequence[bytes]] = None,
918 rename_detector: Optional["RenameDetector"] = None,
919 follow: bool = False,
920 since: Optional[int] = None,
921 until: Optional[int] = None,
922 queue_cls: Optional[type] = None,
923 ) -> "Walker":
924 """Obtain a walker for this repository.
926 Args:
927 include: Iterable of SHAs of commits to include along with their
928 ancestors. Defaults to [HEAD]
929 exclude: Iterable of SHAs of commits to exclude along with their
930 ancestors, overriding includes.
931 order: ORDER_* constant specifying the order of results.
932 Anything other than ORDER_DATE may result in O(n) memory usage.
933 reverse: If True, reverse the order of output, requiring O(n)
934 memory.
935 max_entries: The maximum number of entries to yield, or None for
936 no limit.
937 paths: Iterable of file or subtree paths to show entries for.
938 rename_detector: diff.RenameDetector object for detecting
939 renames.
940 follow: If True, follow path across renames/copies. Forces a
941 default rename_detector.
942 since: Timestamp to list commits after.
943 until: Timestamp to list commits before.
944 queue_cls: A class to use for a queue of commits, supporting the
945 iterator protocol. The constructor takes a single argument, the Walker.
946 **kwargs: Additional keyword arguments
948 Returns: A `Walker` object
949 """
950 from .walk import Walker, _CommitTimeQueue
952 if include is None:
953 include = [self.head()]
955 # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
956 return Walker(
957 self.object_store,
958 include,
959 exclude=exclude,
960 order=order,
961 reverse=reverse,
962 max_entries=max_entries,
963 paths=paths,
964 rename_detector=rename_detector,
965 follow=follow,
966 since=since,
967 until=until,
968 get_parents=lambda commit: self.get_parents(commit.id, commit),
969 queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
970 )
    def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile":
        """Retrieve a Git object by SHA1 or ref.

        Args:
            name: A Git object SHA1 or a ref name

        Returns: A `ShaFile` object, such as a Commit or Blob

        Raises:
            KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # A 20-byte (binary) or 40-byte (hex) name may address an object
        # directly; try that first.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                # Not a known object (or not a valid sha); fall through and
                # resolve the name as a ref instead.
                pass
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            # Invalid ref names surface as KeyError, matching dict semantics.
            raise KeyError(name) from exc
993 def __contains__(self, name: bytes) -> bool:
994 """Check if a specific Git object or ref is present.
996 Args:
997 name: Git object SHA1 or ref name
998 """
999 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
1000 return name in self.object_store or name in self.refs
1001 else:
1002 return name in self.refs
1004 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
1005 """Set a ref.
1007 Args:
1008 name: ref name
1009 value: Ref value - either a ShaFile object, or a hex sha
1010 """
1011 if name.startswith(b"refs/") or name == b"HEAD":
1012 if isinstance(value, ShaFile):
1013 self.refs[name] = value.id
1014 elif isinstance(value, bytes):
1015 self.refs[name] = value
1016 else:
1017 raise TypeError(value)
1018 else:
1019 raise ValueError(name)
1021 def __delitem__(self, name: bytes) -> None:
1022 """Remove a ref.
1024 Args:
1025 name: Name of the ref to remove
1026 """
1027 if name.startswith(b"refs/") or name == b"HEAD":
1028 del self.refs[name]
1029 else:
1030 raise ValueError(name)
1032 def _get_user_identity(
1033 self, config: "StackedConfig", kind: Optional[str] = None
1034 ) -> bytes:
1035 """Determine the identity to use for new commits."""
1036 warnings.warn(
1037 "use get_user_identity() rather than Repo._get_user_identity",
1038 DeprecationWarning,
1039 )
1040 return get_user_identity(config)
1042 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
1043 """Add or modify graftpoints.
1045 Args:
1046 updated_graftpoints: Dict of commit shas to list of parent shas
1047 """
1048 # Simple validation
1049 for commit, parents in updated_graftpoints.items():
1050 for sha in [commit, *parents]:
1051 check_hexsha(sha, "Invalid graftpoint")
1053 self._graftpoints.update(updated_graftpoints)
1055 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None:
1056 """Remove graftpoints.
1058 Args:
1059 to_remove: List of commit shas
1060 """
1061 for sha in to_remove:
1062 del self._graftpoints[sha]
1064 def _read_heads(self, name: str) -> list[bytes]:
1065 f = self.get_named_file(name)
1066 if f is None:
1067 return []
1068 with f:
1069 return [line.strip() for line in f.readlines() if line.strip()]
1071 def get_worktree(self) -> "WorkTree":
1072 """Get the working tree for this repository.
1074 Returns:
1075 WorkTree instance for performing working tree operations
1077 Raises:
1078 NotImplementedError: If the repository doesn't support working trees
1079 """
1080 raise NotImplementedError(
1081 "Working tree operations not supported by this repository type"
1082 )
1084 @replace_me(remove_in="0.26.0")
1085 def do_commit(
1086 self,
1087 message: Optional[bytes] = None,
1088 committer: Optional[bytes] = None,
1089 author: Optional[bytes] = None,
1090 commit_timestamp: Optional[float] = None,
1091 commit_timezone: Optional[int] = None,
1092 author_timestamp: Optional[float] = None,
1093 author_timezone: Optional[int] = None,
1094 tree: Optional[ObjectID] = None,
1095 encoding: Optional[bytes] = None,
1096 ref: Optional[Ref] = b"HEAD",
1097 merge_heads: Optional[list[ObjectID]] = None,
1098 no_verify: bool = False,
1099 sign: bool = False,
1100 ) -> bytes:
1101 """Create a new commit.
1103 If not specified, committer and author default to
1104 get_user_identity(..., 'COMMITTER')
1105 and get_user_identity(..., 'AUTHOR') respectively.
1107 Args:
1108 message: Commit message (bytes or callable that takes (repo, commit)
1109 and returns bytes)
1110 committer: Committer fullname
1111 author: Author fullname
1112 commit_timestamp: Commit timestamp (defaults to now)
1113 commit_timezone: Commit timestamp timezone (defaults to GMT)
1114 author_timestamp: Author timestamp (defaults to commit
1115 timestamp)
1116 author_timezone: Author timestamp timezone
1117 (defaults to commit timestamp timezone)
1118 tree: SHA1 of the tree root to use (if not specified the
1119 current index will be committed).
1120 encoding: Encoding
1121 ref: Optional ref to commit to (defaults to current branch).
1122 If None, creates a dangling commit without updating any ref.
1123 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1124 no_verify: Skip pre-commit and commit-msg hooks
1125 sign: GPG Sign the commit (bool, defaults to False,
1126 pass True to use default GPG key,
1127 pass a str containing Key ID to use a specific GPG key)
1129 Returns:
1130 New commit SHA1
1131 """
1132 return self.get_worktree().commit(
1133 message=message,
1134 committer=committer,
1135 author=author,
1136 commit_timestamp=commit_timestamp,
1137 commit_timezone=commit_timezone,
1138 author_timestamp=author_timestamp,
1139 author_timezone=author_timezone,
1140 tree=tree,
1141 encoding=encoding,
1142 ref=ref,
1143 merge_heads=merge_heads,
1144 no_verify=no_verify,
1145 sign=sign,
1146 )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    contents = f.read()
    prefix = b"gitdir: "
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Drop the prefix and any trailing newline, then decode to a str path.
    return contents[len(prefix) :].rstrip(b"\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Pass a message to Exception so str(exc) is informative; previously
        # no arguments were forwarded and str(exc) was empty.
        super().__init__(f"Unsupported repository format version: {version}")
        # Keep the raw version available for programmatic inspection.
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative; previously
        # no arguments were forwarded and str(exc) was empty.
        super().__init__(f"Unsupported extension: {extension}")
        # Keep the raw extension name available for programmatic inspection.
        self.extension = extension
1188class Repo(BaseRepo):
1189 """A git repository backed by local disk.
1191 To open an existing repository, call the constructor with
1192 the path of the repository.
1194 To create a new repository, use the Repo.init class method.
1196 Note that a repository object may hold on to resources such
1197 as file handles for performance reasons; call .close() to free
1198 up those resources.
1200 Attributes:
1201 path: Path to the working copy (if it exists) or repository control
1202 directory (if the repository is bare)
1203 bare: Whether this is a bare repository
1204 """
1206 path: str
1207 bare: bool
1208 object_store: DiskObjectStore
1209 filter_context: Optional["FilterContext"]
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike[str]],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: If no repository layout is found at ``root``.
          UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
          UnsupportedExtension: If an unknown extensions.* key is configured.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file, or a .git dir containing objects/,
            # means a non-bare repo; objects/ plus refs/ directly under
            # root means a bare repo.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a gitfile pointing at the real control directory
                # (linked-worktree / submodule layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" redirects shared state to the main worktree's control
        # dir for linked worktrees; otherwise commondir == controldir.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Absent setting means the default, version 0.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension we do not understand makes the repo unsafe
                # to operate on, so refuse to open it.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file; both
        # are parsed with the same format.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1333 def get_worktree(self) -> "WorkTree":
1334 """Get the working tree for this repository.
1336 Returns:
1337 WorkTree instance for performing working tree operations
1338 """
1339 from .worktree import WorkTree
1341 return WorkTree(self, self.path)
    def _write_reflog(
        self,
        ref: bytes,
        old_sha: bytes,
        new_sha: bytes,
        committer: Optional[bytes],
        timestamp: Optional[int],
        timezone: Optional[int],
        message: bytes,
    ) -> None:
        """Append an entry to the reflog for *ref*.

        Used as the logger callback of the DiskRefsContainer created in
        ``__init__``. Missing committer/timestamp/timezone values are
        filled in with sensible defaults before writing.

        Args:
          ref: Reference name the update applies to
          old_sha: Previous SHA of the ref
          new_sha: New SHA of the ref
          committer: Committer identity, or None to use the configured identity
          timestamp: Seconds since the epoch, or None for the current time
          timezone: UTC offset in seconds, or None (currently defaults to 0)
          message: Reflog message describing the update
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)
        try:
            # Parent directories mirror the ref hierarchy under logs/.
            os.makedirs(os.path.dirname(path))
        except FileExistsError:
            pass
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        # Append-only: reflog entries are never rewritten here.
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )
1376 def _reflog_path(self, ref: bytes) -> str:
1377 if ref.startswith((b"main-worktree/", b"worktrees/")):
1378 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1380 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1381 return os.path.join(base, "logs", os.fsdecode(ref))
1383 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1384 """Read reflog entries for a reference.
1386 Args:
1387 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1389 Yields:
1390 reflog.Entry objects in chronological order (oldest first)
1391 """
1392 from .reflog import read_reflog
1394 path = self._reflog_path(ref)
1395 try:
1396 with open(path, "rb") as f:
1397 yield from read_reflog(f)
1398 except FileNotFoundError:
1399 return
1401 @classmethod
1402 def discover(cls, start: Union[str, bytes, os.PathLike[str]] = ".") -> "Repo":
1403 """Iterate parent directories to discover a repository.
1405 Return a Repo object for the first parent directory that looks like a
1406 Git repository.
1408 Args:
1409 start: The directory to start discovery from (defaults to '.')
1410 """
1411 path = os.path.abspath(start)
1412 while True:
1413 try:
1414 return cls(path)
1415 except NotGitRepository:
1416 new_path, _tail = os.path.split(path)
1417 if new_path == path: # Root reached
1418 break
1419 path = new_path
1420 start_str = os.fspath(start)
1421 if isinstance(start_str, bytes):
1422 start_str = start_str.decode("utf-8")
1423 raise NotGitRepository(f"No git repository was found at {start_str}")
1425 def controldir(self) -> str:
1426 """Return the path of the control directory."""
1427 return self._controldir
1429 def commondir(self) -> str:
1430 """Return the path of the common directory.
1432 For a main working tree, it is identical to controldir().
1434 For a linked working tree, it is the control directory of the
1435 main working tree.
1436 """
1437 return self._commondir
1439 def _determine_file_mode(self) -> bool:
1440 """Probe the file-system to determine whether permissions can be trusted.
1442 Returns: True if permissions can be trusted, False otherwise.
1443 """
1444 fname = os.path.join(self.path, ".probe-permissions")
1445 with open(fname, "w") as f:
1446 f.write("")
1448 st1 = os.lstat(fname)
1449 try:
1450 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1451 except PermissionError:
1452 return False
1453 st2 = os.lstat(fname)
1455 os.unlink(fname)
1457 mode_differs = st1.st_mode != st2.st_mode
1458 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1460 return mode_differs and st2_has_exec
1462 def _determine_symlinks(self) -> bool:
1463 """Probe the filesystem to determine whether symlinks can be created.
1465 Returns: True if symlinks can be created, False otherwise.
1466 """
1467 # TODO(jelmer): Actually probe disk / look at filesystem
1468 return sys.platform != "win32"
1470 def _put_named_file(self, path: str, contents: bytes) -> None:
1471 """Write a file to the control dir with the given name and contents.
1473 Args:
1474 path: The path to the file, relative to the control dir.
1475 contents: A string to write to the file.
1476 """
1477 path = path.lstrip(os.path.sep)
1478 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1479 f.write(contents)
1481 def _del_named_file(self, path: str) -> None:
1482 try:
1483 os.unlink(os.path.join(self.controldir(), path))
1484 except FileNotFoundError:
1485 return
1487 def get_named_file(
1488 self,
1489 path: Union[str, bytes],
1490 basedir: Optional[str] = None,
1491 ) -> Optional[BinaryIO]:
1492 """Get a file from the control dir with a specific name.
1494 Although the filename should be interpreted as a filename relative to
1495 the control dir in a disk-based Repo, the object returned need not be
1496 pointing to a file in that location.
1498 Args:
1499 path: The path to the file, relative to the control dir.
1500 basedir: Optional argument that specifies an alternative to the
1501 control dir.
1502 Returns: An open file object, or None if the file does not exist.
1503 """
1504 # TODO(dborowitz): sanitize filenames, since this is used directly by
1505 # the dumb web serving code.
1506 if basedir is None:
1507 basedir = self.controldir()
1508 if isinstance(path, bytes):
1509 path = path.decode("utf-8")
1510 path = path.lstrip(os.path.sep)
1511 try:
1512 return open(os.path.join(basedir, path), "rb")
1513 except FileNotFoundError:
1514 return None
1516 def index_path(self) -> str:
1517 """Return path to the index file."""
1518 return os.path.join(self.controldir(), INDEX_FILENAME)
1520 def open_index(self) -> "Index":
1521 """Open the index for this repository.
1523 Raises:
1524 NoIndexPresent: If no index is present
1525 Returns: The matching `Index`
1526 """
1527 from .index import Index
1529 if not self.has_index():
1530 raise NoIndexPresent
1532 # Check for manyFiles feature configuration
1533 config = self.get_config_stack()
1534 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1535 skip_hash = False
1536 index_version = None
1538 if many_files:
1539 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1540 try:
1541 index_version_str = config.get(b"index", b"version")
1542 index_version = int(index_version_str)
1543 except KeyError:
1544 index_version = 4 # Default to version 4 for manyFiles
1545 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1546 else:
1547 # Check for explicit index settings
1548 try:
1549 index_version_str = config.get(b"index", b"version")
1550 index_version = int(index_version_str)
1551 except KeyError:
1552 index_version = None
1553 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1555 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
1557 def has_index(self) -> bool:
1558 """Check if an index is present."""
1559 # Bare repos must never have index files; non-bare repos may have a
1560 # missing index file, which is treated as empty.
1561 return not self.bare
1563 @replace_me(remove_in="0.26.0")
1564 def stage(
1565 self,
1566 fs_paths: Union[
1567 str, bytes, os.PathLike[str], Iterable[Union[str, bytes, os.PathLike[str]]]
1568 ],
1569 ) -> None:
1570 """Stage a set of paths.
1572 Args:
1573 fs_paths: List of paths, relative to the repository path
1574 """
1575 return self.get_worktree().stage(fs_paths)
1577 @replace_me(remove_in="0.26.0")
1578 def unstage(self, fs_paths: Sequence[str]) -> None:
1579 """Unstage specific file in the index.
1581 Args:
1582 fs_paths: a list of files to unstage,
1583 relative to the repository path.
1584 """
1585 return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path: Union[str, bytes, os.PathLike[str]],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: Optional[bool] = None,
        branch: Optional[bytes] = None,
        progress: Optional[Callable[[str], None]] = None,
        depth: Optional[int] = None,
        symlinks: Optional[bool] = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function (NOTE(review): not forwarded
            anywhere in this implementation — confirm intent)
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Non-bare clones check out by default.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Point the new repo's "origin" remote back at this repo.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror local branches under refs/remotes/<origin>/ and
                # copy tags verbatim.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Close the partially-initialized target before re-raising
                # so file handles are released.
                target.close()
                raise
        except BaseException:
            if mkdir:
                # We created the directory, so remove it again on failure.
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1682 @replace_me(remove_in="0.26.0")
1683 def reset_index(self, tree: Optional[bytes] = None) -> None:
1684 """Reset the index back to a specific tree.
1686 Args:
1687 tree: Tree SHA to reset to, None for current HEAD tree.
1688 """
1689 return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path (e.g. "C:/..."): leave as-is.
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match at any depth, like git's includeIf.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # Trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. empty repo): no branch to match.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
    def get_worktree_config(self) -> "ConfigFile":
        """Get the worktree-specific config.

        Returns:
          ConfigFile object for the worktree config
        """
        from .config import ConfigFile

        # NOTE(review): this reads config.worktree from commondir(); C git
        # stores config.worktree in the per-worktree $GIT_DIR — confirm this
        # is intended for linked worktrees.
        path = os.path.join(self.commondir(), "config.worktree")
        try:
            # Pass condition matchers for includeIf evaluation
            condition_matchers = self._get_config_condition_matchers()
            return ConfigFile.from_path(path, condition_matchers=condition_matchers)
        except FileNotFoundError:
            # Missing file: return an empty config bound to the expected
            # path so it can be written later.
            cf = ConfigFile()
            cf.path = path
            return cf
1795 def get_config(self) -> "ConfigFile":
1796 """Retrieve the config object.
1798 Returns: `ConfigFile` object for the ``.git/config`` file.
1799 """
1800 from .config import ConfigFile
1802 path = os.path.join(self._commondir, "config")
1803 try:
1804 # Pass condition matchers for includeIf evaluation
1805 condition_matchers = self._get_config_condition_matchers()
1806 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1807 except FileNotFoundError:
1808 ret = ConfigFile()
1809 ret.path = path
1810 return ret
1812 def get_rebase_state_manager(self) -> "RebaseStateManager":
1813 """Get the appropriate rebase state manager for this repository.
1815 Returns: DiskRebaseStateManager instance
1816 """
1817 import os
1819 from .rebase import DiskRebaseStateManager
1821 path = os.path.join(self.controldir(), "rebase-merge")
1822 return DiskRebaseStateManager(path)
1824 def get_description(self) -> Optional[bytes]:
1825 """Retrieve the description of this repository.
1827 Returns: Description as bytes or None.
1828 """
1829 path = os.path.join(self._controldir, "description")
1830 try:
1831 with GitFile(path, "rb") as f:
1832 return f.read()
1833 except FileNotFoundError:
1834 return None
1836 def __repr__(self) -> str:
1837 """Return string representation of this repository."""
1838 return f"<Repo at {self.path!r}>"
1840 def set_description(self, description: bytes) -> None:
1841 """Set the description for this repository.
1843 Args:
1844 description: Text to set as description for this repository.
1845 """
1846 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        controldir: Union[str, bytes, os.PathLike[str]],
        bare: bool,
        object_store: Optional[PackBasedObjectStore] = None,
        config: Optional["StackedConfig"] = None,
        default_branch: Optional[bytes] = None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Shared initialization for bare and non-bare repositories.

        Creates the standard control-directory layout, the object store,
        the HEAD symref and the initial control files.

        Args:
          path: Path to the working copy (or control dir for bare repos)
          controldir: Path to the control directory to populate
          bare: Whether this is a bare repository
          object_store: Object store to use; created on disk when omitted
          config: Configuration used to look up init.defaultBranch
          default_branch: Initial branch name; falls back to config, then
            to DEFAULT_BRANCH
          symlinks: Whether to support symlinks (autodetected when None)
          format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        # Create the standard subdirectory skeleton (refs, hooks, info, ...).
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # HEAD starts as a symref to the (possibly unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
1884 @classmethod
1885 def init(
1886 cls,
1887 path: Union[str, bytes, os.PathLike[str]],
1888 *,
1889 mkdir: bool = False,
1890 config: Optional["StackedConfig"] = None,
1891 default_branch: Optional[bytes] = None,
1892 symlinks: Optional[bool] = None,
1893 format: Optional[int] = None,
1894 ) -> "Repo":
1895 """Create a new repository.
1897 Args:
1898 path: Path in which to create the repository
1899 mkdir: Whether to create the directory
1900 config: Configuration object
1901 default_branch: Default branch name
1902 symlinks: Whether to support symlinks
1903 format: Repository format version (defaults to 0)
1904 Returns: `Repo` instance
1905 """
1906 path = os.fspath(path)
1907 if isinstance(path, bytes):
1908 path = os.fsdecode(path)
1909 if mkdir:
1910 os.mkdir(path)
1911 controldir = os.path.join(path, CONTROLDIR)
1912 os.mkdir(controldir)
1913 _set_filesystem_hidden(controldir)
1914 return cls._init_maybe_bare(
1915 path,
1916 controldir,
1917 False,
1918 config=config,
1919 default_branch=default_branch,
1920 symlinks=symlinks,
1921 format=format,
1922 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        main_repo: "Repo",
        identifier: Optional[str] = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            # Default worktree id is the directory's basename.
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # <path>/.git is a gitfile pointing into the main repo's
        # worktrees/<id> directory.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-pointer from the worktree control dir to the gitfile.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # commondir points back to the main control dir (two levels up).
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # The new worktree starts at the main repo's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        # Populate the worktree's index from HEAD.
        r.get_worktree().reset_index()
        return r
1973 @classmethod
1974 def init_bare(
1975 cls,
1976 path: Union[str, bytes, os.PathLike[str]],
1977 *,
1978 mkdir: bool = False,
1979 object_store: Optional[PackBasedObjectStore] = None,
1980 config: Optional["StackedConfig"] = None,
1981 default_branch: Optional[bytes] = None,
1982 format: Optional[int] = None,
1983 ) -> "Repo":
1984 """Create a new bare repository.
1986 ``path`` should already exist and be an empty directory.
1988 Args:
1989 path: Path to create bare repository in
1990 mkdir: Whether to create the directory
1991 object_store: Object store to use
1992 config: Configuration object
1993 default_branch: Default branch name
1994 format: Repository format version (defaults to 0)
1995 Returns: a `Repo` instance
1996 """
1997 path = os.fspath(path)
1998 if isinstance(path, bytes):
1999 path = os.fsdecode(path)
2000 if mkdir:
2001 os.mkdir(path)
2002 return cls._init_maybe_bare(
2003 path,
2004 path,
2005 True,
2006 object_store=object_store,
2007 config=config,
2008 default_branch=default_branch,
2009 format=format,
2010 )
2012 create = init_bare
2014 def close(self) -> None:
2015 """Close any files opened by this repository."""
2016 self.object_store.close()
2017 # Clean up filter context if it was created
2018 if self.filter_context is not None:
2019 self.filter_context.close()
2020 self.filter_context = None
2022 def __enter__(self) -> "Repo":
2023 """Enter context manager."""
2024 return self
2026 def __exit__(
2027 self,
2028 exc_type: Optional[type[BaseException]],
2029 exc_val: Optional[BaseException],
2030 exc_tb: Optional[TracebackType],
2031 ) -> None:
2032 """Exit context manager and close repository."""
2033 self.close()
2035 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2036 """Read .gitattributes file from working tree.
2038 Returns:
2039 Dictionary mapping file patterns to attributes
2040 """
2041 gitattributes = {}
2042 gitattributes_path = os.path.join(self.path, ".gitattributes")
2044 if os.path.exists(gitattributes_path):
2045 with open(gitattributes_path, "rb") as f:
2046 for line in f:
2047 line = line.strip()
2048 if not line or line.startswith(b"#"):
2049 continue
2051 parts = line.split()
2052 if len(parts) < 2:
2053 continue
2055 pattern = parts[0]
2056 attrs = {}
2058 for attr in parts[1:]:
2059 if attr.startswith(b"-"):
2060 # Unset attribute
2061 attrs[attr[1:]] = b"false"
2062 elif b"=" in attr:
2063 # Set to value
2064 key, value = attr.split(b"=", 1)
2065 attrs[key] = value
2066 else:
2067 # Set attribute
2068 attrs[attr] = b"true"
2070 gitattributes[pattern] = attrs
2072 return gitattributes
2074 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2075 """Return a BlobNormalizer object."""
2076 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2078 # Get fresh configuration and GitAttributes
2079 config_stack = self.get_config_stack()
2080 git_attributes = self.get_gitattributes()
2082 # Lazily create FilterContext if needed
2083 if self.filter_context is None:
2084 filter_registry = FilterRegistry(config_stack, self)
2085 self.filter_context = FilterContext(filter_registry)
2086 else:
2087 # Refresh the context with current config to handle config changes
2088 self.filter_context.refresh_config(config_stack)
2090 # Return a new FilterBlobNormalizer with the context
2091 return FilterBlobNormalizer(
2092 config_stack, git_attributes, filter_context=self.filter_context
2093 )
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from: the .gitattributes blob in
        the given tree (or HEAD), .git/info/attributes, and the working
        directory's .gitattributes file.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to its commit.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing object or not a tree: ignore tree-level attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2162 @replace_me(remove_in="0.26.0")
2163 def _sparse_checkout_file_path(self) -> str:
2164 """Return the path of the sparse-checkout file in this repo's control dir."""
2165 return self.get_worktree()._sparse_checkout_file_path()
2167 @replace_me(remove_in="0.26.0")
2168 def configure_for_cone_mode(self) -> None:
2169 """Ensure the repository is configured for cone-mode sparse-checkout."""
2170 return self.get_worktree().configure_for_cone_mode()
2172 @replace_me(remove_in="0.26.0")
2173 def infer_cone_mode(self) -> bool:
2174 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2175 return self.get_worktree().infer_cone_mode()
2177 @replace_me(remove_in="0.26.0")
2178 def get_sparse_checkout_patterns(self) -> list[str]:
2179 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2181 Returns:
2182 A list of patterns. Returns an empty list if the file is missing.
2183 """
2184 return self.get_worktree().get_sparse_checkout_patterns()
2186 @replace_me(remove_in="0.26.0")
2187 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
2188 """Write the given sparse-checkout patterns into info/sparse-checkout.
2190 Creates the info/ directory if it does not exist.
2192 Args:
2193 patterns: A list of gitignore-style patterns to store.
2194 """
2195 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2197 @replace_me(remove_in="0.26.0")
2198 def set_cone_mode_patterns(self, dirs: Union[Sequence[str], None] = None) -> None:
2199 """Write the given cone-mode directory patterns into info/sparse-checkout.
2201 For each directory to include, add an inclusion line that "undoes" the prior
2202 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2203 Never add the same line twice.
2204 """
2205 return self.get_worktree().set_cone_mode_patterns(dirs)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    # Lazily created by get_blob_normalizer(); reset to None by close().
    filter_context: Optional["FilterContext"]
2217 def __init__(self) -> None:
2218 """Create a new repository in memory."""
2219 from .config import ConfigFile
2221 self._reflog: list[Any] = []
2222 refs_container = DictRefsContainer({}, logger=self._append_reflog)
2223 BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
2224 self._named_files: dict[str, bytes] = {}
2225 self.bare = True
2226 self._config = ConfigFile()
2227 self._description: Optional[bytes] = None
2228 self.filter_context = None
2230 def _append_reflog(
2231 self,
2232 ref: bytes,
2233 old_sha: Optional[bytes],
2234 new_sha: Optional[bytes],
2235 committer: Optional[bytes],
2236 timestamp: Optional[int],
2237 timezone: Optional[int],
2238 message: Optional[bytes],
2239 ) -> None:
2240 self._reflog.append(
2241 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2242 )
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description
        """
        self._description = description
    def get_description(self) -> Optional[bytes]:
        """Get the description of this repository.

        Returns:
          Repository description as bytes, or None if no description was set
        """
        return self._description
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # An in-memory repo has no filesystem to probe; mirror the platform
        # default (file modes are not reliable on Windows).
        return sys.platform != "win32"
    def _determine_symlinks(self) -> bool:
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        # An in-memory repo has no filesystem to probe; mirror the platform
        # default (symlink support is not assumed on Windows).
        return sys.platform != "win32"
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        self._named_files[path] = contents
2283 def _del_named_file(self, path: str) -> None:
2284 try:
2285 del self._named_files[path]
2286 except KeyError:
2287 pass
2289 def get_named_file(
2290 self,
2291 path: Union[str, bytes],
2292 basedir: Optional[str] = None,
2293 ) -> Optional[BytesIO]:
2294 """Get a file from the control dir with a specific name.
2296 Although the filename should be interpreted as a filename relative to
2297 the control dir in a disk-baked Repo, the object returned need not be
2298 pointing to a file in that location.
2300 Args:
2301 path: The path to the file, relative to the control dir.
2302 basedir: Optional base directory for the path
2303 Returns: An open file object, or None if the file does not exist.
2304 """
2305 path_str = path.decode() if isinstance(path, bytes) else path
2306 contents = self._named_files.get(path_str, None)
2307 if contents is None:
2308 return None
2309 return BytesIO(contents)
    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance (keeps rebase state
          in memory, matching this repo's storage model).
        """
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)
2335 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2336 """Return a BlobNormalizer object for checkin/checkout operations."""
2337 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2339 # Get fresh configuration and GitAttributes
2340 config_stack = self.get_config_stack()
2341 git_attributes = self.get_gitattributes()
2343 # Lazily create FilterContext if needed
2344 if self.filter_context is None:
2345 filter_registry = FilterRegistry(config_stack, self)
2346 self.filter_context = FilterContext(filter_registry)
2347 else:
2348 # Refresh the context with current config to handle config changes
2349 self.filter_context.refresh_config(config_stack)
2351 # Return a new FilterBlobNormalizer with the context
2352 return FilterBlobNormalizer(
2353 config_stack, git_attributes, filter_context=self.filter_context
2354 )
2356 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
2357 """Read gitattributes for the repository."""
2358 from .attrs import GitAttributes
2360 # Memory repos don't have working trees or gitattributes files
2361 # Return empty GitAttributes
2362 return GitAttributes([])
2364 def close(self) -> None:
2365 """Close any resources opened by this repository."""
2366 # Clean up filter context if it was created
2367 if self.filter_context is not None:
2368 self.filter_context.close()
2369 self.filter_context = None
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message (bytes, or a callable taking (repo, commit)
            and returning bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use (required for MemoryRepo)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Raises:
          ValueError: If tree is missing/invalid or no message is supplied.

        Returns:
          New commit SHA1
        """
        import time

        from .objects import Commit

        # MemoryRepo has no index/worktree, so the tree must be given.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # Default to UTC (offset 0).
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # Author time defaults to the commit time.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                # No configured encoding; leave unset.
                pass
        if encoding is not None:
            c.encoding = encoding

        # Resolve the message: callables receive (repo, commit) and must
        # return the final message bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit: store the object, update no ref.
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                # Existing ref: parent is the old head, update atomically
                # with set_if_equals so concurrent updates are detected.
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet (e.g. first commit): create it,
                # failing if someone else created it in the meantime.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id
2501 @classmethod
2502 def init_bare(
2503 cls,
2504 objects: Iterable[ShaFile],
2505 refs: Mapping[bytes, bytes],
2506 format: Optional[int] = None,
2507 ) -> "MemoryRepo":
2508 """Create a new bare repository in memory.
2510 Args:
2511 objects: Objects for the new repository,
2512 as iterable
2513 refs: Refs as dictionary, mapping names
2514 to object SHA1s
2515 format: Repository format version (defaults to 0)
2516 """
2517 ret = cls()
2518 for obj in objects:
2519 ret.object_store.add_object(obj)
2520 for refname, sha in refs.items():
2521 ret.refs.add_if_new(refname, sha)
2522 ret._init_files(bare=True, format=format)
2523 return ret