Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Callable, Generator, Iterable, Iterator, Mapping, Sequence
38from io import BytesIO
39from types import TracebackType
40from typing import (
41 TYPE_CHECKING,
42 Any,
43 BinaryIO,
44 TypeVar,
45)
47if TYPE_CHECKING:
48 # There are no circular imports here, but we try to defer imports as long
49 # as possible to reduce start-up time for anything that doesn't need
50 # these imports.
51 from .attrs import GitAttributes
52 from .config import ConditionMatcher, ConfigFile, StackedConfig
53 from .diff_tree import RenameDetector
54 from .filters import FilterBlobNormalizer, FilterContext
55 from .index import Index
56 from .notes import Notes
57 from .object_store import BaseObjectStore, GraphWalker
58 from .pack import UnpackedObject
59 from .rebase import RebaseStateManager
60 from .walk import Walker
61 from .worktree import WorkTree
63from . import reflog, replace_me
64from .errors import (
65 NoIndexPresent,
66 NotBlobError,
67 NotCommitError,
68 NotGitRepository,
69 NotTagError,
70 NotTreeError,
71 RefFormatError,
72)
73from .file import GitFile
74from .hooks import (
75 CommitMsgShellHook,
76 Hook,
77 PostCommitShellHook,
78 PostReceiveShellHook,
79 PreCommitShellHook,
80)
81from .object_store import (
82 DiskObjectStore,
83 MemoryObjectStore,
84 MissingObjectFinder,
85 ObjectStoreGraphWalker,
86 PackBasedObjectStore,
87 PackCapableObjectStore,
88 find_shallow,
89 peel_sha,
90)
91from .objects import (
92 Blob,
93 Commit,
94 ObjectID,
95 ShaFile,
96 Tag,
97 Tree,
98 check_hexsha,
99 valid_hexsha,
100)
101from .pack import generate_unpacked_objects
102from .refs import (
103 ANNOTATED_TAG_SUFFIX, # noqa: F401
104 LOCAL_TAG_PREFIX, # noqa: F401
105 SYMREF, # noqa: F401
106 DictRefsContainer,
107 DiskRefsContainer,
108 InfoRefsContainer, # noqa: F401
109 Ref,
110 RefsContainer,
111 _set_default_branch,
112 _set_head,
113 _set_origin_head,
114 check_ref_format, # noqa: F401
115 extract_branch_name,
116 is_per_worktree_ref,
117 local_branch_name,
118 read_packed_refs, # noqa: F401
119 read_packed_refs_with_peeled, # noqa: F401
120 serialize_refs,
121 write_packed_refs, # noqa: F401
122)
124CONTROLDIR = ".git"
125OBJECTDIR = "objects"
126DEFAULT_OFS_DELTA = True
128T = TypeVar("T", bound="ShaFile")
129REFSDIR = "refs"
130REFSDIR_TAGS = "tags"
131REFSDIR_HEADS = "heads"
132INDEX_FILENAME = "index"
133COMMONDIR = "commondir"
134GITDIR = "gitdir"
135WORKTREES = "worktrees"
137BASE_DIRECTORIES = [
138 ["branches"],
139 [REFSDIR],
140 [REFSDIR, REFSDIR_TAGS],
141 [REFSDIR, REFSDIR_HEADS],
142 ["hooks"],
143 ["info"],
144]
146DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
          identity: The malformed identity string.
        """
        # Pass the identity to Exception.__init__ so that str(exc) and
        # exc.args carry the offending value instead of being empty.
        super().__init__(identity)
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default committer identity can be determined."""
161# TODO(jelmer): Cache?
162def _get_default_identity() -> tuple[str, str]:
163 import socket
165 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
166 username = os.environ.get(name)
167 if username:
168 break
169 else:
170 username = None
172 try:
173 import pwd
174 except ImportError:
175 fullname = None
176 else:
177 try:
178 entry = pwd.getpwuid(os.getuid()) # type: ignore[attr-defined,unused-ignore]
179 except KeyError:
180 fullname = None
181 else:
182 if getattr(entry, "gecos", None):
183 fullname = entry.pw_gecos.split(",")[0]
184 else:
185 fullname = None
186 if username is None:
187 username = entry.pw_name
188 if not fullname:
189 if username is None:
190 raise DefaultIdentityNotFound("no username found")
191 fullname = username
192 email = os.environ.get("EMAIL")
193 if email is None:
194 if username is None:
195 raise DefaultIdentityNotFound("no username found")
196 email = f"{username}@{socket.gethostname()}"
197 return (fullname, email)
200def get_user_identity(config: "StackedConfig", kind: str | None = None) -> bytes:
201 """Determine the identity to use for new commits.
203 If kind is set, this first checks
204 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
206 If those variables are not set, then it will fall back
207 to reading the user.name and user.email settings from
208 the specified configuration.
210 If that also fails, then it will fall back to using
211 the current users' identity as obtained from the host
212 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
214 Args:
215 config: Configuration stack to read from
216 kind: Optional kind to return identity for,
217 usually either "AUTHOR" or "COMMITTER".
219 Returns:
220 A user identity
221 """
222 user: bytes | None = None
223 email: bytes | None = None
224 if kind:
225 user_uc = os.environ.get("GIT_" + kind + "_NAME")
226 if user_uc is not None:
227 user = user_uc.encode("utf-8")
228 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
229 if email_uc is not None:
230 email = email_uc.encode("utf-8")
231 if user is None:
232 try:
233 user = config.get(("user",), "name")
234 except KeyError:
235 user = None
236 if email is None:
237 try:
238 email = config.get(("user",), "email")
239 except KeyError:
240 email = None
241 default_user, default_email = _get_default_identity()
242 if user is None:
243 user = default_user.encode("utf-8")
244 if email is None:
245 email = default_email.encode("utf-8")
246 if email.startswith(b"<") and email.endswith(b">"):
247 email = email[1:-1]
248 return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring.

    Raises:
      InvalidUserIdentity: Raised when identity is invalid.
    """
    parts = identity.split(b" <", 1)
    # The identity must contain " <" and a closing ">" after it.
    if len(parts) != 2 or b">" not in parts[1]:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
    # NUL bytes and newlines would corrupt the commit object.
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoint lines into a dict.

    Each line is formatted as::

        <commit sha1> <parent sha1> [<parent sha1>]*

    The resulting dictionary maps each ``<commit sha1>`` to its list of
    parent SHAs.

    Args:
      graftpoints: Iterator of graftpoint lines.

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []
        # Reject malformed SHAs early.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")
        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: Mapping[bytes, Sequence[bytes]]) -> bytes:
    """Convert a dictionary of grafts into a bytestring.

    The graft dictionary maps ``<commit sha1>`` to a list of parent
    SHAs; each output line is formatted as::

        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
