Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 41%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as public by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 Union,
46)
48if TYPE_CHECKING:
49 # There are no circular imports here, but we try to defer imports as long
50 # as possible to reduce start-up time for anything that doesn't need
51 # these imports.
52 from .config import ConditionMatcher, ConfigFile, StackedConfig
53 from .index import Index
54 from .notes import Notes
56from .errors import (
57 CommitError,
58 HookError,
59 NoIndexPresent,
60 NotBlobError,
61 NotCommitError,
62 NotGitRepository,
63 NotTagError,
64 NotTreeError,
65 RefFormatError,
66)
67from .file import GitFile
68from .hooks import (
69 CommitMsgShellHook,
70 Hook,
71 PostCommitShellHook,
72 PostReceiveShellHook,
73 PreCommitShellHook,
74)
75from .line_ending import BlobNormalizer, TreeBlobNormalizer
76from .object_store import (
77 DiskObjectStore,
78 MemoryObjectStore,
79 MissingObjectFinder,
80 ObjectStoreGraphWalker,
81 PackBasedObjectStore,
82 find_shallow,
83 peel_sha,
84)
85from .objects import (
86 Blob,
87 Commit,
88 ObjectID,
89 ShaFile,
90 Tag,
91 Tree,
92 check_hexsha,
93 valid_hexsha,
94)
95from .pack import generate_unpacked_objects
96from .refs import (
97 ANNOTATED_TAG_SUFFIX, # noqa: F401
98 LOCAL_BRANCH_PREFIX,
99 LOCAL_TAG_PREFIX, # noqa: F401
100 SYMREF, # noqa: F401
101 DictRefsContainer,
102 DiskRefsContainer,
103 InfoRefsContainer, # noqa: F401
104 Ref,
105 RefsContainer,
106 _set_default_branch,
107 _set_head,
108 _set_origin_head,
109 check_ref_format, # noqa: F401
110 read_packed_refs, # noqa: F401
111 read_packed_refs_with_peeled, # noqa: F401
112 serialize_refs,
113 write_packed_refs, # noqa: F401
114)
# Well-known file and directory names inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Subdirectories (relative to the control dir) created when a new
# repository is initialized.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch that HEAD points at in a freshly initialized repository.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        """Create the error.

        Args:
          identity: The offending identity bytestring.
        """
        # Populate Exception.args so str(exc) yields a useful message
        # instead of the empty string the bare __init__ produced.
        super().__init__(f"Invalid user identity: {identity!r}")
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be determined.

    Signals that neither the environment variables nor the system
    password database yielded a usable username for the current user.
    """
149# TODO(jelmer): Cache?
150def _get_default_identity() -> tuple[str, str]:
151 import socket
153 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
154 username = os.environ.get(name)
155 if username:
156 break
157 else:
158 username = None
160 try:
161 import pwd
162 except ImportError:
163 fullname = None
164 else:
165 try:
166 entry = pwd.getpwuid(os.getuid()) # type: ignore
167 except KeyError:
168 fullname = None
169 else:
170 if getattr(entry, "gecos", None):
171 fullname = entry.pw_gecos.split(",")[0]
172 else:
173 fullname = None
174 if username is None:
175 username = entry.pw_name
176 if not fullname:
177 if username is None:
178 raise DefaultIdentityNotFound("no username found")
179 fullname = username
180 email = os.environ.get("EMAIL")
181 if email is None:
182 if username is None:
183 raise DefaultIdentityNotFound("no username found")
184 email = f"{username}@{socket.gethostname()}"
185 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    # Highest precedence: GIT_<KIND>_NAME / GIT_<KIND>_EMAIL environment
    # variables, when a kind was requested.
    if kind:
        env_name = os.environ.get("GIT_" + kind + "_NAME")
        env_email = os.environ.get("GIT_" + kind + "_EMAIL")
        if env_name is not None:
            user = env_name.encode("utf-8")
        if env_email is not None:
            email = env_email.encode("utf-8")
    # Next: user.name / user.email from the configuration stack.
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    # Last resort: the host system's notion of the current user.
    default_user, default_email = _get_default_identity()
    if user is None:
        user = default_user.encode("utf-8")
    if email is None:
        email = default_email.encode("utf-8")
    # Accept "<addr>" style values by stripping the brackets before
    # re-wrapping the address below.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _fullname, mail_part = identity.split(b" <", 1)
    except ValueError as exc:
        # Missing the " <" separator entirely.
        raise InvalidUserIdentity(identity) from exc
    # The address must be closed, and neither NUL nor newline may appear
    # anywhere (they would corrupt the commit object).
    if b">" not in mail_part or b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
      <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    result: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        # Everything after the commit sha is a whitespace-separated list
        # of replacement parents (possibly empty).
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate the commit first, then each parent, mirroring the
        # line's own ordering.
        check_hexsha(commit, "Invalid graftpoint")
        for parent in parents:
            check_hexsha(parent, "Invalid graftpoint")

        result[commit] = parents
    return result
