Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Generator, Iterable, Iterator, Mapping, Sequence
38from io import BytesIO
39from types import TracebackType
40from typing import (
41 TYPE_CHECKING,
42 Any,
43 BinaryIO,
44 Callable,
45 Optional,
46 TypeVar,
47 Union,
48)
50if TYPE_CHECKING:
51 # There are no circular imports here, but we try to defer imports as long
52 # as possible to reduce start-up time for anything that doesn't need
53 # these imports.
54 from .attrs import GitAttributes
55 from .config import ConditionMatcher, ConfigFile, StackedConfig
56 from .diff_tree import RenameDetector
57 from .filters import FilterBlobNormalizer, FilterContext
58 from .index import Index
59 from .notes import Notes
60 from .object_store import BaseObjectStore, GraphWalker
61 from .pack import UnpackedObject
62 from .rebase import RebaseStateManager
63 from .walk import Walker
64 from .worktree import WorkTree
66from . import reflog, replace_me
67from .errors import (
68 NoIndexPresent,
69 NotBlobError,
70 NotCommitError,
71 NotGitRepository,
72 NotTagError,
73 NotTreeError,
74 RefFormatError,
75)
76from .file import GitFile
77from .hooks import (
78 CommitMsgShellHook,
79 Hook,
80 PostCommitShellHook,
81 PostReceiveShellHook,
82 PreCommitShellHook,
83)
84from .object_store import (
85 DiskObjectStore,
86 MemoryObjectStore,
87 MissingObjectFinder,
88 ObjectStoreGraphWalker,
89 PackBasedObjectStore,
90 PackCapableObjectStore,
91 find_shallow,
92 peel_sha,
93)
94from .objects import (
95 Blob,
96 Commit,
97 ObjectID,
98 ShaFile,
99 Tag,
100 Tree,
101 check_hexsha,
102 valid_hexsha,
103)
104from .pack import generate_unpacked_objects
105from .refs import (
106 ANNOTATED_TAG_SUFFIX, # noqa: F401
107 LOCAL_TAG_PREFIX, # noqa: F401
108 SYMREF, # noqa: F401
109 DictRefsContainer,
110 DiskRefsContainer,
111 InfoRefsContainer, # noqa: F401
112 Ref,
113 RefsContainer,
114 _set_default_branch,
115 _set_head,
116 _set_origin_head,
117 check_ref_format, # noqa: F401
118 extract_branch_name,
119 is_per_worktree_ref,
120 local_branch_name,
121 read_packed_refs, # noqa: F401
122 read_packed_refs_with_peeled, # noqa: F401
123 serialize_refs,
124 write_packed_refs, # noqa: F401
125)
# Standard names/locations inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
# Whether packs generated by this module use offset (OFS) deltas by default.
DEFAULT_OFS_DELTA = True