323def _set_filesystem_hidden(path: str) -> None:
324 """Mark path as to be hidden if supported by platform and filesystem.
326 On win32 uses SetFileAttributesW api:
327 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
328 """
329 if sys.platform == "win32":
330 import ctypes
331 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
333 FILE_ATTRIBUTE_HIDDEN = 2
334 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
335 ("SetFileAttributesW", ctypes.windll.kernel32)
336 )
338 if isinstance(path, bytes):
339 path = os.fsdecode(path)
340 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
341 pass # Could raise or log `ctypes.WinError()` here
343 # Could implement other platform specific filesystem hiding here
346def parse_shared_repository(
347 value: str | bytes | bool,
348) -> tuple[int | None, int | None]:
349 """Parse core.sharedRepository configuration value.
351 Args:
352 value: Configuration value (string, bytes, or boolean)
354 Returns:
355 tuple of (file_mask, directory_mask) or (None, None) if not shared
357 The masks are permission bits to apply via chmod.
358 """
359 if isinstance(value, bytes):
360 value = value.decode("utf-8", errors="replace")
362 # Handle boolean values
363 if isinstance(value, bool):
364 if value:
365 # true = group (same as "group")
366 return (0o664, 0o2775)
367 else:
368 # false = umask (use system umask, no adjustment)
369 return (None, None)
371 # Handle string values
372 value_lower = value.lower()
374 if value_lower in ("false", "0", ""):
375 # Use umask (no adjustment)
376 return (None, None)
378 if value_lower in ("true", "1", "group"):
379 # Group writable (with setgid bit)
380 return (0o664, 0o2775)
382 if value_lower in ("all", "world", "everybody", "2"):
383 # World readable/writable (with setgid bit)
384 return (0o666, 0o2777)
386 if value_lower == "umask":
387 # Explicitly use umask
388 return (None, None)
390 # Try to parse as octal
391 if value.startswith("0"):
392 try:
393 mode = int(value, 8)
394 # For directories, add execute bits where read bits are set
395 # and add setgid bit for shared repositories
396 dir_mode = mode | 0o2000 # Add setgid bit
397 if mode & 0o004:
398 dir_mode |= 0o001
399 if mode & 0o040:
400 dir_mode |= 0o010
401 if mode & 0o400:
402 dir_mode |= 0o100
403 return (mode, dir_mode)
404 except ValueError:
405 pass
407 # Default to umask for unrecognized values
408 return (None, None)
411class ParentsProvider:
412 """Provider for commit parent information."""
414 def __init__(
415 self,
416 store: "BaseObjectStore",
417 grafts: dict[bytes, list[bytes]] = {},
418 shallows: Iterable[bytes] = [],
419 ) -> None:
420 """Initialize ParentsProvider.
422 Args:
423 store: Object store to use
424 grafts: Graft information
425 shallows: Shallow commit SHAs
426 """
427 self.store = store
428 self.grafts = grafts
429 self.shallows = set(shallows)
431 # Get commit graph once at initialization for performance
432 self.commit_graph = store.get_commit_graph()
434 def get_parents(
435 self, commit_id: bytes, commit: Commit | None = None
436 ) -> list[bytes]:
437 """Get parents for a commit using the parents provider."""
438 try:
439 return self.grafts[commit_id]
440 except KeyError:
441 pass
442 if commit_id in self.shallows:
443 return []
445 # Try to use commit graph for faster parent lookup
446 if self.commit_graph:
447 parents = self.commit_graph.get_parents(commit_id)
448 if parents is not None:
449 return parents
451 # Fallback to reading the commit object
452 if commit is None:
453 obj = self.store[commit_id]
454 assert isinstance(obj, Commit)
455 commit = obj
456 parents = commit.parents
457 assert isinstance(parents, list)
458 return parents
461class BaseRepo:
462 """Base class for a git repository.
464 This base class is meant to be used for Repository implementations that e.g.
465 work on top of a different transport than a standard filesystem path.
467 Attributes:
468 object_store: Dictionary-like object for accessing
469 the objects
470 refs: Dictionary-like object with the refs in this
471 repository
472 """
474 def __init__(
475 self, object_store: "PackCapableObjectStore", refs: RefsContainer
476 ) -> None:
477 """Open a repository.
479 This shouldn't be called directly, but rather through one of the
480 base classes, such as MemoryRepo or Repo.
482 Args:
483 object_store: Object store to use
484 refs: Refs container to use
485 """
486 self.object_store = object_store
487 self.refs = refs
489 self._graftpoints: dict[bytes, list[bytes]] = {}
490 self.hooks: dict[str, Hook] = {}
492 def _determine_file_mode(self) -> bool:
493 """Probe the file-system to determine whether permissions can be trusted.
495 Returns: True if permissions can be trusted, False otherwise.
496 """
497 raise NotImplementedError(self._determine_file_mode)
499 def _determine_symlinks(self) -> bool:
500 """Probe the filesystem to determine whether symlinks can be created.
502 Returns: True if symlinks can be created, False otherwise.
503 """
504 # For now, just mimic the old behaviour
505 return sys.platform != "win32"
507 def _init_files(
508 self,
509 bare: bool,
510 symlinks: bool | None = None,
511 format: int | None = None,
512 shared_repository: str | bool | None = None,
513 ) -> None:
514 """Initialize a default set of named files."""
515 from .config import ConfigFile
517 self._put_named_file("description", b"Unnamed repository")
518 f = BytesIO()
519 cf = ConfigFile()
520 if format is None:
521 format = 0
522 if format not in (0, 1):
523 raise ValueError(f"Unsupported repository format version: {format}")
524 cf.set("core", "repositoryformatversion", str(format))
525 if self._determine_file_mode():
526 cf.set("core", "filemode", True)
527 else:
528 cf.set("core", "filemode", False)
530 if symlinks is None and not bare:
531 symlinks = self._determine_symlinks()
533 if symlinks is False:
534 cf.set("core", "symlinks", symlinks)
536 cf.set("core", "bare", bare)
537 cf.set("core", "logallrefupdates", True)
539 # Set shared repository if specified
540 if shared_repository is not None:
541 if isinstance(shared_repository, bool):
542 cf.set("core", "sharedRepository", shared_repository)
543 else:
544 cf.set("core", "sharedRepository", shared_repository)
546 cf.write_to_file(f)
547 self._put_named_file("config", f.getvalue())
548 self._put_named_file(os.path.join("info", "exclude"), b"")
550 def get_named_file(self, path: str) -> BinaryIO | None:
551 """Get a file from the control dir with a specific name.
553 Although the filename should be interpreted as a filename relative to
554 the control dir in a disk-based Repo, the object returned need not be
555 pointing to a file in that location.
557 Args:
558 path: The path to the file, relative to the control dir.
559 Returns: An open file object, or None if the file does not exist.
560 """
561 raise NotImplementedError(self.get_named_file)
563 def _put_named_file(self, path: str, contents: bytes) -> None:
564 """Write a file to the control dir with the given name and contents.
566 Args:
567 path: The path to the file, relative to the control dir.
568 contents: A string to write to the file.
569 """
570 raise NotImplementedError(self._put_named_file)
572 def _del_named_file(self, path: str) -> None:
573 """Delete a file in the control directory with the given name."""
574 raise NotImplementedError(self._del_named_file)
576 def open_index(self) -> "Index":
577 """Open the index for this repository.
579 Raises:
580 NoIndexPresent: If no index is present
581 Returns: The matching `Index`
582 """
583 raise NotImplementedError(self.open_index)
585 def fetch(
586 self,
587 target: "BaseRepo",
588 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]]
589 | None = None,
590 progress: Callable[..., None] | None = None,
591 depth: int | None = None,
592 ) -> dict[bytes, bytes]:
593 """Fetch objects into another repository.
595 Args:
596 target: The target repository
597 determine_wants: Optional function to determine what refs to
598 fetch.
599 progress: Optional progress function
600 depth: Optional shallow fetch depth
601 Returns: The local refs
602 """
603 if determine_wants is None:
604 determine_wants = target.object_store.determine_wants_all
605 count, pack_data = self.fetch_pack_data(
606 determine_wants,
607 target.get_graph_walker(),
608 progress=progress,
609 depth=depth,
610 )
611 target.object_store.add_pack_data(count, pack_data, progress)
612 return self.get_refs()
614 def fetch_pack_data(
615 self,
616 determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]],
617 graph_walker: "GraphWalker",
618 progress: Callable[[bytes], None] | None,
619 *,
620 get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
621 depth: int | None = None,
622 ) -> tuple[int, Iterator["UnpackedObject"]]:
623 """Fetch the pack data required for a set of revisions.
625 Args:
626 determine_wants: Function that takes a dictionary with heads
627 and returns the list of heads to fetch.
628 graph_walker: Object that can iterate over the list of revisions
629 to fetch and has an "ack" method that will be called to acknowledge
630 that a revision is present.
631 progress: Simple progress function that will be called with
632 updated progress strings.
633 get_tagged: Function that returns a dict of pointed-to sha ->
634 tag sha for including tags.
635 depth: Shallow fetch depth
636 Returns: count and iterator over pack data
637 """
638 missing_objects = self.find_missing_objects(
639 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
640 )
641 if missing_objects is None:
642 return 0, iter([])
643 remote_has = missing_objects.get_remote_has()
644 object_ids = list(missing_objects)
645 return len(object_ids), generate_unpacked_objects(
646 self.object_store, object_ids, progress=progress, other_haves=remote_has
647 )
    def find_missing_objects(
        self,
        determine_wants: Callable[[Mapping[bytes, bytes], int | None], list[bytes]],
        graph_walker: "GraphWalker",
        progress: Callable[[bytes], None] | None,
        *,
        get_tagged: Callable[[], dict[bytes, bytes]] | None = None,
        depth: int | None = None,
    ) -> MissingObjectFinder | None:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth.

        Returns: iterator over objects, with __len__ implemented, or None
          in the shallow short-circuit path where no pack should be sent.
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs, depth)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before any updates below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = not_shallow & current_shallow
                setattr(graph_walker, "unshallow", unshallow)
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Return an actual MissingObjectFinder with empty wants
            return MissingObjectFinder(
                self.object_store,
                haves=[],
                wants=[],
            )

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider."""
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
745 def generate_pack_data(
746 self,
747 have: set[ObjectID],
748 want: set[ObjectID],
749 *,
750 shallow: set[ObjectID] | None = None,
751 progress: Callable[[str], None] | None = None,
752 ofs_delta: bool | None = None,
753 ) -> tuple[int, Iterator["UnpackedObject"]]:
754 """Generate pack data objects for a set of wants/haves.
756 Args:
757 have: List of SHA1s of objects that should not be sent
758 want: List of SHA1s of objects that should be sent
759 shallow: Set of shallow commit SHA1s to skip (defaults to repo's shallow commits)
760 ofs_delta: Whether OFS deltas can be included
761 progress: Optional progress reporting method
762 """
763 if shallow is None:
764 shallow = self.get_shallow()
765 return self.object_store.generate_pack_data(
766 have,
767 want,
768 shallow=shallow,
769 progress=progress,
770 ofs_delta=ofs_delta if ofs_delta is not None else DEFAULT_OFS_DELTA,
771 )
773 def get_graph_walker(
774 self, heads: list[ObjectID] | None = None
775 ) -> ObjectStoreGraphWalker:
776 """Retrieve a graph walker.
778 A graph walker is used by a remote repository (or proxy)
779 to find out which objects are present in this repository.
781 Args:
782 heads: Repository heads to use (optional)
783 Returns: A graph walker object
784 """
785 if heads is None:
786 heads = [
787 sha
788 for sha in self.refs.as_dict(b"refs/heads").values()
789 if sha in self.object_store
790 ]
791 parents_provider = ParentsProvider(self.object_store)
792 return ObjectStoreGraphWalker(
793 heads,
794 parents_provider.get_parents,
795 shallow=self.get_shallow(),
796 update_shallow=self.update_shallow,
797 )
    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s.
        """
        return self.refs.as_dict()
    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        # TODO: move this method to WorkTree
        return self.refs[b"HEAD"]
811 def _get_object(self, sha: bytes, cls: type[T]) -> T:
812 assert len(sha) in (20, 40)
813 ret = self.get_object(sha)
814 if not isinstance(ret, cls):
815 if cls is Commit:
816 raise NotCommitError(ret.id)
817 elif cls is Blob:
818 raise NotBlobError(ret.id)
819 elif cls is Tree:
820 raise NotTreeError(ret.id)
821 elif cls is Tag:
822 raise NotTagError(ret.id)
823 else:
824 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
825 return ret
    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve.

        Returns: A ShaFile object.

        Raises:
          KeyError: when the object can not be found.
        """
        return self.object_store[sha]
    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        Returns:
          ParentsProvider configured with this repository's graftpoints
          and shallow commits.
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )
850 def get_parents(self, sha: bytes, commit: Commit | None = None) -> list[bytes]:
851 """Retrieve the parents of a specific commit.
853 If the specific commit is a graftpoint, the graft parents
854 will be returned instead.
856 Args:
857 sha: SHA of the commit for which to retrieve the parents
858 commit: Optional commit matching the sha
859 Returns: List of parents
860 """
861 return self.parents_provider().get_parents(sha, commit)
863 def get_config(self) -> "ConfigFile":
864 """Retrieve the config object.
866 Returns: `ConfigFile` object for the ``.git/config`` file.
867 """
868 raise NotImplementedError(self.get_config)
870 def get_worktree_config(self) -> "ConfigFile":
871 """Retrieve the worktree config object."""
872 raise NotImplementedError(self.get_worktree_config)
874 def get_description(self) -> bytes | None:
875 """Retrieve the description for this repository.
877 Returns: Bytes with the description of the repository
878 as set by the user.
879 """
880 raise NotImplementedError(self.get_description)
882 def set_description(self, description: bytes) -> None:
883 """Set the description for this repository.
885 Args:
886 description: Text to set as description for this repository.
887 """
888 raise NotImplementedError(self.set_description)
890 def get_rebase_state_manager(self) -> "RebaseStateManager":
891 """Get the appropriate rebase state manager for this repository.
893 Returns: RebaseStateManager instance
894 """
895 raise NotImplementedError(self.get_rebase_state_manager)
897 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
898 """Return a BlobNormalizer object for checkin/checkout operations.
900 Returns: BlobNormalizer instance
901 """
902 raise NotImplementedError(self.get_blob_normalizer)
904 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
905 """Read gitattributes for the repository.
907 Args:
908 tree: Tree SHA to read .gitattributes from (defaults to HEAD)
910 Returns:
911 GitAttributes object that can be used to match paths
912 """
913 raise NotImplementedError(self.get_gitattributes)
915 def get_config_stack(self) -> "StackedConfig":
916 """Return a config stack for this repository.
918 This stack accesses the configuration for both this repository
919 itself (.git/config) and the global configuration, which usually
920 lives in ~/.gitconfig.
922 Returns: `Config` instance for this repository
923 """
924 from .config import ConfigFile, StackedConfig
926 local_config = self.get_config()
927 backends: list[ConfigFile] = [local_config]
928 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
929 backends.append(self.get_worktree_config())
931 backends += StackedConfig.default_backends()
932 return StackedConfig(backends, writable=local_config)
934 def get_shallow(self) -> set[ObjectID]:
935 """Get the set of shallow commits.
937 Returns: Set of shallow commits.
938 """
939 f = self.get_named_file("shallow")
940 if f is None:
941 return set()
942 with f:
943 return {line.strip() for line in f}
945 def update_shallow(
946 self, new_shallow: set[bytes] | None, new_unshallow: set[bytes] | None
947 ) -> None:
948 """Update the list of shallow objects.
950 Args:
951 new_shallow: Newly shallow objects
952 new_unshallow: Newly no longer shallow objects
953 """
954 shallow = self.get_shallow()
955 if new_shallow:
956 shallow.update(new_shallow)
957 if new_unshallow:
958 shallow.difference_update(new_unshallow)
959 if shallow:
960 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
961 else:
962 self._del_named_file("shallow")
964 def get_peeled(self, ref: Ref) -> ObjectID:
965 """Get the peeled value of a ref.
967 Args:
968 ref: The refname to peel.
969 Returns: The fully-peeled SHA1 of a tag object, after peeling all
970 intermediate tags; if the original ref does not point to a tag,
971 this will equal the original SHA1.
972 """
973 cached = self.refs.get_peeled(ref)
974 if cached is not None:
975 return cached
976 return peel_sha(self.object_store, self.refs[ref])[1].id
    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes.
        """
        # Deferred import to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)
989 def get_walker(
990 self,
991 include: Sequence[bytes] | None = None,
992 exclude: Sequence[bytes] | None = None,
993 order: str = "date",
994 reverse: bool = False,
995 max_entries: int | None = None,
996 paths: Sequence[bytes] | None = None,
997 rename_detector: "RenameDetector | None" = None,
998 follow: bool = False,
999 since: int | None = None,
1000 until: int | None = None,
1001 queue_cls: type | None = None,
1002 ) -> "Walker":
1003 """Obtain a walker for this repository.
1005 Args:
1006 include: Iterable of SHAs of commits to include along with their
1007 ancestors. Defaults to [HEAD]
1008 exclude: Iterable of SHAs of commits to exclude along with their
1009 ancestors, overriding includes.
1010 order: ORDER_* constant specifying the order of results.
1011 Anything other than ORDER_DATE may result in O(n) memory usage.
1012 reverse: If True, reverse the order of output, requiring O(n)
1013 memory.
1014 max_entries: The maximum number of entries to yield, or None for
1015 no limit.
1016 paths: Iterable of file or subtree paths to show entries for.
1017 rename_detector: diff.RenameDetector object for detecting
1018 renames.
1019 follow: If True, follow path across renames/copies. Forces a
1020 default rename_detector.
1021 since: Timestamp to list commits after.
1022 until: Timestamp to list commits before.
1023 queue_cls: A class to use for a queue of commits, supporting the
1024 iterator protocol. The constructor takes a single argument, the Walker.
1026 Returns: A `Walker` object
1027 """
1028 from .walk import Walker, _CommitTimeQueue
1030 if include is None:
1031 include = [self.head()]
1033 # Pass all arguments to Walker explicitly to avoid type issues with **kwargs
1034 return Walker(
1035 self.object_store,
1036 include,
1037 exclude=exclude,
1038 order=order,
1039 reverse=reverse,
1040 max_entries=max_entries,
1041 paths=paths,
1042 rename_detector=rename_detector,
1043 follow=follow,
1044 since=since,
1045 until=until,
1046 get_parents=lambda commit: self.get_parents(commit.id, commit),
1047 queue_cls=queue_cls if queue_cls is not None else _CommitTimeQueue,
1048 )
1050 def __getitem__(self, name: ObjectID | Ref) -> "ShaFile":
1051 """Retrieve a Git object by SHA1 or ref.
1053 Args:
1054 name: A Git object SHA1 or a ref name
1055 Returns: A `ShaFile` object, such as a Commit or Blob
1056 Raises:
1057 KeyError: when the specified ref or object does not exist
1058 """
1059 if not isinstance(name, bytes):
1060 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
1061 if len(name) in (20, 40):
1062 try:
1063 return self.object_store[name]
1064 except (KeyError, ValueError):
1065 pass
1066 try:
1067 return self.object_store[self.refs[name]]
1068 except RefFormatError as exc:
1069 raise KeyError(name) from exc
1071 def __contains__(self, name: bytes) -> bool:
1072 """Check if a specific Git object or ref is present.
1074 Args:
1075 name: Git object SHA1 or ref name
1076 """
1077 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
1078 return name in self.object_store or name in self.refs
1079 else:
1080 return name in self.refs
1082 def __setitem__(self, name: bytes, value: ShaFile | bytes) -> None:
1083 """Set a ref.
1085 Args:
1086 name: ref name
1087 value: Ref value - either a ShaFile object, or a hex sha
1088 """
1089 if name.startswith(b"refs/") or name == b"HEAD":
1090 if isinstance(value, ShaFile):
1091 self.refs[name] = value.id
1092 elif isinstance(value, bytes):
1093 self.refs[name] = value
1094 else:
1095 raise TypeError(value)
1096 else:
1097 raise ValueError(name)
1099 def __delitem__(self, name: bytes) -> None:
1100 """Remove a ref.
1102 Args:
1103 name: Name of the ref to remove
1104 """
1105 if name.startswith(b"refs/") or name == b"HEAD":
1106 del self.refs[name]
1107 else:
1108 raise ValueError(name)
1110 def _get_user_identity(
1111 self, config: "StackedConfig", kind: str | None = None
1112 ) -> bytes:
1113 """Determine the identity to use for new commits."""
1114 warnings.warn(
1115 "use get_user_identity() rather than Repo._get_user_identity",
1116 DeprecationWarning,
1117 )
1118 return get_user_identity(config)
1120 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
1121 """Add or modify graftpoints.
1123 Args:
1124 updated_graftpoints: Dict of commit shas to list of parent shas
1125 """
1126 # Simple validation
1127 for commit, parents in updated_graftpoints.items():
1128 for sha in [commit, *parents]:
1129 check_hexsha(sha, "Invalid graftpoint")
1131 self._graftpoints.update(updated_graftpoints)
1133 def _remove_graftpoints(self, to_remove: Sequence[bytes] = ()) -> None:
1134 """Remove graftpoints.
1136 Args:
1137 to_remove: List of commit shas
1138 """
1139 for sha in to_remove:
1140 del self._graftpoints[sha]
1142 def _read_heads(self, name: str) -> list[bytes]:
1143 f = self.get_named_file(name)
1144 if f is None:
1145 return []
1146 with f:
1147 return [line.strip() for line in f.readlines() if line.strip()]
1149 def get_worktree(self) -> "WorkTree":
1150 """Get the working tree for this repository.
1152 Returns:
1153 WorkTree instance for performing working tree operations
1155 Raises:
1156 NotImplementedError: If the repository doesn't support working trees
1157 """
1158 raise NotImplementedError(
1159 "Working tree operations not supported by this repository type"
1160 )
1162 @replace_me(remove_in="0.26.0")
1163 def do_commit(
1164 self,
1165 message: bytes | None = None,
1166 committer: bytes | None = None,
1167 author: bytes | None = None,
1168 commit_timestamp: float | None = None,
1169 commit_timezone: int | None = None,
1170 author_timestamp: float | None = None,
1171 author_timezone: int | None = None,
1172 tree: ObjectID | None = None,
1173 encoding: bytes | None = None,
1174 ref: Ref | None = b"HEAD",
1175 merge_heads: list[ObjectID] | None = None,
1176 no_verify: bool = False,
1177 sign: bool = False,
1178 ) -> bytes:
1179 """Create a new commit.
1181 If not specified, committer and author default to
1182 get_user_identity(..., 'COMMITTER')
1183 and get_user_identity(..., 'AUTHOR') respectively.
1185 Args:
1186 message: Commit message (bytes or callable that takes (repo, commit)
1187 and returns bytes)
1188 committer: Committer fullname
1189 author: Author fullname
1190 commit_timestamp: Commit timestamp (defaults to now)
1191 commit_timezone: Commit timestamp timezone (defaults to GMT)
1192 author_timestamp: Author timestamp (defaults to commit
1193 timestamp)
1194 author_timezone: Author timestamp timezone
1195 (defaults to commit timestamp timezone)
1196 tree: SHA1 of the tree root to use (if not specified the
1197 current index will be committed).
1198 encoding: Encoding
1199 ref: Optional ref to commit to (defaults to current branch).
1200 If None, creates a dangling commit without updating any ref.
1201 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1202 no_verify: Skip pre-commit and commit-msg hooks
1203 sign: GPG Sign the commit (bool, defaults to False,
1204 pass True to use default GPG key,
1205 pass a str containing Key ID to use a specific GPG key)
1207 Returns:
1208 New commit SHA1
1209 """
1210 return self.get_worktree().commit(
1211 message=message,
1212 committer=committer,
1213 author=author,
1214 commit_timestamp=commit_timestamp,
1215 commit_timezone=commit_timezone,
1216 author_timestamp=author_timestamp,
1217 author_timezone=author_timezone,
1218 tree=tree,
1219 encoding=encoding,
1220 ref=ref,
1221 merge_heads=merge_heads,
1222 no_verify=no_verify,
1223 sign=sign,
1224 )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Everything after the marker, minus the trailing newline, is the path.
    path = contents[len(prefix) :].rstrip(b"\r\n")
    return path.decode("utf-8")
class UnsupportedVersion(Exception):
    """Raised when a repository uses an unsupported format version."""

    def __init__(self, version: int) -> None:
        """Create the exception.

        Args:
          version: The repository format version that is not supported
        """
        # Kept as an attribute so callers can inspect the offending version.
        self.version = version
class UnsupportedExtension(Exception):
    """Raised when a repository declares an unsupported extension."""

    def __init__(self, extension: str) -> None:
        """Create the exception.

        Args:
          extension: The repository extension that is not supported
        """
        # Kept as an attribute so callers can inspect the offending extension.
        self.extension = extension
1266class Repo(BaseRepo):
1267 """A git repository backed by local disk.
1269 To open an existing repository, call the constructor with
1270 the path of the repository.
1272 To create a new repository, use the Repo.init class method.
1274 Note that a repository object may hold on to resources such
1275 as file handles for performance reasons; call .close() to free
1276 up those resources.
1278 Attributes:
1279 path: Path to the working copy (if it exists) or repository control
1280 directory (if the repository is bare)
1281 bare: Whether this is a bare repository
1282 """
1284 path: str
1285 bare: bool
1286 object_store: DiskObjectStore
1287 filter_context: "FilterContext | None"
    def __init__(
        self,
        root: str | bytes | os.PathLike[str],
        object_store: PackBasedObjectStore | None = None,
        bare: bool | None = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Autodetect bareness when not told: a .git file/dir under root means
        # non-bare; objects/ and refs/ directly under root means bare.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a gitfile pointing at the real control directory
                # (used by linked worktrees and submodules).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: "commondir" points back at the main
            # repository's control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        # Only format versions 0 and 1 are understood.
        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any other extension is unknown to us and therefore fatal.
                raise UnsupportedExtension(extension.decode("utf-8"))

        if object_store is None:
            # Get shared repository permissions from config
            try:
                shared_value = config.get(("core",), "sharedRepository")
                file_mode, dir_mode = parse_shared_repository(shared_value)
            except KeyError:
                file_mode, dir_mode = None, None

            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR),
                config,
                file_mode=file_mode,
                dir_mode=dir_mode,
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
1421 def get_worktree(self) -> "WorkTree":
1422 """Get the working tree for this repository.
1424 Returns:
1425 WorkTree instance for performing working tree operations
1426 """
1427 from .worktree import WorkTree
1429 return WorkTree(self, self.path)
1431 def _write_reflog(
1432 self,
1433 ref: bytes,
1434 old_sha: bytes,
1435 new_sha: bytes,
1436 committer: bytes | None,
1437 timestamp: int | None,
1438 timezone: int | None,
1439 message: bytes,
1440 ) -> None:
1441 from .reflog import format_reflog_line
1443 path = self._reflog_path(ref)
1445 # Get shared repository permissions
1446 file_mode, dir_mode = self._get_shared_repository_permissions()
1448 # Create directory with appropriate permissions
1449 parent_dir = os.path.dirname(path)
1450 # Create directory tree, setting permissions on each level if needed
1451 parts = []
1452 current = parent_dir
1453 while current and not os.path.exists(current):
1454 parts.append(current)
1455 current = os.path.dirname(current)
1456 parts.reverse()
1457 for part in parts:
1458 os.mkdir(part)
1459 if dir_mode is not None:
1460 os.chmod(part, dir_mode)
1461 if committer is None:
1462 config = self.get_config_stack()
1463 committer = get_user_identity(config)
1464 check_user_identity(committer)
1465 if timestamp is None:
1466 timestamp = int(time.time())
1467 if timezone is None:
1468 timezone = 0 # FIXME
1469 with open(path, "ab") as f:
1470 f.write(
1471 format_reflog_line(
1472 old_sha, new_sha, committer, timestamp, timezone, message
1473 )
1474 + b"\n"
1475 )
1477 # Set file permissions (open() respects umask, so we need chmod to set the actual mode)
1478 # Always chmod to ensure correct permissions even if file already existed
1479 if file_mode is not None:
1480 os.chmod(path, file_mode)
1482 def _reflog_path(self, ref: bytes) -> str:
1483 if ref.startswith((b"main-worktree/", b"worktrees/")):
1484 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1486 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1487 return os.path.join(base, "logs", os.fsdecode(ref))
1489 def read_reflog(self, ref: bytes) -> Generator[reflog.Entry, None, None]:
1490 """Read reflog entries for a reference.
1492 Args:
1493 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1495 Yields:
1496 reflog.Entry objects in chronological order (oldest first)
1497 """
1498 from .reflog import read_reflog
1500 path = self._reflog_path(ref)
1501 try:
1502 with open(path, "rb") as f:
1503 yield from read_reflog(f)
1504 except FileNotFoundError:
1505 return
1507 @classmethod
1508 def discover(cls, start: str | bytes | os.PathLike[str] = ".") -> "Repo":
1509 """Iterate parent directories to discover a repository.
1511 Return a Repo object for the first parent directory that looks like a
1512 Git repository.
1514 Args:
1515 start: The directory to start discovery from (defaults to '.')
1516 """
1517 path = os.path.abspath(start)
1518 while True:
1519 try:
1520 return cls(path)
1521 except NotGitRepository:
1522 new_path, _tail = os.path.split(path)
1523 if new_path == path: # Root reached
1524 break
1525 path = new_path
1526 start_str = os.fspath(start)
1527 if isinstance(start_str, bytes):
1528 start_str = start_str.decode("utf-8")
1529 raise NotGitRepository(f"No git repository was found at {start_str}")
1531 def controldir(self) -> str:
1532 """Return the path of the control directory."""
1533 return self._controldir
1535 def commondir(self) -> str:
1536 """Return the path of the common directory.
1538 For a main working tree, it is identical to controldir().
1540 For a linked working tree, it is the control directory of the
1541 main working tree.
1542 """
1543 return self._commondir
1545 def _determine_file_mode(self) -> bool:
1546 """Probe the file-system to determine whether permissions can be trusted.
1548 Returns: True if permissions can be trusted, False otherwise.
1549 """
1550 fname = os.path.join(self.path, ".probe-permissions")
1551 with open(fname, "w") as f:
1552 f.write("")
1554 st1 = os.lstat(fname)
1555 try:
1556 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1557 except PermissionError:
1558 return False
1559 st2 = os.lstat(fname)
1561 os.unlink(fname)
1563 mode_differs = st1.st_mode != st2.st_mode
1564 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1566 return mode_differs and st2_has_exec
1568 def _determine_symlinks(self) -> bool:
1569 """Probe the filesystem to determine whether symlinks can be created.
1571 Returns: True if symlinks can be created, False otherwise.
1572 """
1573 # TODO(jelmer): Actually probe disk / look at filesystem
1574 return sys.platform != "win32"
1576 def _get_shared_repository_permissions(
1577 self,
1578 ) -> tuple[int | None, int | None]:
1579 """Get shared repository file and directory permissions from config.
1581 Returns:
1582 tuple of (file_mask, directory_mask) or (None, None) if not shared
1583 """
1584 try:
1585 config = self.get_config()
1586 value = config.get(("core",), "sharedRepository")
1587 return parse_shared_repository(value)
1588 except KeyError:
1589 return (None, None)
1591 def _put_named_file(self, path: str, contents: bytes) -> None:
1592 """Write a file to the control dir with the given name and contents.
1594 Args:
1595 path: The path to the file, relative to the control dir.
1596 contents: A string to write to the file.
1597 """
1598 path = path.lstrip(os.path.sep)
1600 # Get shared repository permissions
1601 file_mode, _ = self._get_shared_repository_permissions()
1603 # Create file with appropriate permissions
1604 if file_mode is not None:
1605 with GitFile(
1606 os.path.join(self.controldir(), path), "wb", mask=file_mode
1607 ) as f:
1608 f.write(contents)
1609 else:
1610 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1611 f.write(contents)
1613 def _del_named_file(self, path: str) -> None:
1614 try:
1615 os.unlink(os.path.join(self.controldir(), path))
1616 except FileNotFoundError:
1617 return
1619 def get_named_file(
1620 self,
1621 path: str | bytes,
1622 basedir: str | None = None,
1623 ) -> BinaryIO | None:
1624 """Get a file from the control dir with a specific name.
1626 Although the filename should be interpreted as a filename relative to
1627 the control dir in a disk-based Repo, the object returned need not be
1628 pointing to a file in that location.
1630 Args:
1631 path: The path to the file, relative to the control dir.
1632 basedir: Optional argument that specifies an alternative to the
1633 control dir.
1634 Returns: An open file object, or None if the file does not exist.
1635 """
1636 # TODO(dborowitz): sanitize filenames, since this is used directly by
1637 # the dumb web serving code.
1638 if basedir is None:
1639 basedir = self.controldir()
1640 if isinstance(path, bytes):
1641 path = path.decode("utf-8")
1642 path = path.lstrip(os.path.sep)
1643 try:
1644 return open(os.path.join(basedir, path), "rb")
1645 except FileNotFoundError:
1646 return None
1648 def index_path(self) -> str:
1649 """Return path to the index file."""
1650 return os.path.join(self.controldir(), INDEX_FILENAME)
1652 def open_index(self) -> "Index":
1653 """Open the index for this repository.
1655 Raises:
1656 NoIndexPresent: If no index is present
1657 Returns: The matching `Index`
1658 """
1659 from .index import Index
1661 if not self.has_index():
1662 raise NoIndexPresent
1664 # Check for manyFiles feature configuration
1665 config = self.get_config_stack()
1666 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1667 skip_hash = False
1668 index_version = None
1670 if many_files:
1671 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1672 try:
1673 index_version_str = config.get(b"index", b"version")
1674 index_version = int(index_version_str)
1675 except KeyError:
1676 index_version = 4 # Default to version 4 for manyFiles
1677 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1678 else:
1679 # Check for explicit index settings
1680 try:
1681 index_version_str = config.get(b"index", b"version")
1682 index_version = int(index_version_str)
1683 except KeyError:
1684 index_version = None
1685 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1687 # Get shared repository permissions for index file
1688 file_mode, _ = self._get_shared_repository_permissions()
1690 return Index(
1691 self.index_path(),
1692 skip_hash=skip_hash,
1693 version=index_version,
1694 file_mode=file_mode,
1695 )
1697 def has_index(self) -> bool:
1698 """Check if an index is present."""
1699 # Bare repos must never have index files; non-bare repos may have a
1700 # missing index file, which is treated as empty.
1701 return not self.bare
1703 @replace_me(remove_in="0.26.0")
1704 def stage(
1705 self,
1706 fs_paths: str
1707 | bytes
1708 | os.PathLike[str]
1709 | Iterable[str | bytes | os.PathLike[str]],
1710 ) -> None:
1711 """Stage a set of paths.
1713 Args:
1714 fs_paths: List of paths, relative to the repository path
1715 """
1716 return self.get_worktree().stage(fs_paths)
1718 @replace_me(remove_in="0.26.0")
1719 def unstage(self, fs_paths: Sequence[str]) -> None:
1720 """Unstage specific file in the index.
1722 Args:
1723 fs_paths: a list of files to unstage,
1724 relative to the repository path.
1725 """
1726 return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path: str | bytes | os.PathLike[str],
        *,
        mkdir: bool = True,
        bare: bool = False,
        origin: bytes = b"origin",
        checkout: bool | None = None,
        branch: bytes | None = None,
        progress: Callable[[str], None] | None = None,
        depth: int | None = None,
        symlinks: bool | None = None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            # Non-bare clones default to checking out HEAD; a bare clone with
            # checkout requested is contradictory.
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the clone's "origin" remote.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror local branches under refs/remotes/<origin>/ and copy
                # all tags across.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Release the partially-initialized clone's resources before
                # propagating.
                target.close()
                raise
        except BaseException:
            # Remove the directory we created so a failed clone leaves no
            # residue behind.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1823 @replace_me(remove_in="0.26.0")
1824 def reset_index(self, tree: bytes | None = None) -> None:
1825 """Reset the index back to a specific tree.
1827 Args:
1828 tree: Tree SHA to reset to, None for current HEAD tree.
1829 """
1830 return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function. Supported
        prefixes are ``gitdir:``, ``gitdir/i:`` (case-insensitive) and
        ``onbranch:``.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules: backslashes become
            # slashes, non-anchored patterns get a leading "**/", and a
            # trailing slash means "anything below this directory".
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Detached/unborn HEAD cannot match any branch pattern.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = extract_branch_name(head_ref).decode(
                        "utf-8", errors="replace"
                    )
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1920 def get_worktree_config(self) -> "ConfigFile":
1921 """Get the worktree-specific config.
1923 Returns:
1924 ConfigFile object for the worktree config
1925 """
1926 from .config import ConfigFile
1928 path = os.path.join(self.commondir(), "config.worktree")
1929 try:
1930 # Pass condition matchers for includeIf evaluation
1931 condition_matchers = self._get_config_condition_matchers()
1932 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1933 except FileNotFoundError:
1934 cf = ConfigFile()
1935 cf.path = path
1936 return cf
1938 def get_config(self) -> "ConfigFile":
1939 """Retrieve the config object.
1941 Returns: `ConfigFile` object for the ``.git/config`` file.
1942 """
1943 from .config import ConfigFile
1945 path = os.path.join(self._commondir, "config")
1946 try:
1947 # Pass condition matchers for includeIf evaluation
1948 condition_matchers = self._get_config_condition_matchers()
1949 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1950 except FileNotFoundError:
1951 ret = ConfigFile()
1952 ret.path = path
1953 return ret
1955 def get_rebase_state_manager(self) -> "RebaseStateManager":
1956 """Get the appropriate rebase state manager for this repository.
1958 Returns: DiskRebaseStateManager instance
1959 """
1960 import os
1962 from .rebase import DiskRebaseStateManager
1964 path = os.path.join(self.controldir(), "rebase-merge")
1965 return DiskRebaseStateManager(path)
1967 def get_description(self) -> bytes | None:
1968 """Retrieve the description of this repository.
1970 Returns: Description as bytes or None.
1971 """
1972 path = os.path.join(self._controldir, "description")
1973 try:
1974 with GitFile(path, "rb") as f:
1975 return f.read()
1976 except FileNotFoundError:
1977 return None
1979 def __repr__(self) -> str:
1980 """Return string representation of this repository."""
1981 return f"<Repo at {self.path!r}>"
1983 def set_description(self, description: bytes) -> None:
1984 """Set the description for this repository.
1986 Args:
1987 description: Text to set as description for this repository.
1988 """
1989 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: str | bytes | os.PathLike[str],
        controldir: str | bytes | os.PathLike[str],
        bare: bool,
        object_store: PackBasedObjectStore | None = None,
        config: "StackedConfig | None" = None,
        default_branch: bytes | None = None,
        symlinks: bool | None = None,
        format: int | None = None,
        shared_repository: str | bool | None = None,
    ) -> "Repo":
        """Shared implementation behind ``init`` and ``init_bare``.

        Creates the base directory skeleton and object store under
        ``controldir``, opens the repository, points HEAD at the default
        branch and writes the initial control files.

        Args:
          path: Path the repository is opened from (working tree root, or
            the control dir itself when bare)
          controldir: Control directory to populate
          bare: Whether the new repository is bare
          object_store: Object store to use; created on disk when None
          config: Config consulted for ``init.defaultBranch``
          default_branch: Initial branch name; falls back to config, then
            to DEFAULT_BRANCH
          symlinks: Whether to support symlinks (autodetect when None)
          format: Repository format version (defaults to 0)
          shared_repository: core.sharedRepository setting, if any
        Returns: the opened `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)

        # Determine shared repository permissions early
        file_mode: int | None = None
        dir_mode: int | None = None
        if shared_repository is not None:
            file_mode, dir_mode = parse_shared_repository(shared_repository)

        # Create base directories with appropriate permissions
        for d in BASE_DIRECTORIES:
            dir_path = os.path.join(controldir, *d)
            os.mkdir(dir_path)
            if dir_mode is not None:
                # os.mkdir honours the umask, so chmod to force the mode.
                os.chmod(dir_path, dir_mode)

        if object_store is None:
            object_store = DiskObjectStore.init(
                os.path.join(controldir, OBJECTDIR),
                file_mode=file_mode,
                dir_mode=dir_mode,
            )
        ret = cls(path, bare=bare, object_store=object_store)
        # Resolve the default branch: explicit arg > init.defaultBranch
        # config > built-in default.
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", local_branch_name(default_branch))
        ret._init_files(
            bare=bare,
            symlinks=symlinks,
            format=format,
            shared_repository=shared_repository,
        )
        return ret
2049 @classmethod
2050 def init(
2051 cls,
2052 path: str | bytes | os.PathLike[str],
2053 *,
2054 mkdir: bool = False,
2055 config: "StackedConfig | None" = None,
2056 default_branch: bytes | None = None,
2057 symlinks: bool | None = None,
2058 format: int | None = None,
2059 shared_repository: str | bool | None = None,
2060 ) -> "Repo":
2061 """Create a new repository.
2063 Args:
2064 path: Path in which to create the repository
2065 mkdir: Whether to create the directory
2066 config: Configuration object
2067 default_branch: Default branch name
2068 symlinks: Whether to support symlinks
2069 format: Repository format version (defaults to 0)
2070 shared_repository: Shared repository setting (group, all, umask, or octal)
2071 Returns: `Repo` instance
2072 """
2073 path = os.fspath(path)
2074 if isinstance(path, bytes):
2075 path = os.fsdecode(path)
2076 if mkdir:
2077 os.mkdir(path)
2078 controldir = os.path.join(path, CONTROLDIR)
2079 os.mkdir(controldir)
2080 _set_filesystem_hidden(controldir)
2081 return cls._init_maybe_bare(
2082 path,
2083 controldir,
2084 False,
2085 config=config,
2086 default_branch=default_branch,
2087 symlinks=symlinks,
2088 format=format,
2089 shared_repository=shared_repository,
2090 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: str | bytes | os.PathLike[str],
        main_repo: "Repo",
        identifier: str | None = None,
        mkdir: bool = False,
    ) -> "Repo":
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The worktree's .git is a gitfile pointing at its control dir under
        # <main controldir>/worktrees/<identifier>.
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")

        # Get shared repository permissions from main repository
        _, dir_mode = main_repo._get_shared_repository_permissions()

        # Create directories with appropriate permissions
        try:
            os.mkdir(main_worktreesdir)
            if dir_mode is not None:
                os.chmod(main_worktreesdir, dir_mode)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
            if dir_mode is not None:
                os.chmod(worktree_controldir, dir_mode)
        except FileExistsError:
            pass
        # Back-links: "gitdir" points at the worktree's .git file and
        # "commondir" points back at the main control directory.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the new worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r
2150 @classmethod
2151 def init_bare(
2152 cls,
2153 path: str | bytes | os.PathLike[str],
2154 *,
2155 mkdir: bool = False,
2156 object_store: PackBasedObjectStore | None = None,
2157 config: "StackedConfig | None" = None,
2158 default_branch: bytes | None = None,
2159 format: int | None = None,
2160 shared_repository: str | bool | None = None,
2161 ) -> "Repo":
2162 """Create a new bare repository.
2164 ``path`` should already exist and be an empty directory.
2166 Args:
2167 path: Path to create bare repository in
2168 mkdir: Whether to create the directory
2169 object_store: Object store to use
2170 config: Configuration object
2171 default_branch: Default branch name
2172 format: Repository format version (defaults to 0)
2173 shared_repository: Shared repository setting (group, all, umask, or octal)
2174 Returns: a `Repo` instance
2175 """
2176 path = os.fspath(path)
2177 if isinstance(path, bytes):
2178 path = os.fsdecode(path)
2179 if mkdir:
2180 os.mkdir(path)
2181 return cls._init_maybe_bare(
2182 path,
2183 path,
2184 True,
2185 object_store=object_store,
2186 config=config,
2187 default_branch=default_branch,
2188 format=format,
2189 shared_repository=shared_repository,
2190 )
2192 create = init_bare
2194 def close(self) -> None:
2195 """Close any files opened by this repository."""
2196 self.object_store.close()
2197 # Clean up filter context if it was created
2198 if self.filter_context is not None:
2199 self.filter_context.close()
2200 self.filter_context = None
    def __enter__(self) -> "Repo":
        """Enter context manager.

        Returns: this repository, so ``with Repo(...) as r:`` binds it.
        """
        return self
    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context manager and close repository.

        The repository is closed regardless of whether an exception is
        propagating; exceptions are never suppressed (returns None).
        """
        self.close()
