Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 Union,
46)
48if TYPE_CHECKING:
49 # There are no circular imports here, but we try to defer imports as long
50 # as possible to reduce start-up time for anything that doesn't need
51 # these imports.
52 from .attrs import GitAttributes
53 from .config import ConditionMatcher, ConfigFile, StackedConfig
54 from .index import Index
55 from .notes import Notes
56 from .worktree import WorkTree
58from . import replace_me
59from .errors import (
60 NoIndexPresent,
61 NotBlobError,
62 NotCommitError,
63 NotGitRepository,
64 NotTagError,
65 NotTreeError,
66 RefFormatError,
67)
68from .file import GitFile
69from .hooks import (
70 CommitMsgShellHook,
71 Hook,
72 PostCommitShellHook,
73 PostReceiveShellHook,
74 PreCommitShellHook,
75)
76from .object_store import (
77 DiskObjectStore,
78 MemoryObjectStore,
79 MissingObjectFinder,
80 ObjectStoreGraphWalker,
81 PackBasedObjectStore,
82 find_shallow,
83 peel_sha,
84)
85from .objects import (
86 Blob,
87 Commit,
88 ObjectID,
89 ShaFile,
90 Tag,
91 Tree,
92 check_hexsha,
93 valid_hexsha,
94)
95from .pack import generate_unpacked_objects
96from .refs import (
97 ANNOTATED_TAG_SUFFIX, # noqa: F401
98 LOCAL_BRANCH_PREFIX,
99 LOCAL_TAG_PREFIX, # noqa: F401
100 SYMREF, # noqa: F401
101 DictRefsContainer,
102 DiskRefsContainer,
103 InfoRefsContainer, # noqa: F401
104 Ref,
105 RefsContainer,
106 _set_default_branch,
107 _set_head,
108 _set_origin_head,
109 check_ref_format, # noqa: F401
110 read_packed_refs, # noqa: F401
111 read_packed_refs_with_peeled, # noqa: F401
112 serialize_refs,
113 write_packed_refs, # noqa: F401
114)
# Names of the special files and directories inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories (relative to the control dir) created when initializing
# a new repository.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name that HEAD points at in newly created repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        """Store the offending identity.

        Args:
          identity: The identity bytestring that failed validation.
        """
        # Pass the identity to Exception so str(exc)/exc.args are informative
        # instead of empty; keep the attribute for existing callers.
        super().__init__(identity)
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when neither the environment variables nor the password
    database yield a usable username for the current user.
    """