# Type variable bound to ShaFile; used for typed object retrieval helpers.
T = TypeVar("T", bound="ShaFile")
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
# Control files used by linked worktrees (.git/worktrees/<name>/...).
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Subdirectory paths (as path components) expected inside a control
# directory; presumably created when initializing a repository — the
# creation site is outside this chunk.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used when no explicit default branch is configured.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """Raised when a user identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the offending identity string.

        Args:
          identity: The identity string that failed validation.
        """
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity could be determined."""
164# TODO(jelmer): Cache?
165def _get_default_identity() -> tuple[str, str]:
166 import socket
168 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
169 username = os.environ.get(name)
170 if username:
171 break
172 else:
173 username = None
175 try:
176 import pwd
177 except ImportError:
178 fullname = None
179 else:
180 try:
181 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
182 except KeyError:
183 fullname = None
184 else:
185 if getattr(entry, "gecos", None):
186 fullname = entry.pw_gecos.split(",")[0]
187 else:
188 fullname = None
189 if username is None:
190 username = entry.pw_name
191 if not fullname:
192 if username is None:
193 raise DefaultIdentityNotFound("no username found")
194 fullname = username
195 email = os.environ.get("EMAIL")
196 if email is None:
197 if username is None:
198 raise DefaultIdentityNotFound("no username found")
199 email = f"{username}@{socket.gethostname()}"
200 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Bug fix: only consult the host for a default identity when the
        # environment and configuration did not supply one.  The previous
        # unconditional call meant DefaultIdentityNotFound could be raised
        # even when user.name and user.email were fully configured.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    if email.startswith(b"<") and email.endswith(b">"):
        # Strip surrounding angle brackets; they are re-added below.
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name_part, rest = identity.split(b" <", 1)
    except ValueError as split_err:
        # No " <" separator at all.
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from split_err
    # The email part must be closed, and control bytes that would corrupt
    # the commit object are rejected outright.
    if b">" not in rest or b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for entry in graftpoints:
        fields = entry.split(None, 1)
        commit = fields[0]
        # Everything after the first field is the (possibly empty) parent list.
        parents = fields[1].split() if len(fields) == 2 else []
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")
        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # A commit with no graft parents serializes to just the commit sha.
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
326def _set_filesystem_hidden(path: str) -> None:
327 """Mark path as to be hidden if supported by platform and filesystem.
329 On win32 uses SetFileAttributesW api:
330 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
331 """
332 if sys.platform == "win32":
333 import ctypes
334 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
336 FILE_ATTRIBUTE_HIDDEN = 2
337 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
338 ("SetFileAttributesW", ctypes.windll.kernel32)
339 )
341 if isinstance(path, bytes):
342 path = os.fsdecode(path)
343 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
344 pass # Could raise or log `ctypes.WinError()` here
346 # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Provider for commit parent information.

    Resolves the parents of a commit, honouring grafts and shallow
    boundaries, and using the store's commit graph as a fast path when
    one is available.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict[bytes, list[bytes]]] = None,
        shallows: Iterable[bytes] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information (commit id -> replacement parent ids)
          shallows: Shallow commit SHAs
        """
        self.store = store
        # Bug fix: the previous ``grafts={}`` mutable default was a single
        # dict shared between every instance constructed without grafts.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional pre-loaded commit object matching commit_id

        Returns:
          List of parent SHAs.  Grafts override recorded parents, and
          shallow commits report no parents.
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        parents = commit.parents
        assert isinstance(parents, list)
        return parents
399class BaseRepo:
400 """Base class for a git repository.
402 This base class is meant to be used for Repository implementations that e.g.
403 work on top of a different transport than a standard filesystem path.
405 Attributes:
406 object_store: Dictionary-like object for accessing
407 the objects
408 refs: Dictionary-like object with the refs in this
409 repository
410 """
412 def __init__(
413 self, object_store: "PackCapableObjectStore", refs: RefsContainer
414 ) -> None:
415 """Open a repository.
417 This shouldn't be called directly, but rather through one of the
418 base classes, such as MemoryRepo or Repo.
420 Args:
421 object_store: Object store to use
422 refs: Refs container to use
423 """
424 self.object_store = object_store
425 self.refs = refs
427 self._graftpoints: dict[bytes, list[bytes]] = {}
428 self.hooks: dict[str, Hook] = {}
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Abstract: storage-specific subclasses provide the actual probe.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)
437 def _determine_symlinks(self) -> bool:
438 """Probe the filesystem to determine whether symlinks can be created.
440 Returns: True if symlinks can be created, False otherwise.
441 """
442 # For now, just mimic the old behaviour
443 return sys.platform != "win32"
445 def _init_files(
446 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
447 ) -> None:
448 """Initialize a default set of named files."""
449 from .config import ConfigFile
451 self._put_named_file("description", b"Unnamed repository")
452 f = BytesIO()
453 cf = ConfigFile()
454 if format is None:
455 format = 0
456 if format not in (0, 1):
457 raise ValueError(f"Unsupported repository format version: {format}")
458 cf.set("core", "repositoryformatversion", str(format))
459 if self._determine_file_mode():
460 cf.set("core", "filemode", True)
461 else:
462 cf.set("core", "filemode", False)
464 if symlinks is None and not bare:
465 symlinks = self._determine_symlinks()
467 if symlinks is False:
468 cf.set("core", "symlinks", symlinks)
470 cf.set("core", "bare", bare)
471 cf.set("core", "logallrefupdates", True)
472 cf.write_to_file(f)
473 self._put_named_file("config", f.getvalue())
474 self._put_named_file(os.path.join("info", "exclude"), b"")
    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        # Abstract: storage-specific subclasses provide the lookup.
        raise NotImplementedError(self.get_named_file)
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Abstract: storage-specific subclasses provide the write.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)
    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Abstract: storage-specific subclasses provide the deletion.
        """
        raise NotImplementedError(self._del_named_file)
    def open_index(self) -> "Index":
        """Open the index for this repository.

        Abstract: only repositories with a working tree have an index.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)
511 def fetch(
512 self,
513 target: "BaseRepo",
514 determine_wants: Optional[
515 Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]]
516 ] = None,
517 progress: Optional[Callable[..., None]] = None,
518 depth: Optional[int] = None,
519 ) -> dict[bytes, bytes]:
520 """Fetch objects into another repository.
522 Args:
523 target: The target repository
524 determine_wants: Optional function to determine what refs to
525 fetch.
526 progress: Optional progress function
527 depth: Optional shallow fetch depth
528 Returns: The local refs
529 """
530 if determine_wants is None:
531 determine_wants = target.object_store.determine_wants_all
532 count, pack_data = self.fetch_pack_data(
533 determine_wants,
534 target.get_graph_walker(),
535 progress=progress,
536 depth=depth,
537 )
538 target.object_store.add_pack_data(count, pack_data, progress)
539 return self.get_refs()
541 def fetch_pack_data(
542 self,
543 determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]],
544 graph_walker: "GraphWalker",
545 progress: Optional[Callable[[bytes], None]],
546 *,
547 get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None,
548 depth: Optional[int] = None,
549 ) -> tuple[int, Iterator["UnpackedObject"]]:
550 """Fetch the pack data required for a set of revisions.
552 Args:
553 determine_wants: Function that takes a dictionary with heads
554 and returns the list of heads to fetch.
555 graph_walker: Object that can iterate over the list of revisions
556 to fetch and has an "ack" method that will be called to acknowledge
557 that a revision is present.
558 progress: Simple progress function that will be called with
559 updated progress strings.
560 get_tagged: Function that returns a dict of pointed-to sha ->
561 tag sha for including tags.
562 depth: Shallow fetch depth
563 Returns: count and iterator over pack data
564 """
565 missing_objects = self.find_missing_objects(
566 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
567 )
568 if missing_objects is None:
569 return 0, iter([])
570 remote_has = missing_objects.get_remote_has()
571 object_ids = list(missing_objects)
572 return len(object_ids), generate_unpacked_objects(
573 self.object_store, object_ids, progress=progress, other_haves=remote_has
574 )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], Optional[int]], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Optional[Callable[[bytes], None]],
        *,
        get_tagged: Optional[Callable[[], dict[bytes, bytes]]] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
          when nothing should be sent (shallow short-circuit).
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before it is mutated below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
672 def generate_pack_data(
673 self,
674 have: set[ObjectID],
675 want: set[ObjectID],
676 *,
677 shallow: Optional[set[ObjectID]] = None,
678 progress: Optional[Callable[[str], None]] = None,
679 ofs_delta: Optional[bool] = None,
680 ) -> tuple[int, Iterator["UnpackedObject"]]:
681 """Generate pack data objects for a set of wants/haves.
683 Args:
684 have: List of SHA1s of objects that should not be sent
685 want: List of SHA1s of objects that should be sent
686 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
687 ofs_delta: Whether OFS deltas can be included
688 progress: Optional progress reporting method
689 """
690 if shallow is None:
691 shallow = self.get_shallow()
692 return self.object_store.generate_pack_data(
693 have,
694 want,
695 shallow=shallow,
696 progress=progress,
697 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
698 )
700 def get_graph_walker(
701 self, heads: Optional[list[ObjectID]] = None
702 ) -> ObjectStoreGraphWalker:
703 """Retrieve a graph walker.
705 A graph walker is used by a remote repository (or proxy)
706 to find out which objects are present in this repository.
708 Args:
709 heads: Repository heads to use (optional)
710 Returns: A graph walker object
711 """
712 if heads is None:
713 heads = [
714 sha
715 for sha in self.refs.as_dict(b"refs/heads").values()
716 if sha in self.object_store
717 ]
718 parents_provider = ParentsProvider(self.object_store)
719 return ObjectStoreGraphWalker(
720 heads,
721 parents_provider.get_parents,
722 shallow=self.get_shallow(),
723 update_shallow=self.update_shallow,
724 )
    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        # Delegates entirely to the refs container.
        return self.refs.as_dict()
    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD.

        Raises:
          KeyError: if HEAD is not set in the refs container.
        """
        # TODO: move this method to WorkTree
        return self.refs[b"HEAD"]