2215 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
2216 """Read .gitattributes file from working tree.
2218 Returns:
2219 Dictionary mapping file patterns to attributes
2220 """
2221 gitattributes = {}
2222 gitattributes_path = os.path.join(self.path, ".gitattributes")
2224 if os.path.exists(gitattributes_path):
2225 with open(gitattributes_path, "rb") as f:
2226 for line in f:
2227 line = line.strip()
2228 if not line or line.startswith(b"#"):
2229 continue
2231 parts = line.split()
2232 if len(parts) < 2:
2233 continue
2235 pattern = parts[0]
2236 attrs = {}
2238 for attr in parts[1:]:
2239 if attr.startswith(b"-"):
2240 # Unset attribute
2241 attrs[attr[1:]] = b"false"
2242 elif b"=" in attr:
2243 # Set to value
2244 key, value = attr.split(b"=", 1)
2245 attrs[key] = value
2246 else:
2247 # Set attribute
2248 attrs[attr] = b"true"
2250 gitattributes[pattern] = attrs
2252 return gitattributes
2254 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2255 """Return a BlobNormalizer object."""
2256 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2258 # Get fresh configuration and GitAttributes
2259 config_stack = self.get_config_stack()
2260 git_attributes = self.get_gitattributes()
2262 # Lazily create FilterContext if needed
2263 if self.filter_context is None:
2264 filter_registry = FilterRegistry(config_stack, self)
2265 self.filter_context = FilterContext(filter_registry)
2266 else:
2267 # Refresh the context with current config to handle config changes
2268 self.filter_context.refresh_config(config_stack)
2270 # Return a new FilterBlobNormalizer with the context
2271 return FilterBlobNormalizer(
2272 config_stack, git_attributes, filter_context=self.filter_context
2273 )
2275 def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
2276 """Read gitattributes for the repository.
2278 Args:
2279 tree: Tree SHA to read .gitattributes from (defaults to HEAD)
2281 Returns:
2282 GitAttributes object that can be used to match paths
2283 """
2284 from .attrs import (
2285 GitAttributes,
2286 Pattern,
2287 parse_git_attributes,
2288 )
2290 patterns = []
2292 # Read system gitattributes (TODO: implement this)
2293 # Read global gitattributes (TODO: implement this)
2295 # Read repository .gitattributes from index/tree
2296 if tree is None:
2297 try:
2298 # Try to get from HEAD
2299 head = self[b"HEAD"]
2300 if isinstance(head, Tag):
2301 _cls, obj = head.object
2302 head = self.get_object(obj)
2303 assert isinstance(head, Commit)
2304 tree = head.tree
2305 except KeyError:
2306 # No HEAD, no attributes from tree
2307 pass
2309 if tree is not None:
2310 try:
2311 tree_obj = self[tree]
2312 assert isinstance(tree_obj, Tree)
2313 if b".gitattributes" in tree_obj:
2314 _, attrs_sha = tree_obj[b".gitattributes"]
2315 attrs_blob = self[attrs_sha]
2316 if isinstance(attrs_blob, Blob):
2317 attrs_data = BytesIO(attrs_blob.data)
2318 for pattern_bytes, attrs in parse_git_attributes(attrs_data):
2319 pattern = Pattern(pattern_bytes)
2320 patterns.append((pattern, attrs))
2321 except (KeyError, NotTreeError):
2322 pass
2324 # Read .git/info/attributes
2325 info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
2326 if os.path.exists(info_attrs_path):
2327 with open(info_attrs_path, "rb") as f:
2328 for pattern_bytes, attrs in parse_git_attributes(f):
2329 pattern = Pattern(pattern_bytes)
2330 patterns.append((pattern, attrs))
2332 # Read .gitattributes from working directory (if it exists)
2333 working_attrs_path = os.path.join(self.path, ".gitattributes")
2334 if os.path.exists(working_attrs_path):
2335 with open(working_attrs_path, "rb") as f:
2336 for pattern_bytes, attrs in parse_git_attributes(f):
2337 pattern = Pattern(pattern_bytes)
2338 patterns.append((pattern, attrs))
2340 return GitAttributes(patterns)
    @replace_me(remove_in="0.26.0")
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir.

        Deprecated: delegates to the worktree; scheduled for removal in 0.26.0.
        """
        return self.get_worktree()._sparse_checkout_file_path()
    @replace_me(remove_in="0.26.0")
    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Deprecated: delegates to the worktree; scheduled for removal in 0.26.0.
        """
        return self.get_worktree().configure_for_cone_mode()
    @replace_me(remove_in="0.26.0")
    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        Deprecated: delegates to the worktree; scheduled for removal in 0.26.0.
        """
        return self.get_worktree().infer_cone_mode()
    @replace_me(remove_in="0.26.0")
    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Deprecated: delegates to the worktree; scheduled for removal in 0.26.0.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        return self.get_worktree().get_sparse_checkout_patterns()
    @replace_me(remove_in="0.26.0")
    def set_sparse_checkout_patterns(self, patterns: Sequence[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Deprecated: delegates to the worktree; scheduled for removal in 0.26.0.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        return self.get_worktree().set_sparse_checkout_patterns(patterns)
    @replace_me(remove_in="0.26.0")
    def set_cone_mode_patterns(self, dirs: Sequence[str] | None = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        For each directory to include, add an inclusion line that "undoes" the prior
        ``!/*/`` 'exclude' that re-includes that directory and everything under it.
        Never add the same line twice.

        Deprecated: delegates to the worktree; scheduled for removal in 0.26.0.
        """
        return self.get_worktree().set_cone_mode_patterns(dirs)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    # Lazily created by get_blob_normalizer(); released again in close().
    filter_context: "FilterContext | None"
    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # The reflog list must exist before the refs container is built,
        # because the container's logger callback appends to it.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)
        # Control files stored by relative path (see get_named_file()).
        self._named_files: dict[str, bytes] = {}
        # Memory repos are always bare: no working tree, no index.
        self.bare = True
        self._config = ConfigFile()
        self._description: bytes | None = None
        # Created lazily by get_blob_normalizer(); closed in close().
        self.filter_context = None