149# TODO(jelmer): Cache?
150def _get_default_identity() -> tuple[str, str]:
151 import socket
153 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
154 username = os.environ.get(name)
155 if username:
156 break
157 else:
158 username = None
160 try:
161 import pwd
162 except ImportError:
163 fullname = None
164 else:
165 try:
166 entry = pwd.getpwuid(os.getuid()) # type: ignore
167 except KeyError:
168 fullname = None
169 else:
170 if getattr(entry, "gecos", None):
171 fullname = entry.pw_gecos.split(",")[0]
172 else:
173 fullname = None
174 if username is None:
175 username = entry.pw_name
176 if not fullname:
177 if username is None:
178 raise DefaultIdentityNotFound("no username found")
179 fullname = username
180 email = os.environ.get("EMAIL")
181 if email is None:
182 if username is None:
183 raise DefaultIdentityNotFound("no username found")
184 email = f"{username}@{socket.gethostname()}"
185 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    # 1) Environment overrides: GIT_<KIND>_NAME / GIT_<KIND>_EMAIL.
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    # 2) Configuration: user.name / user.email.
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    # 3) Host-derived defaults, computed lazily: previously
    # _get_default_identity() was always called, which could raise
    # DefaultIdentityNotFound (and hit the password database) even when
    # both values were already known.
    if user is None or email is None:
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Strip surrounding angle brackets some users put in their config.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # The identity must contain " <" separating name from email.
    name_part, sep, email_part = identity.partition(b" <")
    if not sep:
        raise InvalidUserIdentity(identity)
    # The email part must be closed with ">".
    if b">" not in email_part:
        raise InvalidUserIdentity(identity)
    # Embedded NUL or newline bytes would corrupt the commit object.
    if b"\0" in identity or b"\n" in identity:
        raise InvalidUserIdentity(identity)
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        # Anything after the commit sha is the whitespace-separated
        # replacement parent list; absent means "no parents".
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate every sha before recording the graft.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # One line per graft: the commit sha, optionally followed by the
    # space-separated replacement parents.
    return b"\n".join(
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    )
def _set_filesystem_hidden(path) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    Args:
      path: Path (str or bytes) to hide; a no-op on non-Windows platforms.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        FILE_ATTRIBUTE_HIDDEN = 2
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        if isinstance(path, bytes):
            # The ...W API takes wide strings; decode via the fs encoding.
            path = os.fsdecode(path)
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries."""

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store to read commit objects from.
          grafts: Optional dict mapping commit sha -> replacement parents.
          shallows: Optional iterable of shallow commit shas.
        """
        self.store = store
        # BUG FIX: the previous defaults grafts={} / shallows=[] were
        # mutable default arguments, shared between every instance that
        # did not pass them explicitly.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set() if shallows is None else set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas of ``commit_id``.

        Grafts override real parents; shallow commits report no parents.
        Uses the commit graph when available, otherwise reads the commit
        object (``commit`` if given, else fetched from the store).
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints (commit sha -> fake parents) and hooks start empty;
        # subclasses populate them as needed.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        self.hooks: dict[str, Hook] = {}
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # Storage-specific; there is no sensible default, so subclasses
        # must override this.
        raise NotImplementedError(self._determine_file_mode)

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour
        return sys.platform != "win32"
    def _init_files(
        self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
    ) -> None:
        """Initialize a default set of named files.

        Writes the ``description`` file, a minimal ``config`` and an empty
        ``info/exclude`` into the control directory.

        Args:
          bare: Whether the repository has a working tree.
          symlinks: Whether symlinks are supported (probed when None and
            the repository is not bare).
          format: Repository format version to record (0 when None).
        Raises:
          ValueError: if format is not 0 or 1.
        """
        from .config import ConfigFile

        self._put_named_file("description", b"Unnamed repository")
        f = BytesIO()
        cf = ConfigFile()
        if format is None:
            format = 0
        # Only repository format versions 0 and 1 are understood here.
        if format not in (0, 1):
            raise ValueError(f"Unsupported repository format version: {format}")
        cf.set("core", "repositoryformatversion", str(format))
        # Record whether the filesystem honours the executable bit.
        if self._determine_file_mode():
            cf.set("core", "filemode", True)
        else:
            cf.set("core", "filemode", False)

        # Bare repositories have no working tree, so only probe symlink
        # support when a worktree exists.
        if symlinks is None and not bare:
            symlinks = self._determine_symlinks()

        if symlinks is False:
            cf.set("core", "symlinks", symlinks)

        cf.set("core", "bare", bare)
        cf.set("core", "logallrefupdates", True)
        cf.write_to_file(f)
        self._put_named_file("config", f.getvalue())
        self._put_named_file(os.path.join("info", "exclude"), b"")
    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_named_file)

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self._put_named_file)

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Args:
          path: The path to the file, relative to the control dir.
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self._del_named_file)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.open_index)
472 def fetch(
473 self, target, determine_wants=None, progress=None, depth: Optional[int] = None
474 ):
475 """Fetch objects into another repository.
477 Args:
478 target: The target repository
479 determine_wants: Optional function to determine what refs to
480 fetch.
481 progress: Optional progress function
482 depth: Optional shallow fetch depth
483 Returns: The local refs
484 """
485 if determine_wants is None:
486 determine_wants = target.object_store.determine_wants_all
487 count, pack_data = self.fetch_pack_data(
488 determine_wants,
489 target.get_graph_walker(),
490 progress=progress,
491 depth=depth,
492 )
493 target.object_store.add_pack_data(count, pack_data, progress)
494 return self.get_refs()
    def fetch_pack_data(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ):
        """Fetch the pack data required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: count and iterator over pack data
        """
        missing_objects = self.find_missing_objects(
            determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
        )
        # None signals the shallow short-circuit path: nothing to send.
        if missing_objects is None:
            return 0, iter([])
        remote_has = missing_objects.get_remote_has()
        object_ids = list(missing_objects)
        return len(object_ids), generate_unpacked_objects(
            self.object_store, object_ids, progress=progress, other_haves=remote_has
        )
    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented, or None
          when nothing should be sent (shallow short-circuit).
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the walker's shallow set before we possibly extend it.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                # Stand-in with the same interface as MissingObjectFinder,
                # representing "nothing missing".
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            # Resolve parents through the provider so shallow cutoffs apply.
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
    def generate_pack_data(
        self,
        have: list[ObjectID],
        want: list[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ):
        """Generate pack data objects for a set of wants/haves.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method
        Returns: whatever the object store's generate_pack_data produces.
        """
        # Pass our shallow set so objects beyond the shallow cutoff are
        # not included.
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )
    def get_graph_walker(
        self, heads: Optional[list[ObjectID]] = None
    ) -> ObjectStoreGraphWalker:
        """Retrieve a graph walker.

        A graph walker is used by a remote repository (or proxy)
        to find out which objects are present in this repository.

        Args:
          heads: Repository heads to use (optional)
        Returns: A graph walker object
        """
        if heads is None:
            # Default to local branch heads that actually resolve to
            # objects present in this store.
            heads = [
                sha
                for sha in self.refs.as_dict(b"refs/heads").values()
                if sha in self.object_store
            ]
        parents_provider = ParentsProvider(self.object_store)
        return ObjectStoreGraphWalker(
            heads,
            parents_provider.get_parents,
            shallow=self.get_shallow(),
            update_shallow=self.update_shallow,
        )
    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        # TODO: move this method to WorkTree
        # Mapping-style lookup on the refs container.
        return self.refs[b"HEAD"]