738 def _get_object(self, sha: bytes, cls: type[T]) -> T:
739 assert len(sha) in (20, 40)
740 ret = self.get_object(sha)
741 if not isinstance(ret, cls):
742 if cls is Commit:
743 raise NotCommitError(ret.id)
744 elif cls is Blob:
745 raise NotBlobError(ret.id)
746 elif cls is Tree:
747 raise NotTreeError(ret.id)
748 elif cls is Tag:
749 raise NotTagError(ret.id)
750 else:
751 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
752 return ret
    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        # Thin wrapper over the object store's mapping interface.
        return self.object_store[sha]
    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        Returns:
          ParentsProvider instance configured with this repository's
          grafts and shallow commits
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )
    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        # A fresh provider is built per call; grafts/shallows are re-read.
        return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Abstract: storage-specific subclasses provide the config source.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)
    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Abstract: storage-specific subclasses provide the config source.
        """
        raise NotImplementedError(self.get_worktree_config)
    def get_description(self) -> Optional[bytes]:
        """Retrieve the description for this repository.

        Abstract: storage-specific subclasses provide the lookup.

        Returns: Bytes with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Abstract: storage-specific subclasses provide the write.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Abstract: storage-specific subclasses choose the implementation.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)
    def get_blob_normalizer(self) -> "FilterBlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Abstract: storage-specific subclasses choose the implementation.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Abstract: storage-specific subclasses provide the lookup.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)
842 def get_config_stack(self) -> "StackedConfig":
843 """Return a config stack for this repository.
845 This stack accesses the configuration for both this repository
846 itself (.git/config) and the global configuration, which usually
847 lives in ~/.gitconfig.
849 Returns: `Config` instance for this repository
850 """
851 from .config import ConfigFile, StackedConfig
853 local_config = self.get_config()
854 backends: list[ConfigFile] = [local_config]
855 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
856 backends.append(self.get_worktree_config())
858 backends += StackedConfig.default_backends()
859 return StackedConfig(backends, writable=local_config)
861 def get_shallow(self) -> set[ObjectID]:
862 """Get the set of shallow commits.
864 Returns: Set of shallow commits.
865 """
866 f = self.get_named_file("shallow")
867 if f is None:
868 return set()
869 with f:
870 return {line.strip() for line in f}
872 def update_shallow(
873 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]]
874 ) -> None:
875 """Update the list of shallow objects.
877 Args:
878 new_shallow: Newly shallow objects
879 new_unshallow: Newly no longer shallow objects
880 """
881 shallow = self.get_shallow()
882 if new_shallow:
883 shallow.update(new_shallow)
884 if new_unshallow:
885 shallow.difference_update(new_unshallow)
886 if shallow:
887 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
888 else:
889 self._del_named_file("shallow")
891 def get_peeled(self, ref: Ref) -> ObjectID:
892 """Get the peeled value of a ref.
894 Args:
895 ref: The refname to peel.
896 Returns: The fully-peeled SHA1 of a tag object, after peeling all
897 intermediate tags; if the original ref does not point to a tag,
898 this will equal the original SHA1.
899 """
900 cached = self.refs.get_peeled(ref)
901 if cached is not None:
902 return cached
903 return peel_sha(self.object_store, self.refs[ref])[1].id
    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Imported lazily to keep module import cheap (matches the
        # TYPE_CHECKING-only import at the top of the file).
        from .notes import Notes

        return Notes(self.object_store, self.refs)
    def get_walker(
        self,
        include: Optional[Sequence[bytes]] = None,
        exclude: Optional[Sequence[bytes]] = None,
        order: str = "date",
        reverse: bool = False,
        max_entries: Optional[int] = None,
        paths: Optional[Sequence[bytes]] = None,
        rename_detector: Optional["RenameDetector"] = None,
        follow: bool = False,
        since: Optional[int] = None,
        until: Optional[int] = None,
        queue_cls: Optional[type] = None,
    ) -> "Walker":
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker, _CommitTimeQueue

        if include is None:
            include = [self.head()]

        # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
        return Walker(
            self.object_store,
            include,
            exclude=exclude,
            order=order,
            reverse=reverse,
            max_entries=max_entries,
            paths=paths,
            rename_detector=rename_detector,
            follow=follow,
            since=since,
            until=until,
            # Route parent lookup through the repo so grafts/shallows apply.
            get_parents=lambda commit: self.get_parents(commit.id, commit),
            queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
        )
977 def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile":
978 """Retrieve a Git object by SHA1 or ref.
980 Args:
981 name: A Git object SHA1 or a ref name
982 Returns: A `ShaFile` object, such as a Commit or Blob
983 Raises:
984 KeyError: when the specified ref or object does not exist
985 """
986 if not isinstance(name, bytes):
987 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
988 if len(name) in (20, 40):
989 try:
990 return self.object_store[name]
991 except (KeyError, ValueError):
992 pass
993 try:
994 return self.object_store[self.refs[name]]
995 except RefFormatError as exc:
996 raise KeyError(name) from exc
998 def __contains__(self, name: bytes) -> bool:
999 """Check if a specific Git object or ref is present.
1001 Args:
1002 name: Git object SHA1 or ref name
1003 """
1004 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
1005 return name in self.object_store or name in self.refs
1006 else:
1007 return name in self.refs
1009 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
1010 """Set a ref.
1012 Args:
1013 name: ref name
1014 value: Ref value - either a ShaFile object, or a hex sha
1015 """
1016 if name.startswith(b"refs/") or name == b"HEAD":
1017 if isinstance(value, ShaFile):
1018 self.refs[name] = value.id
1019 elif isinstance(value, bytes):
1020 self.refs[name] = value
1021 else:
1022 raise TypeError(value)
1023 else:
1024 raise ValueError(name)
1026 def __delitem__(self, name: bytes) -> None:
1027 """Remove a ref.
1029 Args:
1030 name: Name of the ref to remove
1031 """
1032 if name.startswith(b"refs/") or name == b"HEAD":
1033 del self.refs[name]
1034 else:
1035 raise ValueError(name)
1037 def _get_user_identity(
1038 self, config: "StackedConfig", kind: Optional[str] = None
1039 ) -> bytes:
1040 """Determine the identity to use for new commits."""
1041 warnings.warn(
1042 "use get_user_identity() rather than Repo._get_user_identity",
1043 DeprecationWarning,
1044 )
1045 return get_user_identity(config)
1047 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
1048 """Add or modify graftpoints.
1050 Args:
1051 updated_graftpoints: Dict of commit shas to list of parent shas
1052 """
1053 # Simple validation
1054 for commit, parents in updated_graftpoints.items():
1055 for sha in [commit, *parents]:
1056 check_hexsha(sha, "Invalid graftpoint")
1058 self._graftpoints.update(updated_graftpoints)
1060 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None:
1061 """Remove graftpoints.
1063 Args:
1064 to_remove: List of commit shas
1065 """
1066 for sha in to_remove:
1067 del self._graftpoints[sha]
1069 def _read_heads(self, name: str) -> list[bytes]:
1070 f = self.get_named_file(name)
1071 if f is None:
1072 return []
1073 with f:
1074 return [line.strip() for line in f.readlines() if line.strip()]
1076 def get_worktree(self) -> "WorkTree":
1077 """Get the working tree for this repository.
1079 Returns:
1080 WorkTree instance for performing working tree operations
1082 Raises:
1083 NotImplementedError: If the repository doesn't support working trees
1084 """
1085 raise NotImplementedError(
1086 "Working tree operations not supported by this repository type"
1087 )
1089 @replace_me(remove_in="0.26.0")
1090 def do_commit(
1091 self,
1092 message: Optional[bytes] = None,
1093 committer: Optional[bytes] = None,
1094 author: Optional[bytes] = None,
1095 commit_timestamp: Optional[float] = None,
1096 commit_timezone: Optional[int] = None,
1097 author_timestamp: Optional[float] = None,
1098 author_timezone: Optional[int] = None,
1099 tree: Optional[ObjectID] = None,
1100 encoding: Optional[bytes] = None,
1101 ref: Optional[Ref] = b"HEAD",
1102 merge_heads: Optional[list[ObjectID]] = None,
1103 no_verify: bool = False,
1104 sign: bool = False,
1105 ) -> bytes:
1106 """Create a new commit.
1108 If not specified, committer and author default to
1109 get_user_identity(..., 'COMMITTER')
1110 and get_user_identity(..., 'AUTHOR') respectively.
1112 Args:
1113 message: Commit message (bytes or callable that takes (repo, commit)
1114 and returns bytes)
1115 committer: Committer fullname
1116 author: Author fullname
1117 commit_timestamp: Commit timestamp (defaults to now)
1118 commit_timezone: Commit timestamp timezone (defaults to GMT)
1119 author_timestamp: Author timestamp (defaults to commit
1120 timestamp)
1121 author_timezone: Author timestamp timezone
1122 (defaults to commit timestamp timezone)
1123 tree: SHA1 of the tree root to use (if not specified the
1124 current index will be committed).
1125 encoding: Encoding
1126 ref: Optional ref to commit to (defaults to current branch).
1127 If None, creates a dangling commit without updating any ref.
1128 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1129 no_verify: Skip pre-commit and commit-msg hooks
1130 sign: GPG Sign the commit (bool, defaults to False,
1131 pass True to use default GPG key,
1132 pass a str containing Key ID to use a specific GPG key)
1134 Returns:
1135 New commit SHA1
1136 """
1137 return self.get_worktree().commit(
1138 message=message,
1139 committer=committer,
1140 author=author,
1141 commit_timestamp=commit_timestamp,
1142 commit_timezone=commit_timezone,
1143 author_timestamp=author_timestamp,
1144 author_timezone=author_timezone,
1145 tree=tree,
1146 encoding=encoding,
1147 ref=ref,
1148 merge_heads=merge_heads,
1149 no_verify=no_verify,
1150 sign=sign,
1151 )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The file must begin with the marker ``gitdir: `` followed by a path.

    Args:
      f: File-like object to read from
    Returns: A path
    """
    marker = b"gitdir: "
    contents = f.read()
    if not contents.startswith(marker):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(marker) :].rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version: int) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository format version
        """
        # Pass a message to Exception so str(exc) is informative rather than
        # empty (the original stored only the attribute).
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension: str) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Pass a message to Exception so str(exc) is informative rather than
        # empty (the original stored only the attribute).
        super().__init__(f"Unsupported extension: {extension}")
        self.extension = extension