2410 def _append_reflog(
2411 self,
2412 ref: bytes,
2413 old_sha: bytes | None,
2414 new_sha: bytes | None,
2415 committer: bytes | None,
2416 timestamp: int | None,
2417 timezone: int | None,
2418 message: bytes | None,
2419 ) -> None:
2420 self._reflog.append(
2421 (ref, old_sha, new_sha, committer, timestamp, timezone, message)
2422 )
    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Stored only in memory; readable back via get_description().

        Args:
          description: Text to set as description
        """
        self._description = description
    def get_description(self) -> bytes | None:
        """Get the description of this repository.

        Returns:
          Repository description as bytes, or None if never set
        """
        return self._description
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # No real filesystem to probe: assume trustworthy file modes on every
        # platform except Windows.
        return sys.platform != "win32"
    def _determine_symlinks(self) -> bool:
        """Probe the file-system to determine whether symlinks can be used.

        (Docstring previously copy-pasted from _determine_file_mode; this
        method is about symlink support, not permissions.)

        Returns: True if symlinks can be used, False otherwise.
        """
        # No real filesystem to probe: assume symlink support on every
        # platform except Windows.
        return sys.platform != "win32"
    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        For a MemoryRepo this simply stores the bytes in an in-memory dict.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        self._named_files[path] = contents
2463 def _del_named_file(self, path: str) -> None:
2464 try:
2465 del self._named_files[path]
2466 except KeyError:
2467 pass
2469 def get_named_file(
2470 self,
2471 path: str | bytes,
2472 basedir: str | None = None,
2473 ) -> BytesIO | None:
2474 """Get a file from the control dir with a specific name.
2476 Although the filename should be interpreted as a filename relative to
2477 the control dir in a disk-baked Repo, the object returned need not be
2478 pointing to a file in that location.
2480 Args:
2481 path: The path to the file, relative to the control dir.
2482 basedir: Optional base directory for the path
2483 Returns: An open file object, or None if the file does not exist.
2484 """
2485 path_str = path.decode() if isinstance(path, bytes) else path
2486 contents = self._named_files.get(path_str, None)
2487 if contents is None:
2488 return None
2489 return BytesIO(contents)
    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        MemoryRepos are always bare, so no index can ever exist.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: the in-memory `ConfigFile` object created at init time.
        """
        return self._config
    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        # Import deferred, in line with this module's policy of deferring
        # imports as long as possible.
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)
2515 def get_blob_normalizer(self) -> "FilterBlobNormalizer":
2516 """Return a BlobNormalizer object for checkin/checkout operations."""
2517 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2519 # Get fresh configuration and GitAttributes
2520 config_stack = self.get_config_stack()
2521 git_attributes = self.get_gitattributes()
2523 # Lazily create FilterContext if needed
2524 if self.filter_context is None:
2525 filter_registry = FilterRegistry(config_stack, self)
2526 self.filter_context = FilterContext(filter_registry)
2527 else:
2528 # Refresh the context with current config to handle config changes
2529 self.filter_context.refresh_config(config_stack)
2531 # Return a new FilterBlobNormalizer with the context
2532 return FilterBlobNormalizer(
2533 config_stack, git_attributes, filter_context=self.filter_context
2534 )
    def get_gitattributes(self, tree: bytes | None = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Ignored; accepted for interface compatibility with Repo.

        Returns:
          An empty GitAttributes object.
        """
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files
        # Return empty GitAttributes
        return GitAttributes([])
2544 def close(self) -> None:
2545 """Close any resources opened by this repository."""
2546 # Clean up filter context if it was created
2547 if self.filter_context is not None:
2548 self.filter_context.close()
2549 self.filter_context = None
    def do_commit(
        self,
        message: bytes | None = None,
        committer: bytes | None = None,
        author: bytes | None = None,
        commit_timestamp: float | None = None,
        commit_timezone: int | None = None,
        author_timestamp: float | None = None,
        author_timezone: int | None = None,
        tree: ObjectID | None = None,
        encoding: bytes | None = None,
        ref: Ref | None = b"HEAD",
        merge_heads: list[ObjectID] | None = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message, either bytes or a callable invoked with
            (repo, commit) that returns the message bytes
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use (required for MemoryRepo)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: if tree is missing or malformed, or no message is given
        """
        import time

        from .objects import Commit

        # Unlike the disk-backed Repo there is no index to commit from, so
        # the tree must always be supplied explicitly.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        # Fill in identity and timestamp defaults from config / current time.
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message is invoked with (repo, commit-in-progress) and
        # must return the final message bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit: store the object, touch no refs.
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                # Ref exists: parent on its old head and update atomically.
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet: create it only if still absent.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=int(commit_timestamp),
                    timezone=commit_timezone,
                )
            if not ok:
                # Lost a race: the ref moved between the read and the update.
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id
2681 @classmethod
2682 def init_bare(
2683 cls,
2684 objects: Iterable[ShaFile],
2685 refs: Mapping[bytes, bytes],
2686 format: int | None = None,
2687 ) -> "MemoryRepo":
2688 """Create a new bare repository in memory.
2690 Args:
2691 objects: Objects for the new repository,
2692 as iterable
2693 refs: Refs as dictionary, mapping names
2694 to object SHA1s
2695 format: Repository format version (defaults to 0)
2696 """
2697 ret = cls()
2698 for obj in objects:
2699 ret.object_store.add_object(obj)
2700 for refname, sha in refs.items():
2701 ret.refs.add_if_new(refname, sha)
2702 ret._init_files(bare=True, format=format)
2703 return ret