def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
      <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
      <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # A commit with no replacement parents serializes as the bare sha.
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
310def _set_filesystem_hidden(path) -> None:
311 """Mark path as to be hidden if supported by platform and filesystem.
313 On win32 uses SetFileAttributesW api:
314 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
315 """
316 if sys.platform == "win32":
317 import ctypes
318 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
320 FILE_ATTRIBUTE_HIDDEN = 2
321 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
322 ("SetFileAttributesW", ctypes.windll.kernel32)
323 )
325 if isinstance(path, bytes):
326 path = os.fsdecode(path)
327 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
328 pass # Could raise or log `ctypes.WinError()` here
330 # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries.

    Centralizes parent lookup so that graftpoint overrides, shallow-clone
    cut-offs and the commit-graph fast path are applied consistently.
    """

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store used to load commit objects
          grafts: Optional mapping of commit sha -> list of parent shas
            that overrides the commit's real parents
          shallows: Optional iterable of shallow commit shas, which are
            treated as having no parents
        """
        self.store = store
        # None sentinels instead of the previous mutable default
        # arguments ({} / []), so instances can never accidentally share
        # (and mutate) a module-level default object.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas for ``commit_id``.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional already-loaded commit object matching the sha,
            used to avoid a store lookup
        Returns: List of parent SHAs (possibly empty)
        """
        # Grafts take absolute precedence over the real parents.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow commits pretend to be rootless.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints override the recorded parents of specific commits;
        # see parse_graftpoints() for the on-disk format they come from.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        # Hooks by name (e.g. "pre-commit"); populated by subclasses.
        self.hooks: dict[str, Hook] = {}

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour
        return sys.platform != "win32"

    def _init_files(
        self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
    ) -> None:
        """Initialize a default set of named files."""
        from .config import ConfigFile

        self._put_named_file("description", b"Unnamed repository")
        f = BytesIO()
        cf = ConfigFile()
        if format is None:
            format = 0
        if format not in (0, 1):
            raise ValueError(f"Unsupported repository format version: {format}")
        cf.set("core", "repositoryformatversion", str(format))
        if self._determine_file_mode():
            cf.set("core", "filemode", True)
        else:
            cf.set("core", "filemode", False)

        if symlinks is None and not bare:
            symlinks = self._determine_symlinks()

        # core.symlinks is only written when symlinks are NOT supported;
        # git assumes true otherwise.
        if symlinks is False:
            cf.set("core", "symlinks", symlinks)

        cf.set("core", "bare", bare)
        cf.set("core", "logallrefupdates", True)
        cf.write_to_file(f)
        self._put_named_file("config", f.getvalue())
        self._put_named_file(os.path.join("info", "exclude"), b"")

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name."""
        raise NotImplementedError(self._del_named_file)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

    def fetch(
        self, target, determine_wants=None, progress=None, depth: Optional[int] = None
    ):
        """Fetch objects into another repository.

        Args:
          target: The target repository
          determine_wants: Optional function to determine what refs to
            fetch.
          progress: Optional progress function
          depth: Optional shallow fetch depth
        Returns: The local refs
        """
        if determine_wants is None:
            determine_wants = target.object_store.determine_wants_all
        count, pack_data = self.fetch_pack_data(
            determine_wants,
            target.get_graph_walker(),
            progress=progress,
            depth=depth,
        )
        # Stream the generated pack straight into the target's store.
        target.object_store.add_pack_data(count, pack_data, progress)
        return self.get_refs()

    def fetch_pack_data(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ):
        """Fetch the pack data required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: count and iterator over pack data
        """
        missing_objects = self.find_missing_objects(
            determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
        )
        # None signals the shallow short-circuit path: send no pack.
        if missing_objects is None:
            return 0, iter([])
        remote_has = missing_objects.get_remote_has()
        object_ids = list(missing_objects)
        return len(object_ids), generate_unpacked_objects(
            self.object_store, object_ids, progress=progress, other_haves=remote_has
        )

    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before we mutate it below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

    def generate_pack_data(
        self,
        have: list[ObjectID],
        want: list[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ):
        """Generate pack data objects for a set of wants/haves.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method
        """
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )

    def get_graph_walker(
        self, heads: Optional[list[ObjectID]] = None
    ) -> ObjectStoreGraphWalker:
        """Retrieve a graph walker.

        A graph walker is used by a remote repository (or proxy)
        to find out which objects are present in this repository.

        Args:
          heads: Repository heads to use (optional)
        Returns: A graph walker object
        """
        if heads is None:
            # Default to local branch heads that actually resolve to
            # objects present in this store.
            heads = [
                sha
                for sha in self.refs.as_dict(b"refs/heads").values()
                if sha in self.object_store
            ]
        parents_provider = ParentsProvider(self.object_store)
        return ObjectStoreGraphWalker(
            heads,
            parents_provider.get_parents,
            shallow=self.get_shallow(),
            update_shallow=self.update_shallow,
        )

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        return self.refs[b"HEAD"]

    def _get_object(self, sha, cls):
        """Retrieve an object and verify it is of the expected type.

        Raises the type-specific Not*Error when the stored object is of a
        different type than ``cls``.
        """
        assert len(sha) in (20, 40)
        ret = self.get_object(sha)
        if not isinstance(ret, cls):
            if cls is Commit:
                raise NotCommitError(ret)
            elif cls is Blob:
                raise NotBlobError(ret)
            elif cls is Tree:
                raise NotTreeError(ret)
            elif cls is Tag:
                raise NotTagError(ret)
            else:
                raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
        return ret

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

    def parents_provider(self) -> ParentsProvider:
        # Provider bound to this repo's grafts and shallow set.
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object."""
        raise NotImplementedError(self.get_worktree_config)

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)

    def get_config_stack(self) -> "StackedConfig":
        """Return a config stack for this repository.

        This stack accesses the configuration for both this repository
        itself (.git/config) and the global configuration, which usually
        lives in ~/.gitconfig.

        Returns: `Config` instance for this repository
        """
        from .config import ConfigFile, StackedConfig

        local_config = self.get_config()
        backends: list[ConfigFile] = [local_config]
        # The worktree config only participates when the extension is
        # explicitly enabled in the repository config.
        if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
            backends.append(self.get_worktree_config())

        backends += StackedConfig.default_backends()
        return StackedConfig(backends, writable=local_config)

    def get_shallow(self) -> set[ObjectID]:
        """Get the set of shallow commits.

        Returns: Set of shallow commits.
        """
        f = self.get_named_file("shallow")
        if f is None:
            return set()
        with f:
            # One hex sha per line.
            return {line.strip() for line in f}

    def update_shallow(self, new_shallow, new_unshallow) -> None:
        """Update the list of shallow objects.

        Args:
          new_shallow: Newly shallow objects
          new_unshallow: Newly no longer shallow objects
        """
        shallow = self.get_shallow()
        if new_shallow:
            shallow.update(new_shallow)
        if new_unshallow:
            shallow.difference_update(new_unshallow)
        if shallow:
            self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
        else:
            # An empty shallow set means the file should not exist at all.
            self._del_named_file("shallow")

    def get_peeled(self, ref: Ref) -> ObjectID:
        """Get the peeled value of a ref.

        Args:
          ref: The refname to peel.
        Returns: The fully-peeled SHA1 of a tag object, after peeling all
          intermediate tags; if the original ref does not point to a tag,
          this will equal the original SHA1.
        """
        cached = self.refs.get_peeled(ref)
        if cached is not None:
            return cached
        # Not cached (e.g. loose ref): peel by walking tag objects.
        return peel_sha(self.object_store, self.refs[ref])[1].id

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        from .notes import Notes

        return Notes(self.object_store, self.refs)

    def get_walker(self, include: Optional[list[bytes]] = None, **kwargs):
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]

        Keyword Args:
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the
            Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker

        if include is None:
            include = [self.head()]

        # Route parent lookup through this repo so grafts/shallows apply.
        kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)

        return Walker(self.object_store, include, **kwargs)

    def __getitem__(self, name: Union[ObjectID, Ref]):
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # First try interpreting the name as a (binary or hex) sha ...
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        # ... then fall back to resolving it as a ref.
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            raise KeyError(name) from exc

    def __contains__(self, name: bytes) -> bool:
        """Check if a specific Git object or ref is present.

        Args:
          name: Git object SHA1 or ref name
        """
        # Sha-shaped names may match either an object or a ref.
        if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
            return name in self.object_store or name in self.refs
        else:
            return name in self.refs

    def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
        """Set a ref.

        Args:
          name: ref name
          value: Ref value - either a ShaFile object, or a hex sha
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            if isinstance(value, ShaFile):
                self.refs[name] = value.id
            elif isinstance(value, bytes):
                self.refs[name] = value
            else:
                raise TypeError(value)
        else:
            raise ValueError(name)

    def __delitem__(self, name: bytes) -> None:
        """Remove a ref.

        Args:
          name: Name of the ref to remove
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            del self.refs[name]
        else:
            raise ValueError(name)

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits."""
        # Deprecated shim; note that `kind` is intentionally NOT forwarded
        # (historical behaviour of this wrapper).
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

    def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
        """Add or modify graftpoints.

        Args:
          updated_graftpoints: Dict of commit shas to list of parent shas
        """
        # Simple validation
        for commit, parents in updated_graftpoints.items():
            for sha in [commit, *parents]:
                check_hexsha(sha, "Invalid graftpoint")

        self._graftpoints.update(updated_graftpoints)

    def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
        """Remove graftpoints.

        Args:
          to_remove: List of commit shas
        """
        for sha in to_remove:
            del self._graftpoints[sha]

    def _read_heads(self, name):
        """Read a file of newline-separated shas (e.g. MERGE_HEAD).

        Returns an empty list when the file does not exist.
        """
        f = self.get_named_file(name)
        if f is None:
            return []
        with f:
            return [line.strip() for line in f.readlines() if line.strip()]

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Ref = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch)
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        # Run the pre-commit hook first; a missing hook is not an error.
        try:
            if not no_verify:
                self.hooks["pre-commit"].execute()
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, silent fallthrough
            pass

        c = Commit()
        # Tree selection: explicit tree sha, or commit the current index.
        if tree is None:
            index = self.open_index()
            c.tree = index.commit(self.object_store)
        else:
            if len(tree) != 40:
                raise ValueError("tree must be a 40-byte hex sha string")
            c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = self._read_heads("MERGE_HEAD")
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # FIXME: Support GIT_COMMITTER_DATE environment variable
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # FIXME: Use current user timezone rather than UTC
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        check_user_identity(author)
        if author_timestamp is None:
            # FIXME: Support GIT_AUTHOR_DATE environment variable
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass  # No dice
        if encoding is not None:
            c.encoding = encoding
        if message is None:
            # FIXME: Try to read commit message from .git/MERGE_MSG
            raise ValueError("No commit message specified")

        # The commit-msg hook may rewrite the message; None means "keep".
        try:
            if no_verify:
                c.message = message
            else:
                c.message = self.hooks["commit-msg"].execute(message)
                if c.message is None:
                    c.message = message
        except HookError as exc:
            raise CommitError(exc) from exc
        except KeyError:  # no hook defined, message not modified
            c.message = message

        keyid = sign if isinstance(sign, str) else None

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            if sign:
                c.sign(keyid)
            self.object_store.add_object(c)
        else:
            # Update the ref atomically: compare-and-swap against the old
            # head, or add-if-new when the ref did not exist yet.
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                if sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                c.parents = merge_heads
                if sign:
                    c.sign(keyid)
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                # Fail if the atomic compare-and-swap failed, leaving the
                # commit and all its objects as garbage.
                raise CommitError(f"{ref!r} changed during commit")

        self._del_named_file("MERGE_HEAD")

        try:
            self.hooks["post-commit"].execute()
        except HookError as e:  # silent failure
            warnings.warn(f"post-commit hook failed: {e}", UserWarning)
        except KeyError:  # no hook defined, silent fallthrough
            pass

        # Trigger auto GC if needed
        from .gc import maybe_auto_gc

        maybe_auto_gc(self)

        return c.id