683 def _get_object(self, sha, cls):
684 assert len(sha) in (20, 40)
685 ret = self.get_object(sha)
686 if not isinstance(ret, cls):
687 if cls is Commit:
688 raise NotCommitError(ret)
689 elif cls is Blob:
690 raise NotBlobError(ret)
691 elif cls is Tree:
692 raise NotTreeError(ret)
693 elif cls is Tag:
694 raise NotTagError(ret)
695 else:
696 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
697 return ret
    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

    def parents_provider(self) -> ParentsProvider:
        """Return a ParentsProvider honouring this repo's grafts and shallows."""
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )
    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        # Delegates to a fresh ParentsProvider so grafts and shallow
        # boundaries are honoured.
        return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_config)

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object."""
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_worktree_config)

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Returns: String with the description of the repository
          as set by the user.
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_description)

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.set_description)

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_rebase_state_manager)

    def get_blob_normalizer(self):
        """Return a BlobNormalizer object for checkin/checkout operations.

        Returns: BlobNormalizer instance
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_blob_normalizer)

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        # Abstract: subclasses provide the storage-specific implementation.
        raise NotImplementedError(self.get_gitattributes)
    def get_config_stack(self) -> "StackedConfig":
        """Return a config stack for this repository.

        This stack accesses the configuration for both this repository
        itself (.git/config) and the global configuration, which usually
        lives in ~/.gitconfig.

        Returns: `Config` instance for this repository
        """
        from .config import ConfigFile, StackedConfig

        local_config = self.get_config()
        backends: list[ConfigFile] = [local_config]
        # The worktree config only participates when the repository opted
        # in via extensions.worktreeconfig.
        if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
            backends.append(self.get_worktree_config())

        backends += StackedConfig.default_backends()
        # Writes go to the repository-local config, not the global ones.
        return StackedConfig(backends, writable=local_config)