1193class Repo(BaseRepo):
1194 """A git repository backed by local disk.
1196 To open an existing repository, call the constructor with
1197 the path of the repository.
1199 To create a new repository, use the Repo.init class method.
1201 Note that a repository object may hold on to resources such
1202 as file handles for performance reasons; call .close() to free
1203 up those resources.
1205 Attributes:
1206 path: Path to the working copy (if it exists) or repository control
1207 directory (if the repository is bare)
1208 bare: Whether this is a bare repository
1209 """
1211 path: str
1212 bare: bool
1213 object_store: DiskObjectStore
1214 filter_context: Optional["FilterContext"]
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike[str]],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository; None means autodetect
            from the on-disk layout.

        Raises:
          NotGitRepository: if ``root`` does not look like a git repository
          UnsupportedVersion: if core.repositoryformatversion is not 0 or 1
          UnsupportedExtension: if an unknown extensions.* setting is present
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a ".git" file or a ".git/objects" dir means a
            # non-bare repo; "objects" and "refs" directly under root means
            # a bare repo.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a gitfile pointing at the real control dir
                # (linked worktree / submodule layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # A "commondir" file redirects shared state to the main worktree's
        # control directory.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Unset means the default format version 0.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension we don't understand makes the repo unsafe
                # to operate on, so refuse to open it.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1338 def get_worktree(self) -> "WorkTree":