def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    contents = f.read()
    prefix = "gitdir: "
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix):].rstrip("\n")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Create the exception.

        Args:
          version: The unsupported repositoryformatversion value.
        """
        # Pass a message to Exception so str(exc)/tracebacks are
        # informative; previously str(exc) was empty. The .version
        # attribute is kept for programmatic access.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Create the exception.

        Args:
          extension: Name of the unsupported extensions.* config key.
        """
        # Pass a message to Exception so str(exc)/tracebacks are
        # informative; previously str(exc) was empty. The .extension
        # attribute is kept for programmatic access.
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Path to the working tree (non-bare) or the control directory (bare).
    path: str
    # Whether the repository has no working tree.
    bare: bool

    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: If no repository layout is found at root.
          UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
          UnsupportedExtension: If an unknown extensions.* key is set.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect layout: a ".git" file or a ".git/objects" directory
            # means a normal (non-bare) repository; "objects" and "refs"
            # directly under root means a bare one.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a gitfile pointing at the real control directory
                # (e.g. for submodules or linked worktrees).
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: the "commondir" file holds a (relative) path
            # to the main working tree's control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Unset repositoryformatversion is treated as version 0.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Refuse to open repositories using extensions we do not understand,
        # except for the known-safe worktreeconfig extension.
        for extension, _value in config.items((b"extensions",)):
            if extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )
        BaseRepo.__init__(self, object_store, self.refs)

        # Graft points and shallow commits both override commit parentage,
        # so they are collected into the same mapping.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1291 def _write_reflog(