801 def get_shallow(self) -> set[ObjectID]:
802 """Get the set of shallow commits.
804 Returns: Set of shallow commits.
805 """
806 f = self.get_named_file("shallow")
807 if f is None:
808 return set()
809 with f:
810 return {line.strip() for line in f}
812 def update_shallow(self, new_shallow, new_unshallow) -> None:
813 """Update the list of shallow objects.
815 Args:
816 new_shallow: Newly shallow objects
817 new_unshallow: Newly no longer shallow objects
818 """
819 shallow = self.get_shallow()
820 if new_shallow:
821 shallow.update(new_shallow)
822 if new_unshallow:
823 shallow.difference_update(new_unshallow)
824 if shallow:
825 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
826 else:
827 self._del_named_file("shallow")
    def get_peeled(self, ref: Ref) -> ObjectID:
        """Get the peeled value of a ref.

        Args:
          ref: The refname to peel.
        Returns: The fully-peeled SHA1 of a tag object, after peeling all
          intermediate tags; if the original ref does not point to a tag,
          this will equal the original SHA1.
        """
        cached = self.refs.get_peeled(ref)
        if cached is not None:
            return cached
        # Not cached by the refs container: peel by walking the tag chain.
        return peel_sha(self.object_store, self.refs[ref])[1].id

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Imported lazily to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)
    def get_walker(self, include: Optional[list[bytes]] = None, **kwargs):
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]

        Keyword Args:
          exclude: Iterable of SHAs of commits to exclude along with their
            ancestors, overriding includes.
          order: ORDER_* constant specifying the order of results.
            Anything other than ORDER_DATE may result in O(n) memory usage.
          reverse: If True, reverse the order of output, requiring O(n)
            memory.
          max_entries: The maximum number of entries to yield, or None for
            no limit.
          paths: Iterable of file or subtree paths to show entries for.
          rename_detector: diff.RenameDetector object for detecting
            renames.
          follow: If True, follow path across renames/copies. Forces a
            default rename_detector.
          since: Timestamp to list commits after.
          until: Timestamp to list commits before.
          queue_cls: A class to use for a queue of commits, supporting the
            iterator protocol. The constructor takes a single argument, the
            Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker

        if include is None:
            include = [self.head()]

        # Resolve parents through this repo so grafts/shallows are honoured.
        kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)

        return Walker(self.object_store, include, **kwargs)
    def __getitem__(self, name: Union[ObjectID, Ref]):
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # A 20-byte (binary) or 40-byte (hex) name may be an object sha;
        # try the object store first.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        # Fall back to resolving the name as a ref; an ill-formed ref name
        # is reported as a plain KeyError.
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            raise KeyError(name) from exc

    def __contains__(self, name: bytes) -> bool:
        """Check if a specific Git object or ref is present.

        Args:
          name: Git object SHA1 or ref name
        """
        # Names that look like shas may be either objects or refs;
        # anything else can only be a ref.
        if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
            return name in self.object_store or name in self.refs
        else:
            return name in self.refs
924 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
925 """Set a ref.
927 Args:
928 name: ref name
929 value: Ref value - either a ShaFile object, or a hex sha
930 """
931 if name.startswith(b"refs/") or name == b"HEAD":
932 if isinstance(value, ShaFile):
933 self.refs[name] = value.id
934 elif isinstance(value, bytes):
935 self.refs[name] = value
936 else:
937 raise TypeError(value)
938 else:
939 raise ValueError(name)
941 def __delitem__(self, name: bytes) -> None:
942 """Remove a ref.
944 Args:
945 name: Name of the ref to remove
946 """
947 if name.startswith(b"refs/") or name == b"HEAD":
948 del self.refs[name]
949 else:
950 raise ValueError(name)
952 def _get_user_identity(
953 self, config: "StackedConfig", kind: Optional[str] = None
954 ) -> bytes:
955 """Determine the identity to use for new commits."""
956 warnings.warn(
957 "use get_user_identity() rather than Repo._get_user_identity",
958 DeprecationWarning,
959 )
960 return get_user_identity(config)
962 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
963 """Add or modify graftpoints.
965 Args:
966 updated_graftpoints: Dict of commit shas to list of parent shas
967 """
968 # Simple validation
969 for commit, parents in updated_graftpoints.items():
970 for sha in [commit, *parents]:
971 check_hexsha(sha, "Invalid graftpoint")
973 self._graftpoints.update(updated_graftpoints)
975 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
976 """Remove graftpoints.
978 Args:
979 to_remove: List of commit shas
980 """
981 for sha in to_remove:
982 del self._graftpoints[sha]
984 def _read_heads(self, name):
985 f = self.get_named_file(name)
986 if f is None:
987 return []
988 with f:
989 return [line.strip() for line in f.readlines() if line.strip()]
    def get_worktree(self) -> "WorkTree":
        """Get the working tree for this repository.

        Returns:
          WorkTree instance for performing working tree operations

        Raises:
          NotImplementedError: If the repository doesn't support working trees
        """
        # Subclasses backed by a working tree override this.
        raise NotImplementedError(
            "Working tree operations not supported by this repository type"
        )
    # Deprecated (scheduled for removal in 0.26.0, per @replace_me):
    # thin delegate to WorkTree.commit().
    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )
def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the prefix and any trailing newline to get the bare path.
    return contents[len(prefix) :].rstrip("\n")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Initialize the exception.

        Args:
          version: The unsupported ``core.repositoryformatversion`` value.
        """
        # Pass a message to Exception.__init__ so str(e) is informative;
        # the original stored only .version, leaving str(e) empty.
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Initialize the exception.

        Args:
          extension: Name (bytes) or description (str) of the unsupported
            extension; call sites pass both forms.
        """
        # Pass a message to Exception.__init__ so str(e) is informative;
        # the original stored only .extension, leaving str(e) empty.
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Working-tree root for non-bare repos; control directory for bare ones.
    path: str
    # Set during __init__ (explicitly or by on-disk layout detection).
    bare: bool
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: If no repository layout is found at ``root``.
          UnsupportedVersion: If repositoryformatversion is not 0 or 1.
          UnsupportedExtension: If an unknown extensions.* key is configured.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        # Autodetect bare-ness from the on-disk layout: a .git file/dir means
        # non-bare; objects/ + refs/ directly under root means bare.
        if bare is None:
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            # A .git *file* points at the real control dir (linked worktree).
            if os.path.isfile(hidden_path):
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # "commondir" redirects shared state to the main worktree's control dir.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1238 def get_worktree(self) -> "WorkTree":
1239 """Get the working tree for this repository.
1241 Returns:
1242 WorkTree instance for performing working tree operations
1243 """
1244 from .worktree import WorkTree
1246 return WorkTree(self, self.path)
1248 def _write_reflog(
1249 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1250 ) -> None:
1251 from .reflog import format_reflog_line
1253 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1254 try:
1255 os.makedirs(os.path.dirname(path))
1256 except FileExistsError:
1257 pass
1258 if committer is None:
1259 config = self.get_config_stack()
1260 committer = get_user_identity(config)
1261 check_user_identity(committer)
1262 if timestamp is None:
1263 timestamp = int(time.time())
1264 if timezone is None:
1265 timezone = 0 # FIXME
1266 with open(path, "ab") as f:
1267 f.write(
1268 format_reflog_line(
1269 old_sha, new_sha, committer, timestamp, timezone, message
1270 )
1271 + b"\n"
1272 )
1274 def read_reflog(self, ref):
1275 """Read reflog entries for a reference.
1277 Args:
1278 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1280 Yields:
1281 reflog.Entry objects in chronological order (oldest first)
1282 """
1283 from .reflog import read_reflog
1285 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1286 try:
1287 with open(path, "rb") as f:
1288 yield from read_reflog(f)
1289 except FileNotFoundError:
1290 return
1292 @classmethod
1293 def discover(cls, start="."):
1294 """Iterate parent directories to discover a repository.
1296 Return a Repo object for the first parent directory that looks like a
1297 Git repository.
1299 Args:
1300 start: The directory to start discovery from (defaults to '.')
1301 """
1302 remaining = True
1303 path = os.path.abspath(start)
1304 while remaining:
1305 try:
1306 return cls(path)
1307 except NotGitRepository:
1308 path, remaining = os.path.split(path)
1309 raise NotGitRepository(
1310 "No git repository was found at {path}".format(**dict(path=start))
1311 )
    def controldir(self):
        """Return the path of the control directory."""
        # Set in __init__: either <root>/.git, the target of a .git file, or
        # the root itself for bare repositories.
        return self._controldir

    def commondir(self):
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        return self._commondir
1327 def _determine_file_mode(self):
1328 """Probe the file-system to determine whether permissions can be trusted.
1330 Returns: True if permissions can be trusted, False otherwise.
1331 """
1332 fname = os.path.join(self.path, ".probe-permissions")
1333 with open(fname, "w") as f:
1334 f.write("")
1336 st1 = os.lstat(fname)
1337 try:
1338 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1339 except PermissionError:
1340 return False
1341 st2 = os.lstat(fname)
1343 os.unlink(fname)
1345 mode_differs = st1.st_mode != st2.st_mode
1346 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1348 return mode_differs and st2_has_exec
    def _determine_symlinks(self):
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # TODO(jelmer): Actually probe disk / look at filesystem
        return sys.platform != "win32"

    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        # Strip any leading separator so the join stays inside the control dir.
        path = path.lstrip(os.path.sep)
        with GitFile(os.path.join(self.controldir(), path), "wb") as f:
            f.write(contents)

    def _del_named_file(self, path) -> None:
        """Delete a file from the control dir; a missing file is not an error."""
        try:
            os.unlink(os.path.join(self.controldir(), path))
        except FileNotFoundError:
            return