1339 """Get the working tree for this repository.
1341 Returns:
1342 WorkTree instance for performing working tree operations
1343 """
1344 from .worktree import WorkTree
1346 return WorkTree(self, self.path)
1348 def _write_reflog(
1349 self,
1350 ref: bytes,
1351 old_sha: bytes,
1352 new_sha: bytes,
1353 committer: Optional[bytes],
1354 timestamp: Optional[int],
1355 timezone: Optional[int],
1356 message: bytes,
1357 ) -> None:
1358 from .reflog import format_reflog_line
1360 path = self._reflog_path(ref)
1361 try:
1362 os.makedirs(os.path.dirname(path))
1363 except FileExistsError:
1364 pass
1365 if committer is None:
1366 config = self.get_config_stack()
1367 committer = get_user_identity(config)
1368 check_user_identity(committer)
1369 if timestamp is None:
1370 timestamp = int(time.time())
1371 if timezone is None:
1372 timezone = 0 # FIXME
1373 with open(path, "ab") as f:
1374 f.write(
1375 format_reflog_line(
1376 old_sha, new_sha, committer, timestamp, timezone, message
1377 )
1378 + b"\n"
1379 )
1381 def _reflog_path(self, ref: bytes) -> str:
1382 if ref.startswith((b"main-worktree/", b"worktrees/")):
1383 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1385 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1386 return os.path.join(base, "logs", os.fsdecode(ref))
1388 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1389 """Read reflog entries for a reference.
1391 Args:
1392 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1394 Yields:
1395 reflog.Entry objects in chronological order (oldest first)
1396 """
1397 from .reflog import read_reflog
1399 path = self._reflog_path(ref)
1400 try:
1401 with open(path, "rb") as f:
1402 yield from read_reflog(f)
1403 except FileNotFoundError:
1404 return
1406 @classmethod
1407 def discover(cls, start: Union[str, bytes, os.PathLike[str]] = ".") -> "Repo":
1408 """Iterate parent directories to discover a repository.
1410 Return a Repo object for the first parent directory that looks like a
1411 Git repository.
1413 Args:
1414 start: The directory to start discovery from (defaults to '.')
1415 """
1416 path = os.path.abspath(start)
1417 while True:
1418 try:
1419 return cls(path)
1420 except NotGitRepository:
1421 new_path, _tail = os.path.split(path)
1422 if new_path == path: # Root reached
1423 break
1424 path = new_path
1425 start_str = os.fspath(start)
1426 if isinstance(start_str, bytes):
1427 start_str = start_str.decode("utf-8")
1428 raise NotGitRepository(f"No git repository was found at {start_str}")
1430 def controldir(self) -> str:
1431 """Return the path of the control directory."""
1432 return self._controldir
1434 def commondir(self) -> str:
1435 """Return the path of the common directory.
1437 For a main working tree, it is identical to controldir().
1439 For a linked working tree, it is the control directory of the
1440 main working tree.
1441 """
1442 return self._commondir
1444 def _determine_file_mode(self) -> bool:
1445 """Probe the file-system to determine whether permissions can be trusted.
1447 Returns: True if permissions can be trusted, False otherwise.
1448 """
1449 fname = os.path.join(self.path, ".probe-permissions")
1450 with open(fname, "w") as f:
1451 f.write("")
1453 st1 = os.lstat(fname)
1454 try:
1455 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1456 except PermissionError:
1457 return False
1458 st2 = os.lstat(fname)
1460 os.unlink(fname)
1462 mode_differs = st1.st_mode != st2.st_mode
1463 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1465 return mode_differs and st2_has_exec
1467 def _determine_symlinks(self) -> bool:
1468 """Probe the filesystem to determine whether symlinks can be created.
1470 Returns: True if symlinks can be created, False otherwise.
1471 """
1472 # TODO(jelmer): Actually probe disk / look at filesystem
1473 return sys.platform != "win32"
1475 def _put_named_file(self, path: str, contents: bytes) -> None:
1476 """Write a file to the control dir with the given name and contents.
1478 Args:
1479 path: The path to the file, relative to the control dir.
1480 contents: A string to write to the file.
1481 """
1482 path = path.lstrip(os.path.sep)
1483 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1484 f.write(contents)
1486 def _del_named_file(self, path: str) -> None:
1487 try:
1488 os.unlink(os.path.join(self.controldir(), path))
1489 except FileNotFoundError:
1490 return
1492 def get_named_file(
1493 self,
1494 path: Union[str, bytes],
1495 basedir: Optional[str] = None,
1496 ) -> Optional[BinaryIO]:
1497 """Get a file from the control dir with a specific name.
1499 Although the filename should be interpreted as a filename relative to
1500 the control dir in a disk-based Repo, the object returned need not be
1501 pointing to a file in that location.
1503 Args:
1504 path: The path to the file, relative to the control dir.
1505 basedir: Optional argument that specifies an alternative to the
1506 control dir.
1507 Returns: An open file object, or None if the file does not exist.
1508 """
1509 # TODO(dborowitz): sanitize filenames, since this is used directly by
1510 # the dumb web serving code.
1511 if basedir is None:
1512 basedir = self.controldir()
1513 if isinstance(path, bytes):
1514 path = path.decode("utf-8")
1515 path = path.lstrip(os.path.sep)
1516 try:
1517 return open(os.path.join(basedir, path), "rb")
1518 except FileNotFoundError:
1519 return None
1521 def index_path(self) -> str:
1522 """Return path to the index file."""
1523 return os.path.join(self.controldir(), INDEX_FILENAME)
1525 def open_index(self) -> "Index":
1526 """Open the index for this repository.
1528 Raises:
1529 NoIndexPresent: If no index is present
1530 Returns: The matching `Index`
1531 """
1532 from .index import Index
1534 if not self.has_index():
1535 raise NoIndexPresent
1537 # Check for manyFiles feature configuration
1538 config = self.get_config_stack()
1539 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1540 skip_hash = False
1541 index_version = None
1543 if many_files:
1544 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1545 try:
1546 index_version_str = config.get(b"index", b"version")
1547 index_version = int(index_version_str)
1548 except KeyError:
1549 index_version = 4 # Default to version 4 for manyFiles
1550 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1551 else:
1552 # Check for explicit index settings
1553 try:
1554 index_version_str = config.get(b"index", b"version")
1555 index_version = int(index_version_str)
1556 except KeyError:
1557 index_version = None
1558 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1560 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
1562 def has_index(self) -> bool:
1563 """Check if an index is present."""
1564 # Bare repos must never have index files; non-bare repos may have a
1565 # missing index file, which is treated as empty.
1566 return not self.bare
1568 @replace_me(remove_in="0.26.0")
1569 def stage(
1570 self,
1571 fs_paths: Union[
1572 str, bytes, os.PathLike[str], Iterable[Union[str, bytes, os.PathLike[str]]]
1573 ],
1574 ) -> None:
1575 """Stage a set of paths.
1577 Args:
1578 fs_paths: List of paths, relative to the repository path
1579 """
1580 return self.get_worktree().stage(fs_paths)
1582 @replace_me(remove_in="0.26.0")
1583 def unstage(self, fs_paths: Sequence[str]) -> None:
1584 """Unstage specific file in the index.
1586 Args:
1587 fs_paths: a list of files to unstage,
1588 relative to the repository path.
1589 """
1590 return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path: Union[str, bytes, os.PathLike[str]],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: Optional[bool] = None,
        branch: Optional[bytes] = None,
        progress: Optional[Callable[[str], None]] = None,
        depth: Optional[int] = None,
        symlinks: Optional[bool] = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
            (defaults to True for non-bare; must not be combined with bare)
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        # Outer try: on any failure, remove the directory we created above.
        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            # Inner try: on any failure after the target repo exists, close it
            # before the outer handler removes its directory.
            try:
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1687 @replace_me(remove_in="0.26.0")
1688 def reset_index(self, tree: Optional[bytes] = None) -> None:
1689 """Reset the index back to a specific tree.
1691 Args:
1692 tree: Tree SHA to reset to, None for current HEAD tree.
1693 """
1694 return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix (``gitdir:``, ``gitdir/i:``,
        ``onbranch:``) to matcher function, used when evaluating
        ``includeIf`` sections of git config files.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
                pattern: Pattern to match against
                case_sensitive: Whether to match case-sensitively

            Returns:
                True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything beneath that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
                pattern: Pattern to match against

            Returns:
                True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. empty repo): condition never matches.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1784 def get_worktree_config(self) -> "ConfigFile":