1292 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1293 ) -> None:
1294 from .reflog import format_reflog_line
1296 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1297 try:
1298 os.makedirs(os.path.dirname(path))
1299 except FileExistsError:
1300 pass
1301 if committer is None:
1302 config = self.get_config_stack()
1303 committer = get_user_identity(config)
1304 check_user_identity(committer)
1305 if timestamp is None:
1306 timestamp = int(time.time())
1307 if timezone is None:
1308 timezone = 0 # FIXME
1309 with open(path, "ab") as f:
1310 f.write(
1311 format_reflog_line(
1312 old_sha, new_sha, committer, timestamp, timezone, message
1313 )
1314 + b"\n"
1315 )
1317 @classmethod
1318 def discover(cls, start="."):
1319 """Iterate parent directories to discover a repository.
1321 Return a Repo object for the first parent directory that looks like a
1322 Git repository.
1324 Args:
1325 start: The directory to start discovery from (defaults to '.')
1326 """
1327 remaining = True
1328 path = os.path.abspath(start)
1329 while remaining:
1330 try:
1331 return cls(path)
1332 except NotGitRepository:
1333 path, remaining = os.path.split(path)
1334 raise NotGitRepository(
1335 "No git repository was found at {path}".format(**dict(path=start))
1336 )
    def controldir(self):
        """Return the path of the control directory (usually ``.git``)."""
        return self._controldir
    def commondir(self):
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        # _commondir is resolved once in __init__ from the COMMONDIR file.
        return self._commondir