1375 def get_named_file(self, path, basedir=None):
1376 """Get a file from the control dir with a specific name.
1378 Although the filename should be interpreted as a filename relative to
1379 the control dir in a disk-based Repo, the object returned need not be
1380 pointing to a file in that location.
1382 Args:
1383 path: The path to the file, relative to the control dir.
1384 basedir: Optional argument that specifies an alternative to the
1385 control dir.
1386 Returns: An open file object, or None if the file does not exist.
1387 """
1388 # TODO(dborowitz): sanitize filenames, since this is used directly by
1389 # the dumb web serving code.
1390 if basedir is None:
1391 basedir = self.controldir()
1392 path = path.lstrip(os.path.sep)
1393 try:
1394 return open(os.path.join(basedir, path), "rb")
1395 except FileNotFoundError:
1396 return None
1398 def index_path(self):
1399 """Return path to the index file."""
1400 return os.path.join(self.controldir(), INDEX_FILENAME)
1402 def open_index(self) -> "Index":
1403 """Open the index for this repository.
1405 Raises:
1406 NoIndexPresent: If no index is present
1407 Returns: The matching `Index`
1408 """
1409 from .index import Index
1411 if not self.has_index():
1412 raise NoIndexPresent
1414 # Check for manyFiles feature configuration
1415 config = self.get_config_stack()
1416 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1417 skip_hash = False
1418 index_version = None
1420 if many_files:
1421 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1422 try:
1423 index_version_str = config.get(b"index", b"version")
1424 index_version = int(index_version_str)
1425 except KeyError:
1426 index_version = 4 # Default to version 4 for manyFiles
1427 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1428 else:
1429 # Check for explicit index settings
1430 try:
1431 index_version_str = config.get(b"index", b"version")
1432 index_version = int(index_version_str)
1433 except KeyError:
1434 index_version = None
1435 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1437 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
    def has_index(self) -> bool:
        """Check if an index is present."""
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare

    @replace_me(remove_in="0.26.0")
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Deprecated: delegates to ``get_worktree().stage()``.

        Args:
          fs_paths: List of paths, relative to the repository path
        """
        return self.get_worktree().stage(fs_paths)

    @replace_me(remove_in="0.26.0")
    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific file in the index.

        Deprecated: delegates to ``get_worktree().unstage()``.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.
        """
        return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)

        Returns: Created repository as `Repo`

        Raises:
          ValueError: If both ``bare`` and ``checkout`` are requested.
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            # Checkout defaults to True for non-bare clones and is invalid
            # for bare ones.
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Configure the new repository's remote to point back here.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Release the partially-initialized target before re-raising.
                target.close()
                raise
        except BaseException:
            # Only remove the directory if we created it, so a failed clone
            # leaves nothing behind.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
    @replace_me(remove_in="0.26.0")
    def reset_index(self, tree: Optional[bytes] = None):
        """Reset the index back to a specific tree.

        Deprecated: delegates to ``get_worktree().reset_index()``.

        Args:
          tree: Tree SHA to reset to, None for current HEAD tree.
        """
        return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Relative patterns match any leading directories.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches everything inside that directory.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # Detached/unborn HEAD: no branch to match against.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1641 def get_worktree_config(self) -> "ConfigFile":