1785 """Get the worktree-specific config.
1787 Returns:
1788 ConfigFile object for the worktree config
1789 """
1790 from .config import ConfigFile
1792 path = os.path.join(self.commondir(), "config.worktree")
1793 try:
1794 # Pass condition matchers for includeIf evaluation
1795 condition_matchers = self._get_config_condition_matchers()
1796 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1797 except FileNotFoundError:
1798 cf = ConfigFile()
1799 cf.path = path
1800 return cf
1802 def get_config(self) -> "ConfigFile":
1803 """Retrieve the config object.
1805 Returns: `ConfigFile` object for the ``.git/config`` file.
1806 """
1807 from .config import ConfigFile
1809 path = os.path.join(self._commondir, "config")
1810 try:
1811 # Pass condition matchers for includeIf evaluation
1812 condition_matchers = self._get_config_condition_matchers()
1813 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1814 except FileNotFoundError:
1815 ret = ConfigFile()
1816 ret.path = path
1817 return ret
1819 def get_rebase_state_manager(self) -> "RebaseStateManager":
1820 """Get the appropriate rebase state manager for this repository.
1822 Returns: DiskRebaseStateManager instance
1823 """
1824 import os
1826 from .rebase import DiskRebaseStateManager
1828 path = os.path.join(self.controldir(), "rebase-merge")
1829 return DiskRebaseStateManager(path)
1831 def get_description(self) -> Optional[bytes]:
1832 """Retrieve the description of this repository.
1834 Returns: Description as bytes or None.
1835 """
1836 path = os.path.join(self._controldir, "description")
1837 try:
1838 with GitFile(path, "rb") as f:
1839 return f.read()
1840 except FileNotFoundError:
1841 return None
1843 def __repr__(self) -> str:
1844 """Return string representation of this repository."""
1845 return f"<Repo at {self.path!r}>"
1847 def set_description(self, description: bytes) -> None:
1848 """Set the description for this repository.
1850 Args:
1851 description: Text to set as description for this repository.
1852 """
1853 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        controldir: Union[str, bytes, os.PathLike[str]],
        bare: bool,
        object_store: Optional[PackBasedObjectStore] = None,
        config: Optional["StackedConfig"] = None,
        default_branch: Optional[bytes] = None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Shared initialization used by both init() and init_bare().

        Creates the base directory layout and object store under
        ``controldir``, opens the repository, points HEAD at the default
        branch, and writes the initial control files.

        Args:
          path: Path of the working copy (equals ``controldir`` when bare)
          controldir: Path of the control directory to populate
          bare: Whether the new repository is bare
          object_store: Object store to use; created on disk if omitted
          config: Config used to look up init.defaultBranch
          default_branch: Branch HEAD should point at; falls back to
            init.defaultBranch, then DEFAULT_BRANCH
          symlinks: Whether to support symlinks
          format: Repository format version
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            # Respect init.defaultBranch from config before falling back.
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", local_branch_name(default_branch))
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
1891 @classmethod
1892 def init(
1893 cls,
1894 path: Union[str, bytes, os.PathLike[str]],
1895 *,
1896 mkdir: bool = False,
1897 config: Optional["StackedConfig"] = None,
1898 default_branch: Optional[bytes] = None,
1899 symlinks: Optional[bool] = None,
1900 format: Optional[int] = None,
1901 ) -> "Repo":
1902 """Create a new repository.
1904 Args:
1905 path: Path in which to create the repository
1906 mkdir: Whether to create the directory
1907 config: Configuration object
1908 default_branch: Default branch name
1909 symlinks: Whether to support symlinks
1910 format: Repository format version (defaults to 0)
1911 Returns: `Repo` instance
1912 """
1913 path = os.fspath(path)
1914 if isinstance(path, bytes):
1915 path = os.fsdecode(path)
1916 if mkdir:
1917 os.mkdir(path)
1918 controldir = os.path.join(path, CONTROLDIR)
1919 os.mkdir(controldir)
1920 _set_filesystem_hidden(controldir)
1921 return cls._init_maybe_bare(
1922 path,
1923 controldir,
1924 False,
1925 config=config,
1926 default_branch=default_branch,
1927 symlinks=symlinks,
1928 format=format,
1929 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike[str]],
        main_repo: "Repo",
        identifier: Optional[str] = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Writes the gitfile/gitdir/commondir/HEAD plumbing that links the new
        working tree to ``main_repo``'s control directory.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier (defaults to the basename of path)
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The worktree's ".git" is a file pointing at its control dir.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-pointer from the control dir to the worktree's gitfile.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # Shared state lives two levels up, in the main control directory.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the new worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r
1980 @classmethod
1981 def init_bare(
1982 cls,
1983 path: Union[str, bytes, os.PathLike[str]],
1984 *,
1985 mkdir: bool = False,
1986 object_store: Optional[PackBasedObjectStore] = None,
1987 config: Optional["StackedConfig"] = None,
1988 default_branch: Optional[bytes] = None,
1989 format: Optional[int] = None,
1990 ) -> "Repo":
1991 """Create a new bare repository.
1993 ``path`` should already exist and be an empty directory.
1995 Args:
1996 path: Path to create bare repository in
1997 mkdir: Whether to create the directory
1998 object_store: Object store to use
1999 config: Configuration object
2000 default_branch: Default branch name
2001 format: Repository format version (defaults to 0)
2002 Returns: a `Repo` instance
2003 """
2004 path = os.fspath(path)
2005 if isinstance(path, bytes):
2006 path = os.fsdecode(path)
2007 if mkdir:
2008 os.mkdir(path)
2009 return cls._init_maybe_bare(
2010 path,
2011 path,
2012 True,
2013 object_store=object_store,
2014 config=config,
2015 default_branch=default_branch,
2016 format=format,
2017 )
2019 create = init_bare
2021 def close(self) -> None:
2022 """Close any files opened by this repository."""
2023 self.object_store.close()
2024 # Clean up filter context if it was created
2025 if self.filter_context is not None:
2026 self.filter_context.close()
2027 self.filter_context = None
2029 def __enter__(self) -> "Repo":
2030 """Enter context manager."""
2031 return self
2033 def __exit__(
2034 self,
2035 exc_type: Optional[type[BaseException]],
2036 exc_val: Optional[BaseException],
2037 exc_tb: Optional[TracebackType],
2038 ) -> None:
2039 """Exit context manager and close repository."""
2040 self.close()
2042 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2043 """Read .gitattributes file from working tree.
2045 Returns:
2046 Dictionary mapping file patterns to attributes
2047 """
2048 gitattributes = {}
2049 gitattributes_path = os.path.join(self.path, ".gitattributes")
2051 if os.path.exists(gitattributes_path):
2052 with open(gitattributes_path, "rb") as f:
2053 for line in f:
2054 line = line.strip()
2055 if not line or line.startswith(b"#"):
2056 continue
2058 parts = line.split()
2059 if len(parts) < 2:
2060 continue
2062 pattern = parts[0]
2063 attrs = {}
2065 for attr in parts[1:]:
2066 if attr.startswith(b"-"):
2067 # Unset attribute
2068 attrs[attr[1:]] = b"false"
2069 elif b"=" in attr:
2070 # Set to value
2071 key, value = attr.split(b"=", 1)
2072 attrs[key] = value
2073 else:
2074 # Set attribute
2075 attrs[attr] = b"true"
2077 gitattributes[pattern] = attrs
2079 return gitattributes
2081 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2082 """Return a BlobNormalizer object."""
2083 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2085 # Get fresh configuration and GitAttributes
2086 config_stack = self.get_config_stack()
2087 git_attributes = self.get_gitattributes()
2089 # Lazily create FilterContext if needed
2090 if self.filter_context is None:
2091 filter_registry = FilterRegistry(config_stack, self)
2092 self.filter_context = FilterContext(filter_registry)
2093 else:
2094 # Refresh the context with current config to handle config changes
2095 self.filter_context.refresh_config(config_stack)
2097 # Return a new FilterBlobNormalizer with the context
2098 return FilterBlobNormalizer(
2099 config_stack, git_attributes, filter_context=self.filter_context
2100 )
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Sources are combined in order: the given tree (or HEAD's tree),
        then ``.git/info/attributes``, then the working directory's
        ``.gitattributes`` — later sources take precedence.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing or non-tree object: no tree-level attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2169 @replace_me(remove_in="0.26.0")
2170 def _sparse_checkout_file_path(self) -> str:
2171 """Return the path of the sparse-checkout file in this repo's control dir."""
2172 return self.get_worktree()._sparse_checkout_file_path()
2174 @replace_me(remove_in="0.26.0")
2175 def configure_for_cone_mode(self) -> None:
2176 """Ensure the repository is configured for cone-mode sparse-checkout."""
2177 return self.get_worktree().configure_for_cone_mode()
2179 @replace_me(remove_in="0.26.0")
2180 def infer_cone_mode(self) -> bool:
2181 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2182 return self.get_worktree().infer_cone_mode()
2184 @replace_me(remove_in="0.26.0")
2185 def get_sparse_checkout_patterns(self) -> list[str]:
2186 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2188 Returns:
2189 A list of patterns. Returns an empty list if the file is missing.
2190 """
2191 return self.get_worktree().get_sparse_checkout_patterns()
2193 @replace_me(remove_in="0.26.0")
2194 def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
2195 """Write the given sparse-checkout patterns into info/sparse-checkout.
2197 Creates the info/ directory if it does not exist.
2199 Args:
2200 patterns: A list of gitignore-style patterns to store.
2201 """
2202 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2204 @replace_me(remove_in="0.26.0")
2205 def set_cone_mode_patterns(self, dirs: Union[Sequence[str], None] = None) -> None:
2206 """Write the given cone-mode directory patterns into info/sparse-checkout.
2208 For each directory to include, add an inclusion line that "undoes" the prior
2209 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2210 Never add the same line twice.
2211 """
2212 return self.get_worktree().set_cone_mode_patterns(dirs)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    # Lazily created by get_blob_normalizer(); released again in close().
    filter_context: Optional["FilterContext"]

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Every ref update is recorded here via _append_reflog().
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
        # In-memory stand-ins for files normally kept in the control dir.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description: Optional[bytes] = None
        self.filter_context = None
2237 def _append_reflog(
2238 self,
2239 ref: bytes,
2240 old_sha: Optional[bytes],
2241 new_sha: Optional[bytes],
2242 committer: Optional[bytes],
2243 timestamp: Optional[int],
2244 timezone: Optional[int],
2245 message: Optional[bytes],
2246 ) -> None:
2247 self._reflog.append(
2248 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2249 )
2251 def set_description(self, description: bytes) -> None:
2252 """Set the description for this repository.
2254 Args:
2255 description: Text to set as description
2256 """
2257 self._description = description
2259 def get_description(self) -> Optional[bytes]:
2260 """Get the description of this repository.
2262 Returns:
2263 Repository description as bytes
2264 """
2265 return self._description
2267 def _determine_file_mode(self) -> bool:
2268 """Probe the file-system to determine whether permissions can be trusted.
2270 Returns: True if permissions can be trusted, False otherwise.
2271 """
2272 return sys.platform != "win32"
2274 def _determine_symlinks(self) -> bool:
2275 """Probe the file-system to determine whether permissions can be trusted.
2277 Returns: True if permissions can be trusted, False otherwise.
2278 """
2279 return sys.platform != "win32"
2281 def _put_named_file(self, path: str, contents: bytes) -> None:
2282 """Write a file to the control dir with the given name and contents.
2284 Args:
2285 path: The path to the file, relative to the control dir.
2286 contents: A string to write to the file.
2287 """
2288 self._named_files[path] = contents
2290 def _del_named_file(self, path: str) -> None:
2291 try:
2292 del self._named_files[path]
2293 except KeyError:
2294 pass
2296 def get_named_file(
2297 self,
2298 path: Union[str, bytes],
2299 basedir: Optional[str] = None,
2300 ) -> Optional[BytesIO]:
2301 """Get a file from the control dir with a specific name.
2303 Although the filename should be interpreted as a filename relative to
2304 the control dir in a disk-baked Repo, the object returned need not be
2305 pointing to a file in that location.
2307 Args:
2308 path: The path to the file, relative to the control dir.
2309 basedir: Optional base directory for the path
2310 Returns: An open file object, or None if the file does not exist.
2311 """
2312 path_str = path.decode() if isinstance(path, bytes) else path
2313 contents = self._named_files.get(path_str, None)
2314 if contents is None:
2315 return None
2316 return BytesIO(contents)
2318 def open_index(self) -> "Index":
2319 """Fail to open index for this repo, since it is bare.
2321 Raises:
2322 NoIndexPresent: Raised when no index is present
2323 """
2324 raise NoIndexPresent
2326 def get_config(self) -> "ConfigFile":
2327 """Retrieve the config object.
2329 Returns: `ConfigFile` object.
2330 """
2331 return self._config
2333 def get_rebase_state_manager(self) -> "RebaseStateManager":
2334 """Get the appropriate rebase state manager for this repository.
2336 Returns: MemoryRebaseStateManager instance
2337 """
2338 from .rebase import MemoryRebaseStateManager
2340 return MemoryRebaseStateManager(self)
2342 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2343 """Return a BlobNormalizer object for checkin/checkout operations."""
2344 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2346 # Get fresh configuration and GitAttributes
2347 config_stack = self.get_config_stack()
2348 git_attributes = self.get_gitattributes()
2350 # Lazily create FilterContext if needed
2351 if self.filter_context is None:
2352 filter_registry = FilterRegistry(config_stack, self)
2353 self.filter_context = FilterContext(filter_registry)
2354 else:
2355 # Refresh the context with current config to handle config changes
2356 self.filter_context.refresh_config(config_stack)
2358 # Return a new FilterBlobNormalizer with the context
2359 return FilterBlobNormalizer(
2360 config_stack, git_attributes, filter_context=self.filter_context
2361 )
2363 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
2364 """Read gitattributes for the repository."""
2365 from .attrs import GitAttributes
2367 # Memory repos don't have working trees or gitattributes files
2368 # Return empty GitAttributes
2369 return GitAttributes([])
2371 def close(self) -> None:
2372 """Close any resources opened by this repository."""
2373 # Clean up filter context if it was created
2374 if self.filter_context is not None:
2375 self.filter_context.close()
2376 self.filter_context = None
2378 def do_commit(
2379 self,
2380 message: Optional[bytes] = None,
2381 committer: Optional[bytes] = None,
2382 author: Optional[bytes] = None,
2383 commit_timestamp: Optional[float] = None,
2384 commit_timezone: Optional[int] = None,
2385 author_timestamp: Optional[float] = None,
2386 author_timezone: Optional[int] = None,
2387 tree: Optional[ObjectID] = None,
2388 encoding: Optional[bytes] = None,
2389 ref: Optional[Ref] = b"HEAD",
2390 merge_heads: Optional[list[ObjectID]] = None,
2391 no_verify: bool = False,
2392 sign: bool = False,
2393 ) -> bytes:
2394 """Create a new commit.
2396 This is a simplified implementation for in-memory repositories that
2397 doesn't support worktree operations or hooks.
2399 Args:
2400 message: Commit message
2401 committer: Committer fullname
2402 author: Author fullname
2403 commit_timestamp: Commit timestamp (defaults to now)
2404 commit_timezone: Commit timestamp timezone (defaults to GMT)
2405 author_timestamp: Author timestamp (defaults to commit timestamp)
2406 author_timezone: Author timestamp timezone (defaults to commit timezone)
2407 tree: SHA1 of the tree root to use
2408 encoding: Encoding
2409 ref: Optional ref to commit to (defaults to current branch).
2410 If None, creates a dangling commit without updating any ref.
2411 merge_heads: Merge heads
2412 no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
2413 sign: GPG Sign the commit (ignored for MemoryRepo)
2415 Returns:
2416 New commit SHA1
2417 """
2418 import time
2420 from .objects import Commit
2422 if tree is None:
2423 raise ValueError("tree must be specified for MemoryRepo")
2425 c = Commit()
2426 if len(tree) != 40:
2427 raise ValueError("tree must be a 40-byte hex sha string")
2428 c.tree = tree
2430 config = self.get_config_stack()
2431 if merge_heads is None:
2432 merge_heads = []
2433 if committer is None:
2434 committer = get_user_identity(config, kind="COMMITTER")
2435 check_user_identity(committer)
2436 c.committer = committer
2437 if commit_timestamp is None:
2438 commit_timestamp = time.time()
2439 c.commit_time = int(commit_timestamp)
2440 if commit_timezone is None:
2441 commit_timezone = 0
2442 c.commit_timezone = commit_timezone
2443 if author is None:
2444 author = get_user_identity(config, kind="AUTHOR")
2445 c.author = author
2446 check_user_identity(author)
2447 if author_timestamp is None:
2448 author_timestamp = commit_timestamp
2449 c.author_time = int(author_timestamp)
2450 if author_timezone is None:
2451 author_timezone = commit_timezone
2452 c.author_timezone = author_timezone
2453 if encoding is None:
2454 try:
2455 encoding = config.get(("i18n",), "commitEncoding")
2456 except KeyError:
2457 pass
2458 if encoding is not None:
2459 c.encoding = encoding
2461 # Handle message (for MemoryRepo, we don't support callable messages)
2462 if callable(message):
2463 message = message(self, c)
2464 if message is None:
2465 raise ValueError("Message callback returned None")
2467 if message is None:
2468 raise ValueError("No commit message specified")
2470 c.message = message
2472 if ref is None:
2473 # Create a dangling commit
2474 c.parents = merge_heads
2475 self.object_store.add_object(c)
2476 else:
2477 try:
2478 old_head = self.refs[ref]
2479 c.parents = [old_head, *merge_heads]
2480 self.object_store.add_object(c)
2481 ok = self.refs.set_if_equals(
2482 ref,
2483 old_head,
2484 c.id,
2485 message=b"commit: " + message,
2486 committer=committer,
2487 timestamp=int(commit_timestamp),
2488 timezone=commit_timezone,
2489 )
2490 except KeyError:
2491 c.parents = merge_heads
2492 self.object_store.add_object(c)
2493 ok = self.refs.add_if_new(
2494 ref,
2495 c.id,
2496 message=b"commit: " + message,
2497 committer=committer,
2498 timestamp=int(commit_timestamp),
2499 timezone=commit_timezone,
2500 )
2501 if not ok:
2502 from .errors import CommitError
2504 raise CommitError(f"{ref!r} changed during commit")
2506 return c.id
2508 @classmethod
2509 def init_bare(
2510 cls,
2511 objects: Iterable[ShaFile],
2512 refs: Mapping[bytes, bytes],
2513 format: Optional[int] = None,
2514 ) -> "MemoryRepo":
2515 """Create a new bare repository in memory.
2517 Args:
2518 objects: Objects for the new repository,
2519 as iterable
2520 refs: Refs as dictionary, mapping names
2521 to object SHA1s
2522 format: Repository format version (defaults to 0)
2523 """
2524 ret = cls()
2525 for obj in objects:
2526 ret.object_store.add_object(obj)
2527 for refname, sha in refs.items():
2528 ret.refs.add_if_new(refname, sha)
2529 ret._init_files(bare=True, format=format)
2530 return ret