1352 def _determine_file_mode(self):
1353 """Probe the file-system to determine whether permissions can be trusted.
1355 Returns: True if permissions can be trusted, False otherwise.
1356 """
1357 fname = os.path.join(self.path, ".probe-permissions")
1358 with open(fname, "w") as f:
1359 f.write("")
1361 st1 = os.lstat(fname)
1362 try:
1363 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1364 except PermissionError:
1365 return False
1366 st2 = os.lstat(fname)
1368 os.unlink(fname)
1370 mode_differs = st1.st_mode != st2.st_mode
1371 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1373 return mode_differs and st2_has_exec
    def _determine_symlinks(self):
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # TODO(jelmer): Actually probe disk / look at filesystem
        return sys.platform != "win32"
1383 def _put_named_file(self, path, contents) -> None:
1384 """Write a file to the control dir with the given name and contents.
1386 Args:
1387 path: The path to the file, relative to the control dir.
1388 contents: A string to write to the file.
1389 """
1390 path = path.lstrip(os.path.sep)
1391 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1392 f.write(contents)
1394 def _del_named_file(self, path) -> None:
1395 try:
1396 os.unlink(os.path.join(self.controldir(), path))
1397 except FileNotFoundError:
1398 return
1400 def get_named_file(self, path, basedir=None):
1401 """Get a file from the control dir with a specific name.
1403 Although the filename should be interpreted as a filename relative to
1404 the control dir in a disk-based Repo, the object returned need not be
1405 pointing to a file in that location.
1407 Args:
1408 path: The path to the file, relative to the control dir.
1409 basedir: Optional argument that specifies an alternative to the
1410 control dir.
1411 Returns: An open file object, or None if the file does not exist.
1412 """
1413 # TODO(dborowitz): sanitize filenames, since this is used directly by
1414 # the dumb web serving code.
1415 if basedir is None:
1416 basedir = self.controldir()
1417 path = path.lstrip(os.path.sep)
1418 try:
1419 return open(os.path.join(basedir, path), "rb")
1420 except FileNotFoundError:
1421 return None
    def index_path(self):
        """Return path to the index file."""
        return os.path.join(self.controldir(), INDEX_FILENAME)
    def open_index(self) -> "Index":
        """Open the index for this repository.

        Honors feature.manyFiles, index.version and index.skipHash from
        the configuration when constructing the Index.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        from .index import Index

        if not self.has_index():
            raise NoIndexPresent

        # Check for manyFiles feature configuration
        config = self.get_config_stack()
        many_files = config.get_boolean(b"feature", b"manyFiles", False)
        skip_hash = False
        index_version = None

        if many_files:
            # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
            try:
                index_version_str = config.get(b"index", b"version")
                index_version = int(index_version_str)
            except KeyError:
                index_version = 4  # Default to version 4 for manyFiles
            skip_hash = config.get_boolean(b"index", b"skipHash", True)
        else:
            # Check for explicit index settings
            try:
                index_version_str = config.get(b"index", b"version")
                index_version = int(index_version_str)
            except KeyError:
                index_version = None
            skip_hash = config.get_boolean(b"index", b"skipHash", False)

        return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
    def has_index(self) -> bool:
        """Check if an index is present."""
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        For each path: missing files are removed from the index, regular
        files and symlinks are added as (normalized) blobs, directories
        are only kept if they map to an index entry themselves, and other
        file types (sockets, fifos, ...) are dropped from the index.

        Args:
          fs_paths: List of paths, relative to the repository path

        Raises:
          ValueError: If a given path is absolute.
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path as well as an iterable of paths.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self.open_index()
        blob_normalizer = self.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except OSError:
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        # Plain directories carry no index entry of their
                        # own; drop any stale entry for this path.
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Not a trackable file type; remove any stale entry.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    # Regular file or symlink: store the (checkin-normalized)
                    # blob and record the current stat information.
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()
    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific file in the index
        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: If a path is neither in HEAD nor in the index.
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self.open_index()
        try:
            tree_id = self[b"HEAD"].tree
        except KeyError:
            # no head mean no commit in the repo: unstaging just removes
            # the entries from the index.
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            # Reset the index entry to the committed (HEAD) version, keeping
            # filesystem stat info where the file still exists on disk.
            index_entry = IndexEntry(
                ctime=(self[b"HEAD"].commit_time, 0),
                mtime=(self[b"HEAD"].commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()
    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
          ValueError: If both checkout and bare are requested.
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Non-bare clones check out by default.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the "origin" remote.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                    head_ref = _set_default_branch(
                        target.refs, origin, origin_head, branch, ref_message
                    )

                    # Update target head
                    if head_ref:
                        head = _set_head(target.refs, head_ref, ref_message)
                    else:
                        head = None

                    if checkout and head is not None:
                        target.reset_index()
            except BaseException:
                # Close the half-initialized target before re-raising so
                # file handles are not leaked.
                target.close()
                raise
        except BaseException:
            # If we created the directory, remove it again on failure.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1688 def reset_index(self, tree: Optional[bytes] = None):
1689 """Reset the index back to a specific tree.
1691 Args:
1692 tree: Tree SHA to reset to, None for current HEAD tree.
1693 """
1694 from .index import (
1695 build_index_from_tree,
1696 symlink,
1697 validate_path_element_default,
1698 validate_path_element_ntfs,
1699 )
1701 if tree is None:
1702 head = self[b"HEAD"]
1703 if isinstance(head, Tag):
1704 _cls, obj = head.object
1705 head = self.get_object(obj)
1706 tree = head.tree
1707 config = self.get_config()
1708 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
1709 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
1710 validate_path_element = validate_path_element_ntfs
1711 else:
1712 validate_path_element = validate_path_element_default
1713 if config.get_boolean(b"core", b"symlinks", True):
1714 symlink_fn = symlink
1715 else:
1717 def symlink_fn(source, target) -> None: # type: ignore
1718 with open(
1719 target, "w" + ("b" if isinstance(source, bytes) else "")
1720 ) as f:
1721 f.write(source)
1723 blob_normalizer = self.get_blob_normalizer()
1724 return build_index_from_tree(
1725 self.path,
1726 self.index_path(),
1727 self.object_store,
1728 tree,
1729 honor_filemode=honor_filemode,
1730 validate_path_element=validate_path_element,
1731 symlink_fn=symlink_fn,
1732 blob_normalizer=blob_normalizer,
1733 )
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Matches this repository's control dir against a gitdir pattern.
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            # Matches the currently checked-out branch against a glob pattern.
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            # Detached HEAD or unresolvable ref never matches.
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1804 def get_worktree_config(self) -> "ConfigFile":
1805 from .config import ConfigFile
1807 path = os.path.join(self.commondir(), "config.worktree")
1808 try:
1809 # Pass condition matchers for includeIf evaluation
1810 condition_matchers = self._get_config_condition_matchers()
1811 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1812 except FileNotFoundError:
1813 cf = ConfigFile()
1814 cf.path = path
1815 return cf
1817 def get_config(self) -> "ConfigFile":
1818 """Retrieve the config object.
1820 Returns: `ConfigFile` object for the ``.git/config`` file.
1821 """
1822 from .config import ConfigFile
1824 path = os.path.join(self._commondir, "config")
1825 try:
1826 # Pass condition matchers for includeIf evaluation
1827 condition_matchers = self._get_config_condition_matchers()
1828 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1829 except FileNotFoundError:
1830 ret = ConfigFile()
1831 ret.path = path
1832 return ret
1834 def get_rebase_state_manager(self):
1835 """Get the appropriate rebase state manager for this repository.
1837 Returns: DiskRebaseStateManager instance
1838 """
1839 import os
1841 from .rebase import DiskRebaseStateManager
1843 path = os.path.join(self.controldir(), "rebase-merge")
1844 return DiskRebaseStateManager(path)
1846 def get_description(self):
1847 """Retrieve the description of this repository.
1849 Returns: A string describing the repository or None.
1850 """
1851 path = os.path.join(self._controldir, "description")
1852 try:
1853 with GitFile(path, "rb") as f:
1854 return f.read()
1855 except FileNotFoundError:
1856 return None
    def __repr__(self) -> str:
        """Return a debug representation including the repository path."""
        return f"<Repo at {self.path!r}>"
    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        # Stored in the control dir's "description" file.
        self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Creates the standard control-directory layout, the object store,
        the symbolic HEAD ref and the initial control files.

        Args:
          path: Repository (working tree or bare) path.
          controldir: Control directory to populate.
          bare: Whether the new repository is bare.
          object_store: Object store to use; created on disk if None.
          config: Config used to look up init.defaultBranch.
          default_branch: Branch name for HEAD; read from config if None.
          symlinks: Symlinks setting passed through to _init_files.
          format: Repository format version (defaults to 0).
        Returns: The opened `Repo` instance.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
    @classmethod
    def init(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
          path: Path in which to create the repository
          mkdir: Whether to create the directory
          config: Config used to look up init.defaultBranch.
          default_branch: Branch name for HEAD; read from config if None.
          symlinks: Symlinks setting (default to autodetect).
          format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        # Mark ".git" hidden on platforms where that takes an extra step.
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
            format=format,
        )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Layout: <main>/.git/worktrees/<identifier> holds this worktree's
        # control files; the worktree's ".git" is a gitfile pointing there.
        main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Cross-link the worktree and the main repository.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the new worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(path)
        r.reset_index()
        return r
    @classmethod
    def init_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir=False,
        object_store=None,
        config=None,
        default_branch=None,
        format: Optional[int] = None,
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
          path: Path to create bare repository in
          mkdir: Whether to create the directory first.
          object_store: Object store to use; created on disk if None.
          config: Config used to look up init.defaultBranch.
          default_branch: Branch name for HEAD; read from config if None.
          format: Repository format version (defaults to 0)
        Returns: a `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # In a bare repo the control directory is the repository path itself.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
            format=format,
        )

    # Backwards-compatible alias for init_bare.
    create = init_bare
    def close(self) -> None:
        """Close any files opened by this repository."""
        self.object_store.close()

    def __enter__(self):
        """Support use as a context manager; returns self."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release resources when leaving the ``with`` block."""
        self.close()
    def get_blob_normalizer(self):
        """Return a BlobNormalizer object."""
        # TODO Parse the git attributes files
        git_attributes = {}
        config_stack = self.get_config_stack()
        try:
            head_sha = self.refs[b"HEAD"]
            # Peel tags to get the underlying commit
            _, obj = peel_sha(self.object_store, head_sha)
            tree = obj.tree
            return TreeBlobNormalizer(
                config_stack,
                git_attributes,
                self.object_store,
                tree,
            )
        except KeyError:
            # No HEAD yet (empty repo): fall back to a tree-less normalizer.
            return BlobNormalizer(config_stack, git_attributes)
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return os.path.join(self.controldir(), "info", "sparse-checkout")
2059 def configure_for_cone_mode(self) -> None:
2060 """Ensure the repository is configured for cone-mode sparse-checkout."""
2061 config = self.get_config()
2062 config.set((b"core",), b"sparseCheckout", b"true")
2063 config.set((b"core",), b"sparseCheckoutCone", b"true")
2064 config.write_to_path()
2066 def infer_cone_mode(self) -> bool:
2067 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2068 config = self.get_config()
2069 try:
2070 sc_cone = config.get((b"core",), b"sparseCheckoutCone")
2071 return sc_cone == b"true"
2072 except KeyError:
2073 # If core.sparseCheckoutCone is not set, default to False
2074 return False
2076 def get_sparse_checkout_patterns(self) -> list[str]:
2077 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2079 Returns:
2080 A list of patterns. Returns an empty list if the file is missing.
2081 """
2082 path = self._sparse_checkout_file_path()
2083 try:
2084 with open(path, encoding="utf-8") as f:
2085 return [line.strip() for line in f if line.strip()]
2086 except FileNotFoundError:
2087 return []
2089 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
2090 """Write the given sparse-checkout patterns into info/sparse-checkout.
2092 Creates the info/ directory if it does not exist.
2094 Args:
2095 patterns: A list of gitignore-style patterns to store.
2096 """
2097 info_dir = os.path.join(self.controldir(), "info")
2098 os.makedirs(info_dir, exist_ok=True)
2100 path = self._sparse_checkout_file_path()
2101 with open(path, "w", encoding="utf-8") as f:
2102 for pat in patterns:
2103 f.write(pat + "\n")
2105 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
2106 """Write the given cone-mode directory patterns into info/sparse-checkout.
2108 For each directory to include, add an inclusion line that "undoes" the prior
2109 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2110 Never add the same line twice.
2111 """
2112 patterns = ["/*", "!/*/"]
2113 if dirs:
2114 for d in dirs:
2115 d = d.strip("/")
2116 line = f"/{d}/"
2117 if d and line not in patterns:
2118 patterns.append(line)
2119 self.set_sparse_checkout_patterns(patterns)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are accumulated as plain tuples via _append_reflog.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        # Named control files ("description", ...) are kept in a plain dict.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None
    def _append_reflog(self, *args) -> None:
        """Record a reflog entry in memory (used as the refs logger)."""
        self._reflog.append(args)
    def set_description(self, description) -> None:
        """Store the repository description in memory."""
        self._description = description
    def get_description(self):
        """Return the in-memory repository description (or None)."""
        return self._description
    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # No real filesystem to probe; mirror the platform default.
        return sys.platform != "win32"
2157 def _determine_symlinks(self):
2158 """Probe the file-system to determine whether permissions can be trusted.
2160 Returns: True if permissions can be trusted, False otherwise.
2161 """
2162 return sys.platform != "win32"
2164 def _put_named_file(self, path, contents) -> None:
2165 """Write a file to the control dir with the given name and contents.
2167 Args:
2168 path: The path to the file, relative to the control dir.
2169 contents: A string to write to the file.
2170 """
2171 self._named_files[path] = contents
2173 def _del_named_file(self, path) -> None:
2174 try:
2175 del self._named_files[path]
2176 except KeyError:
2177 pass
2179 def get_named_file(self, path, basedir=None):
2180 """Get a file from the control dir with a specific name.
2182 Although the filename should be interpreted as a filename relative to
2183 the control dir in a disk-baked Repo, the object returned need not be
2184 pointing to a file in that location.
2186 Args:
2187 path: The path to the file, relative to the control dir.
2188 Returns: An open file object, or None if the file does not exist.
2189 """
2190 contents = self._named_files.get(path, None)
2191 if contents is None:
2192 return None
2193 return BytesIO(contents)
2195 def open_index(self) -> "Index":
2196 """Fail to open index for this repo, since it is bare.
2198 Raises:
2199 NoIndexPresent: Raised when no index is present
2200 """
2201 raise NoIndexPresent
2203 def get_config(self):
2204 """Retrieve the config object.
2206 Returns: `ConfigFile` object.
2207 """
2208 return self._config
2210 def get_rebase_state_manager(self):
2211 """Get the appropriate rebase state manager for this repository.
2213 Returns: MemoryRebaseStateManager instance
2214 """
2215 from .rebase import MemoryRebaseStateManager
2217 return MemoryRebaseStateManager(self)
2219 @classmethod
2220 def init_bare(cls, objects, refs, format: Optional[int] = None):
2221 """Create a new bare repository in memory.
2223 Args:
2224 objects: Objects for the new repository,
2225 as iterable
2226 refs: Refs as dictionary, mapping names
2227 to object SHA1s
2228 format: Repository format version (defaults to 0)
2229 """
2230 ret = cls()
2231 for obj in objects:
2232 ret.object_store.add_object(obj)
2233 for refname, sha in refs.items():
2234 ret.refs.add_if_new(refname, sha)
2235 ret._init_files(bare=True, format=format)
2236 return ret