1642 from .config import ConfigFile
1644 path = os.path.join(self.commondir(), "config.worktree")
1645 try:
1646 # Pass condition matchers for includeIf evaluation
1647 condition_matchers = self._get_config_condition_matchers()
1648 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1649 except FileNotFoundError:
1650 cf = ConfigFile()
1651 cf.path = path
1652 return cf
1654 def get_config(self) -> "ConfigFile":
1655 """Retrieve the config object.
1657 Returns: `ConfigFile` object for the ``.git/config`` file.
1658 """
1659 from .config import ConfigFile
1661 path = os.path.join(self._commondir, "config")
1662 try:
1663 # Pass condition matchers for includeIf evaluation
1664 condition_matchers = self._get_config_condition_matchers()
1665 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1666 except FileNotFoundError:
1667 ret = ConfigFile()
1668 ret.path = path
1669 return ret
1671 def get_rebase_state_manager(self):
1672 """Get the appropriate rebase state manager for this repository.
1674 Returns: DiskRebaseStateManager instance
1675 """
1676 import os
1678 from .rebase import DiskRebaseStateManager
1680 path = os.path.join(self.controldir(), "rebase-merge")
1681 return DiskRebaseStateManager(path)
1683 def get_description(self):
1684 """Retrieve the description of this repository.
1686 Returns: A string describing the repository or None.
1687 """
1688 path = os.path.join(self._controldir, "description")
1689 try:
1690 with GitFile(path, "rb") as f:
1691 return f.read()
1692 except FileNotFoundError:
1693 return None
    def __repr__(self) -> str:
        """Return a debug representation including the repository path."""
        return f"<Repo at {self.path!r}>"

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Creates the control directory skeleton, the object store, HEAD and
        the initial control files, then returns an opened repository.

        Args:
          path: Repository root (working tree or bare directory).
          controldir: Control directory (equals ``path`` for bare repos).
          bare: Whether the repository is bare.
          object_store: Optional pre-created object store.
          config: Optional config to read init.defaultBranch from.
          default_branch: Branch HEAD should point at (overrides config).
          symlinks: Whether the working tree supports symlinks.
          format: Repository format version (defaults to 0).

        Returns: opened repository instance (``cls``).
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        # Point HEAD at the (possibly unborn) default branch.
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
    @classmethod
    def init(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
          path: Path in which to create the repository
          mkdir: Whether to create the directory
          config: Optional config to read init.defaultBranch from
          default_branch: Branch name HEAD should point at (overrides config)
          symlinks: Whether the working tree supports symlinks
          format: Repository format version (defaults to 0)

        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        # Mark the control directory hidden where the platform supports it.
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
            format=format,
        )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        # The new working tree gets a .git *file* pointing at its control dir
        # under the main repository's worktrees/ directory.
        gitdirfile = os.path.join(path, CONTROLDIR)
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # gitdir points back at the .git file; commondir points at the main
        # control directory (two levels up from worktrees/<id>).
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r
    @classmethod
    def init_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir=False,
        object_store=None,
        config=None,
        default_branch=None,
        format: Optional[int] = None,
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
          path: Path to create bare repository in
          mkdir: Whether to create the directory first
          object_store: Optional pre-created object store
          config: Optional config to read init.defaultBranch from
          default_branch: Branch name HEAD should point at (overrides config)
          format: Repository format version (defaults to 0)

        Returns: a `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # For a bare repository the control directory is the root itself.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
            format=format,
        )

    # Alias kept for backwards compatibility.
    create = init_bare
    def close(self) -> None:
        """Close any files opened by this repository."""
        self.object_store.close()

    def __enter__(self):
        """Support use as a context manager."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the repository when leaving a ``with`` block."""
        self.close()
1875 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
1876 """Read .gitattributes file from working tree.
1878 Returns:
1879 Dictionary mapping file patterns to attributes
1880 """
1881 gitattributes = {}
1882 gitattributes_path = os.path.join(self.path, ".gitattributes")
1884 if os.path.exists(gitattributes_path):
1885 with open(gitattributes_path, "rb") as f:
1886 for line in f:
1887 line = line.strip()
1888 if not line or line.startswith(b"#"):
1889 continue
1891 parts = line.split()
1892 if len(parts) < 2:
1893 continue
1895 pattern = parts[0]
1896 attrs = {}
1898 for attr in parts[1:]:
1899 if attr.startswith(b"-"):
1900 # Unset attribute
1901 attrs[attr[1:]] = b"false"
1902 elif b"=" in attr:
1903 # Set to value
1904 key, value = attr.split(b"=", 1)
1905 attrs[key] = value
1906 else:
1907 # Set attribute
1908 attrs[attr] = b"true"
1910 gitattributes[pattern] = attrs
1912 return gitattributes
1914 def get_blob_normalizer(self):
1915 """Return a BlobNormalizer object."""
1916 from .filters import FilterBlobNormalizer, FilterRegistry
1918 # Get proper GitAttributes object
1919 git_attributes = self.get_gitattributes()
1920 config_stack = self.get_config_stack()
1922 # Create FilterRegistry with repo reference
1923 filter_registry = FilterRegistry(config_stack, self)
1925 # Return FilterBlobNormalizer which handles all filters including line endings
1926 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        # NOTE(review): patterns are appended tree -> info -> worktree;
        # precedence among them is decided by GitAttributes — confirm there.
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
    # Deprecated sparse-checkout wrappers; all delegate to the WorkTree API.

    @replace_me(remove_in="0.26.0")
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir."""
        return self.get_worktree()._sparse_checkout_file_path()

    @replace_me(remove_in="0.26.0")
    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout."""
        return self.get_worktree().configure_for_cone_mode()

    @replace_me(remove_in="0.26.0")
    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
        return self.get_worktree().infer_cone_mode()

    @replace_me(remove_in="0.26.0")
    def get_sparse_checkout_patterns(self) -> list[str]:
        """Return a list of sparse-checkout patterns from info/sparse-checkout.

        Returns:
          A list of patterns. Returns an empty list if the file is missing.
        """
        return self.get_worktree().get_sparse_checkout_patterns()

    @replace_me(remove_in="0.26.0")
    def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
        """Write the given sparse-checkout patterns into info/sparse-checkout.

        Creates the info/ directory if it does not exist.

        Args:
          patterns: A list of gitignore-style patterns to store.
        """
        return self.get_worktree().set_sparse_checkout_patterns(patterns)

    @replace_me(remove_in="0.26.0")
    def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
        """Write the given cone-mode directory patterns into info/sparse-checkout.

        For each directory to include, add an inclusion line that "undoes" the prior
        ``!/*/`` 'exclude' that re-includes that directory and everything under it.
        Never add the same line twice.
        """
        return self.get_worktree().set_cone_mode_patterns(dirs)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are kept as plain tuples appended by _append_reflog.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        # Control-dir files are emulated with a simple path -> bytes mapping.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None
    def _append_reflog(self, *args) -> None:
        # Used as the refs container's logger; records each entry in memory.
        self._reflog.append(args)

    def set_description(self, description) -> None:
        """Set the description for this repository."""
        self._description = description

    def get_description(self):
        """Retrieve the description of this repository, or None if unset."""
        return self._description
    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        # (Docstring previously copy-pasted from _determine_file_mode.)
        return sys.platform != "win32"
    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        self._named_files[path] = contents

    def _del_named_file(self, path) -> None:
        """Remove a named file from memory; a missing file is not an error."""
        try:
            del self._named_files[path]
        except KeyError:
            pass
2096 def get_named_file(self, path, basedir=None):
2097 """Get a file from the control dir with a specific name.
2099 Although the filename should be interpreted as a filename relative to
2100 the control dir in a disk-baked Repo, the object returned need not be
2101 pointing to a file in that location.
2103 Args:
2104 path: The path to the file, relative to the control dir.
2105 Returns: An open file object, or None if the file does not exist.
2106 """
2107 contents = self._named_files.get(path, None)
2108 if contents is None:
2109 return None
2110 return BytesIO(contents)
    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        # MemoryRepos are always bare (see class docstring).
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config
    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        # Deferred import, in line with this module's policy of delaying
        # imports as long as possible.
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)
2136 def get_blob_normalizer(self):
2137 """Return a BlobNormalizer object for checkin/checkout operations."""
2138 from .filters import FilterBlobNormalizer, FilterRegistry
2140 # Get GitAttributes object
2141 git_attributes = self.get_gitattributes()
2142 config_stack = self.get_config_stack()
2144 # Create FilterRegistry with repo reference
2145 filter_registry = FilterRegistry(config_stack, self)
2147 # Return FilterBlobNormalizer which handles all filters
2148 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Unused for in-memory repositories.

        Returns: An empty `GitAttributes` instance.
        """
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files
        # Return empty GitAttributes
        return GitAttributes([])
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message, as bytes, or a callable taking
            ``(repo, commit)`` and returning the message bytes
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit
            timezone)
          tree: SHA1 of the tree root to use (required for MemoryRepo)
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: If no tree is given, the tree SHA is malformed, or no
            commit message could be determined.
        """
        import time

        from .objects import Commit

        # There is no index/worktree to derive a tree from, so the caller
        # must supply one explicitly.
        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            # Timezone offset of 0 corresponds to GMT.
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        c.author = author
        # NOTE(review): author is validated after assignment while committer
        # is validated before; harmless, as a failure raises either way.
        check_user_identity(author)
        if author_timestamp is None:
            # Author time/zone default to the commit time/zone.
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                # No configured commit encoding; leave it unset.
                pass
        if encoding is not None:
            c.encoding = encoding

        # The message may be a callable that builds the message from the
        # partially populated commit object.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit: store the object, touch no refs.
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                # Ref exists: the new commit descends from its current target.
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                # Compare-and-swap so a concurrent ref update is detected
                # rather than silently overwritten.
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref does not exist yet (e.g. first commit on a branch).
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id
2288 @classmethod
2289 def init_bare(cls, objects, refs, format: Optional[int] = None):
2290 """Create a new bare repository in memory.
2292 Args:
2293 objects: Objects for the new repository,
2294 as iterable
2295 refs: Refs as dictionary, mapping names
2296 to object SHA1s
2297 format: Repository format version (defaults to 0)
2298 """
2299 ret = cls()
2300 for obj in objects:
2301 ret.object_store.add_object(obj)
2302 for refname, sha in refs.items():
2303 ret.refs.add_if_new(refname, sha)
2304 ret._init_files(bare=True, format=format)
2305 return ret