Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 40%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as public by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 Union,
46)
48if TYPE_CHECKING:
49 # There are no circular imports here, but we try to defer imports as long
50 # as possible to reduce start-up time for anything that doesn't need
51 # these imports.
52 from .attrs import GitAttributes
53 from .config import ConditionMatcher, ConfigFile, StackedConfig
54 from .index import Index
55 from .notes import Notes
57from .errors import (
58 CommitError,
59 HookError,
60 NoIndexPresent,
61 NotBlobError,
62 NotCommitError,
63 NotGitRepository,
64 NotTagError,
65 NotTreeError,
66 RefFormatError,
67)
68from .file import GitFile
69from .hooks import (
70 CommitMsgShellHook,
71 Hook,
72 PostCommitShellHook,
73 PostReceiveShellHook,
74 PreCommitShellHook,
75)
76from .line_ending import BlobNormalizer, TreeBlobNormalizer
77from .object_store import (
78 DiskObjectStore,
79 MemoryObjectStore,
80 MissingObjectFinder,
81 ObjectStoreGraphWalker,
82 PackBasedObjectStore,
83 find_shallow,
84 peel_sha,
85)
86from .objects import (
87 Blob,
88 Commit,
89 ObjectID,
90 ShaFile,
91 Tag,
92 Tree,
93 check_hexsha,
94 valid_hexsha,
95)
96from .pack import generate_unpacked_objects
97from .refs import (
98 ANNOTATED_TAG_SUFFIX, # noqa: F401
99 LOCAL_BRANCH_PREFIX,
100 LOCAL_TAG_PREFIX, # noqa: F401
101 SYMREF, # noqa: F401
102 DictRefsContainer,
103 DiskRefsContainer,
104 InfoRefsContainer, # noqa: F401
105 Ref,
106 RefsContainer,
107 _set_default_branch,
108 _set_head,
109 _set_origin_head,
110 check_ref_format, # noqa: F401
111 read_packed_refs, # noqa: F401
112 read_packed_refs_with_peeled, # noqa: F401
113 serialize_refs,
114 write_packed_refs, # noqa: F401
115)
# Well-known names of files and directories inside a git control directory.
CONTROLDIR = ".git"
OBJECTDIR = "objects"
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside a fresh control directory; each entry is a
# path expressed as a list of components (joined by the initializer).
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name that HEAD points at in newly created repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity) -> None:
        """Store the offending identity.

        Args:
          identity: The malformed identity bytestring.
        """
        # Call super().__init__ explicitly so ``args`` and ``str(exc)``
        # are well-defined, instead of relying on the implicit capture
        # performed by BaseException.__new__.
        super().__init__(identity)
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when neither a username nor an email address can be derived
    from the environment or the password database.
    """
150# TODO(jelmer): Cache?
151def _get_default_identity() -> tuple[str, str]:
152 import socket
154 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
155 username = os.environ.get(name)
156 if username:
157 break
158 else:
159 username = None
161 try:
162 import pwd
163 except ImportError:
164 fullname = None
165 else:
166 try:
167 entry = pwd.getpwuid(os.getuid()) # type: ignore
168 except KeyError:
169 fullname = None
170 else:
171 if getattr(entry, "gecos", None):
172 fullname = entry.pw_gecos.split(",")[0]
173 else:
174 fullname = None
175 if username is None:
176 username = entry.pw_name
177 if not fullname:
178 if username is None:
179 raise DefaultIdentityNotFound("no username found")
180 fullname = username
181 email = os.environ.get("EMAIL")
182 if email is None:
183 if username is None:
184 raise DefaultIdentityNotFound("no username found")
185 email = f"{username}@{socket.gethostname()}"
186 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    # Only probe the host system when something is still missing. The
    # probe does pwd/hostname work and can raise DefaultIdentityNotFound,
    # neither of which should happen when the identity is fully specified
    # by the environment or configuration.
    if user is None or email is None:
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Strip surrounding angle brackets that users sometimes include.
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    # The identity must contain " <", a closing ">", and no NUL/newline.
    try:
        _name, rest = identity.split(b" <", 1)
    except ValueError as exc:
        raise InvalidUserIdentity(identity) from exc
    if b">" not in rest:
        raise InvalidUserIdentity(identity)
    for forbidden in (b"\0", b"\n"):
        if forbidden in identity:
            raise InvalidUserIdentity(identity)
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    grafts: dict[bytes, list[bytes]] = {}
    for line in graftpoints:
        fields = line.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Validate every sha before accepting the entry.
        for sha in (commit, *parents):
            check_hexsha(sha, "Invalid graftpoint")

        grafts[commit] = parents
    return grafts
def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # A commit with parents is "<sha> <p1> <p2>..."; one without is bare.
    lines = [
        commit + b" " + b" ".join(parents) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
311def _set_filesystem_hidden(path) -> None:
312 """Mark path as to be hidden if supported by platform and filesystem.
314 On win32 uses SetFileAttributesW api:
315 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
316 """
317 if sys.platform == "win32":
318 import ctypes
319 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
321 FILE_ATTRIBUTE_HIDDEN = 2
322 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
323 ("SetFileAttributesW", ctypes.windll.kernel32)
324 )
326 if isinstance(path, bytes):
327 path = os.fsdecode(path)
328 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
329 pass # Could raise or log `ctypes.WinError()` here
331 # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Resolve commit parents, honouring grafts and shallow boundaries.

    Lookup order: graftpoints first, then the shallow set (shallow
    commits report no parents), then the commit-graph file if present,
    and finally the commit object itself.
    """

    def __init__(self, store, grafts=None, shallows=None) -> None:
        """Create a parents provider.

        Args:
          store: Object store used to load commit objects
          grafts: Optional dict mapping commit shas to lists of parent shas
          shallows: Optional iterable of shallow commit shas
        """
        self.store = store
        # Use None sentinels instead of mutable default arguments so no
        # state is ever shared between instances.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows) if shallows is not None else set()

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(self, commit_id, commit=None):
        """Return the parent shas of ``commit_id``.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional already-loaded commit object matching the sha
        Returns: List of parent shas (empty for shallow commits)
        """
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            commit = self.store[commit_id]
        return commit.parents
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """
376 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
377 """Open a repository.
379 This shouldn't be called directly, but rather through one of the
380 base classes, such as MemoryRepo or Repo.
382 Args:
383 object_store: Object store to use
384 refs: Refs container to use
385 """
386 self.object_store = object_store
387 self.refs = refs
389 self._graftpoints: dict[bytes, list[bytes]] = {}
390 self.hooks: dict[str, Hook] = {}
    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        # Backend-specific; disk-based repositories actually probe the
        # filesystem, other transports decide for themselves.
        raise NotImplementedError(self._determine_file_mode)
399 def _determine_symlinks(self) -> bool:
400 """Probe the filesystem to determine whether symlinks can be created.
402 Returns: True if symlinks can be created, False otherwise.
403 """
404 # For now, just mimic the old behaviour
405 return sys.platform != "win32"
407 def _init_files(
408 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
409 ) -> None:
410 """Initialize a default set of named files."""
411 from .config import ConfigFile
413 self._put_named_file("description", b"Unnamed repository")
414 f = BytesIO()
415 cf = ConfigFile()
416 if format is None:
417 format = 0
418 if format not in (0, 1):
419 raise ValueError(f"Unsupported repository format version: {format}")
420 cf.set("core", "repositoryformatversion", str(format))
421 if self._determine_file_mode():
422 cf.set("core", "filemode", True)
423 else:
424 cf.set("core", "filemode", False)
426 if symlinks is None and not bare:
427 symlinks = self._determine_symlinks()
429 if symlinks is False:
430 cf.set("core", "symlinks", symlinks)
432 cf.set("core", "bare", bare)
433 cf.set("core", "logallrefupdates", True)
434 cf.write_to_file(f)
435 self._put_named_file("config", f.getvalue())
436 self._put_named_file(os.path.join("info", "exclude"), b"")
    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name.

        Args:
          path: The path to the file, relative to the control dir.
        """
        raise NotImplementedError(self._del_named_file)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)
473 def fetch(
474 self, target, determine_wants=None, progress=None, depth: Optional[int] = None
475 ):
476 """Fetch objects into another repository.
478 Args:
479 target: The target repository
480 determine_wants: Optional function to determine what refs to
481 fetch.
482 progress: Optional progress function
483 depth: Optional shallow fetch depth
484 Returns: The local refs
485 """
486 if determine_wants is None:
487 determine_wants = target.object_store.determine_wants_all
488 count, pack_data = self.fetch_pack_data(
489 determine_wants,
490 target.get_graph_walker(),
491 progress=progress,
492 depth=depth,
493 )
494 target.object_store.add_pack_data(count, pack_data, progress)
495 return self.get_refs()
497 def fetch_pack_data(
498 self,
499 determine_wants,
500 graph_walker,
501 progress,
502 *,
503 get_tagged=None,
504 depth: Optional[int] = None,
505 ):
506 """Fetch the pack data required for a set of revisions.
508 Args:
509 determine_wants: Function that takes a dictionary with heads
510 and returns the list of heads to fetch.
511 graph_walker: Object that can iterate over the list of revisions
512 to fetch and has an "ack" method that will be called to acknowledge
513 that a revision is present.
514 progress: Simple progress function that will be called with
515 updated progress strings.
516 get_tagged: Function that returns a dict of pointed-to sha ->
517 tag sha for including tags.
518 depth: Shallow fetch depth
519 Returns: count and iterator over pack data
520 """
521 missing_objects = self.find_missing_objects(
522 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
523 )
524 if missing_objects is None:
525 return 0, iter([])
526 remote_has = missing_objects.get_remote_has()
527 object_ids = list(missing_objects)
528 return len(object_ids), generate_unpacked_objects(
529 self.object_store, object_ids, progress=progress, other_haves=remote_has
530 )
    def find_missing_objects(
        self,
        determine_wants,
        graph_walker,
        progress,
        *,
        get_tagged=None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        # Advertise refs with tags resolved so determine_wants sees the
        # same view a server would present.
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        # Snapshot the pre-existing shallow set so newly-shallow and
        # newly-unshallow commits can be computed below.
        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
            # NOTE(review): when depth is set but graph_walker has no
            # "shallow" attribute, "unshallow" appears to stay unbound and
            # the reads below would raise NameError — verify callers always
            # pass a shallow-capable walker for depth-limited fetches.
        else:
            unshallow = getattr(graph_walker, "unshallow", frozenset())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            # Nothing wanted: return an empty finder so callers can still
            # call len()/iter()/get_remote_has() without special-casing.
            class DummyMissingObjectFinder:
                def get_remote_has(self) -> None:
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self):
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit):
            # Parent lookup that honours grafts and shallow boundaries.
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )
623 def generate_pack_data(
624 self,
625 have: list[ObjectID],
626 want: list[ObjectID],
627 progress: Optional[Callable[[str], None]] = None,
628 ofs_delta: Optional[bool] = None,
629 ):
630 """Generate pack data objects for a set of wants/haves.
632 Args:
633 have: List of SHA1s of objects that should not be sent
634 want: List of SHA1s of objects that should be sent
635 ofs_delta: Whether OFS deltas can be included
636 progress: Optional progress reporting method
637 """
638 return self.object_store.generate_pack_data(
639 have,
640 want,
641 shallow=self.get_shallow(),
642 progress=progress,
643 ofs_delta=ofs_delta,
644 )
646 def get_graph_walker(
647 self, heads: Optional[list[ObjectID]] = None
648 ) -> ObjectStoreGraphWalker:
649 """Retrieve a graph walker.
651 A graph walker is used by a remote repository (or proxy)
652 to find out which objects are present in this repository.
654 Args:
655 heads: Repository heads to use (optional)
656 Returns: A graph walker object
657 """
658 if heads is None:
659 heads = [
660 sha
661 for sha in self.refs.as_dict(b"refs/heads").values()
662 if sha in self.object_store
663 ]
664 parents_provider = ParentsProvider(self.object_store)
665 return ObjectStoreGraphWalker(
666 heads,
667 parents_provider.get_parents,
668 shallow=self.get_shallow(),
669 update_shallow=self.update_shallow,
670 )
    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        # Delegates entirely to the refs container.
        return self.refs.as_dict()

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        return self.refs[b"HEAD"]
683 def _get_object(self, sha, cls):
684 assert len(sha) in (20, 40)
685 ret = self.get_object(sha)
686 if not isinstance(ret, cls):
687 if cls is Commit:
688 raise NotCommitError(ret)
689 elif cls is Blob:
690 raise NotBlobError(ret)
691 elif cls is Tree:
692 raise NotTreeError(ret)
693 elif cls is Tag:
694 raise NotTagError(ret)
695 else:
696 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
697 return ret
    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

    def parents_provider(self) -> ParentsProvider:
        """Return a ParentsProvider honouring this repo's grafts and shallows."""
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)
    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object.

        Returns: `ConfigFile` object with worktree-specific configuration.
        """
        raise NotImplementedError(self.get_worktree_config)

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Returns: String with the description of the repository
            as set by the user.
        """
        raise NotImplementedError(self.get_description)

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)
764 def get_config_stack(self) -> "StackedConfig":
765 """Return a config stack for this repository.
767 This stack accesses the configuration for both this repository
768 itself (.git/config) and the global configuration, which usually
769 lives in ~/.gitconfig.
771 Returns: `Config` instance for this repository
772 """
773 from .config import ConfigFile, StackedConfig
775 local_config = self.get_config()
776 backends: list[ConfigFile] = [local_config]
777 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
778 backends.append(self.get_worktree_config())
780 backends += StackedConfig.default_backends()
781 return StackedConfig(backends, writable=local_config)
783 def get_shallow(self) -> set[ObjectID]:
784 """Get the set of shallow commits.
786 Returns: Set of shallow commits.
787 """
788 f = self.get_named_file("shallow")
789 if f is None:
790 return set()
791 with f:
792 return {line.strip() for line in f}
794 def update_shallow(self, new_shallow, new_unshallow) -> None:
795 """Update the list of shallow objects.
797 Args:
798 new_shallow: Newly shallow objects
799 new_unshallow: Newly no longer shallow objects
800 """
801 shallow = self.get_shallow()
802 if new_shallow:
803 shallow.update(new_shallow)
804 if new_unshallow:
805 shallow.difference_update(new_unshallow)
806 if shallow:
807 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
808 else:
809 self._del_named_file("shallow")
811 def get_peeled(self, ref: Ref) -> ObjectID:
812 """Get the peeled value of a ref.
814 Args:
815 ref: The refname to peel.
816 Returns: The fully-peeled SHA1 of a tag object, after peeling all
817 intermediate tags; if the original ref does not point to a tag,
818 this will equal the original SHA1.
819 """
820 cached = self.refs.get_peeled(ref)
821 if cached is not None:
822 return cached
823 return peel_sha(self.object_store, self.refs[ref])[1].id
    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        # Imported lazily to keep module start-up cheap.
        from .notes import Notes

        return Notes(self.object_store, self.refs)
836 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs):
837 """Obtain a walker for this repository.
839 Args:
840 include: Iterable of SHAs of commits to include along with their
841 ancestors. Defaults to [HEAD]
843 Keyword Args:
844 exclude: Iterable of SHAs of commits to exclude along with their
845 ancestors, overriding includes.
846 order: ORDER_* constant specifying the order of results.
847 Anything other than ORDER_DATE may result in O(n) memory usage.
848 reverse: If True, reverse the order of output, requiring O(n)
849 memory.
850 max_entries: The maximum number of entries to yield, or None for
851 no limit.
852 paths: Iterable of file or subtree paths to show entries for.
853 rename_detector: diff.RenameDetector object for detecting
854 renames.
855 follow: If True, follow path across renames/copies. Forces a
856 default rename_detector.
857 since: Timestamp to list commits after.
858 until: Timestamp to list commits before.
859 queue_cls: A class to use for a queue of commits, supporting the
860 iterator protocol. The constructor takes a single argument, the
861 Walker.
863 Returns: A `Walker` object
864 """
865 from .walk import Walker
867 if include is None:
868 include = [self.head()]
870 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)
872 return Walker(self.object_store, include, **kwargs)
874 def __getitem__(self, name: Union[ObjectID, Ref]):
875 """Retrieve a Git object by SHA1 or ref.
877 Args:
878 name: A Git object SHA1 or a ref name
879 Returns: A `ShaFile` object, such as a Commit or Blob
880 Raises:
881 KeyError: when the specified ref or object does not exist
882 """
883 if not isinstance(name, bytes):
884 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
885 if len(name) in (20, 40):
886 try:
887 return self.object_store[name]
888 except (KeyError, ValueError):
889 pass
890 try:
891 return self.object_store[self.refs[name]]
892 except RefFormatError as exc:
893 raise KeyError(name) from exc
895 def __contains__(self, name: bytes) -> bool:
896 """Check if a specific Git object or ref is present.
898 Args:
899 name: Git object SHA1 or ref name
900 """
901 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
902 return name in self.object_store or name in self.refs
903 else:
904 return name in self.refs
906 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
907 """Set a ref.
909 Args:
910 name: ref name
911 value: Ref value - either a ShaFile object, or a hex sha
912 """
913 if name.startswith(b"refs/") or name == b"HEAD":
914 if isinstance(value, ShaFile):
915 self.refs[name] = value.id
916 elif isinstance(value, bytes):
917 self.refs[name] = value
918 else:
919 raise TypeError(value)
920 else:
921 raise ValueError(name)
923 def __delitem__(self, name: bytes) -> None:
924 """Remove a ref.
926 Args:
927 name: Name of the ref to remove
928 """
929 if name.startswith(b"refs/") or name == b"HEAD":
930 del self.refs[name]
931 else:
932 raise ValueError(name)
934 def _get_user_identity(
935 self, config: "StackedConfig", kind: Optional[str] = None
936 ) -> bytes:
937 """Determine the identity to use for new commits."""
938 warnings.warn(
939 "use get_user_identity() rather than Repo._get_user_identity",
940 DeprecationWarning,
941 )
942 return get_user_identity(config)
944 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
945 """Add or modify graftpoints.
947 Args:
948 updated_graftpoints: Dict of commit shas to list of parent shas
949 """
950 # Simple validation
951 for commit, parents in updated_graftpoints.items():
952 for sha in [commit, *parents]:
953 check_hexsha(sha, "Invalid graftpoint")
955 self._graftpoints.update(updated_graftpoints)
957 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
958 """Remove graftpoints.
960 Args:
961 to_remove: List of commit shas
962 """
963 for sha in to_remove:
964 del self._graftpoints[sha]
966 def _read_heads(self, name):
967 f = self.get_named_file(name)
968 if f is None:
969 return []
970 with f:
971 return [line.strip() for line in f.readlines() if line.strip()]
973 def do_commit(
974 self,
975 message: Optional[bytes] = None,
976 committer: Optional[bytes] = None,
977 author: Optional[bytes] = None,
978 commit_timestamp=None,
979 commit_timezone=None,
980 author_timestamp=None,
981 author_timezone=None,
982 tree: Optional[ObjectID] = None,
983 encoding: Optional[bytes] = None,
984 ref: Optional[Ref] = b"HEAD",
985 merge_heads: Optional[list[ObjectID]] = None,
986 no_verify: bool = False,
987 sign: bool = False,
988 ):
989 """Create a new commit.
991 If not specified, committer and author default to
992 get_user_identity(..., 'COMMITTER')
993 and get_user_identity(..., 'AUTHOR') respectively.
995 Args:
996 message: Commit message
997 committer: Committer fullname
998 author: Author fullname
999 commit_timestamp: Commit timestamp (defaults to now)
1000 commit_timezone: Commit timestamp timezone (defaults to GMT)
1001 author_timestamp: Author timestamp (defaults to commit
1002 timestamp)
1003 author_timezone: Author timestamp timezone
1004 (defaults to commit timestamp timezone)
1005 tree: SHA1 of the tree root to use (if not specified the
1006 current index will be committed).
1007 encoding: Encoding
1008 ref: Optional ref to commit to (defaults to current branch).
1009 If None, creates a dangling commit without updating any ref.
1010 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1011 no_verify: Skip pre-commit and commit-msg hooks
1012 sign: GPG Sign the commit (bool, defaults to False,
1013 pass True to use default GPG key,
1014 pass a str containing Key ID to use a specific GPG key)
1016 Returns:
1017 New commit SHA1
1018 """
1019 try:
1020 if not no_verify:
1021 self.hooks["pre-commit"].execute()
1022 except HookError as exc:
1023 raise CommitError(exc) from exc
1024 except KeyError: # no hook defined, silent fallthrough
1025 pass
1027 c = Commit()
1028 if tree is None:
1029 index = self.open_index()
1030 c.tree = index.commit(self.object_store)
1031 else:
1032 if len(tree) != 40:
1033 raise ValueError("tree must be a 40-byte hex sha string")
1034 c.tree = tree
1036 config = self.get_config_stack()
1037 if merge_heads is None:
1038 merge_heads = self._read_heads("MERGE_HEAD")
1039 if committer is None:
1040 committer = get_user_identity(config, kind="COMMITTER")
1041 check_user_identity(committer)
1042 c.committer = committer
1043 if commit_timestamp is None:
1044 # FIXME: Support GIT_COMMITTER_DATE environment variable
1045 commit_timestamp = time.time()
1046 c.commit_time = int(commit_timestamp)
1047 if commit_timezone is None:
1048 # FIXME: Use current user timezone rather than UTC
1049 commit_timezone = 0
1050 c.commit_timezone = commit_timezone
1051 if author is None:
1052 author = get_user_identity(config, kind="AUTHOR")
1053 c.author = author
1054 check_user_identity(author)
1055 if author_timestamp is None:
1056 # FIXME: Support GIT_AUTHOR_DATE environment variable
1057 author_timestamp = commit_timestamp
1058 c.author_time = int(author_timestamp)
1059 if author_timezone is None:
1060 author_timezone = commit_timezone
1061 c.author_timezone = author_timezone
1062 if encoding is None:
1063 try:
1064 encoding = config.get(("i18n",), "commitEncoding")
1065 except KeyError:
1066 pass # No dice
1067 if encoding is not None:
1068 c.encoding = encoding
1069 if message is None:
1070 # FIXME: Try to read commit message from .git/MERGE_MSG
1071 raise ValueError("No commit message specified")
1073 try:
1074 if no_verify:
1075 c.message = message
1076 else:
1077 c.message = self.hooks["commit-msg"].execute(message)
1078 if c.message is None:
1079 c.message = message
1080 except HookError as exc:
1081 raise CommitError(exc) from exc
1082 except KeyError: # no hook defined, message not modified
1083 c.message = message
1085 # Check if we should sign the commit
1086 should_sign = sign
1087 if sign is None:
1088 # Check commit.gpgSign configuration when sign is not explicitly set
1089 config = self.get_config_stack()
1090 try:
1091 should_sign = config.get_boolean((b"commit",), b"gpgSign")
1092 except KeyError:
1093 should_sign = False # Default to not signing if no config
1094 keyid = sign if isinstance(sign, str) else None
1096 if ref is None:
1097 # Create a dangling commit
1098 c.parents = merge_heads
1099 if should_sign:
1100 c.sign(keyid)
1101 self.object_store.add_object(c)
1102 else:
1103 try:
1104 old_head = self.refs[ref]
1105 c.parents = [old_head, *merge_heads]
1106 if should_sign:
1107 c.sign(keyid)
1108 self.object_store.add_object(c)
1109 ok = self.refs.set_if_equals(
1110 ref,
1111 old_head,
1112 c.id,
1113 message=b"commit: " + message,
1114 committer=committer,
1115 timestamp=commit_timestamp,
1116 timezone=commit_timezone,
1117 )
1118 except KeyError:
1119 c.parents = merge_heads
1120 if should_sign:
1121 c.sign(keyid)
1122 self.object_store.add_object(c)
1123 ok = self.refs.add_if_new(
1124 ref,
1125 c.id,
1126 message=b"commit: " + message,
1127 committer=committer,
1128 timestamp=commit_timestamp,
1129 timezone=commit_timezone,
1130 )
1131 if not ok:
1132 # Fail if the atomic compare-and-swap failed, leaving the
1133 # commit and all its objects as garbage.
1134 raise CommitError(f"{ref!r} changed during commit")
1136 self._del_named_file("MERGE_HEAD")
1138 try:
1139 self.hooks["post-commit"].execute()
1140 except HookError as e: # silent failure
1141 warnings.warn(f"post-commit hook failed: {e}", UserWarning)
1142 except KeyError: # no hook defined, silent fallthrough
1143 pass
1145 # Trigger auto GC if needed
1146 from .gc import maybe_auto_gc
1148 maybe_auto_gc(self)
1150 return c.id
def read_gitfile(f):
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    """
    prefix = "gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Strip the prefix and any trailing newline; the remainder is the path.
    return contents[len(prefix) :].rstrip("\n")
class UnsupportedVersion(Exception):
    """Unsupported repository version.

    Attributes:
      version: The repository format version that was encountered.
    """

    def __init__(self, version) -> None:
        # Pass a message to Exception.__init__ so str(exc) is informative;
        # previously args was empty and str(exc) returned "".
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension.

    Attributes:
      extension: The extension name (bytes or str) that was encountered.
    """

    def __init__(self, extension) -> None:
        # Pass a message to Exception.__init__ so str(exc) is informative;
        # previously args was empty and str(exc) returned "".
        super().__init__(f"Unsupported extension: {extension!r}")
        self.extension = extension
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Set in __init__; see the Attributes section of the class docstring.
    path: str
    bare: bool
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: If ``root`` does not look like a git repository
            and ``bare`` was not given explicitly.
          UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
          UnsupportedExtension: If an unsupported extensions.* key is set.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Auto-detect: a .git file/dir under root means non-bare; an
            # objects/ plus refs/ directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # A .git *file* (worktree/submodule layout) points at the
                # real control directory.
                with open(hidden_path) as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # Linked worktree: "commondir" names the main repository's
            # control directory, relative to ours.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            # Missing setting defaults to format version 0.
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                # Any extension other than refStorage/worktreeConfig is fatal.
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
        BaseRepo.__init__(self, object_store, self.refs)

        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                # Shallow boundaries are handled like graft points.
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1315 def _write_reflog(
1316 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1317 ) -> None:
1318 from .reflog import format_reflog_line
1320 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1321 try:
1322 os.makedirs(os.path.dirname(path))
1323 except FileExistsError:
1324 pass
1325 if committer is None:
1326 config = self.get_config_stack()
1327 committer = get_user_identity(config)
1328 check_user_identity(committer)
1329 if timestamp is None:
1330 timestamp = int(time.time())
1331 if timezone is None:
1332 timezone = 0 # FIXME
1333 with open(path, "ab") as f:
1334 f.write(
1335 format_reflog_line(
1336 old_sha, new_sha, committer, timestamp, timezone, message
1337 )
1338 + b"\n"
1339 )
1341 def read_reflog(self, ref):
1342 """Read reflog entries for a reference.
1344 Args:
1345 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1347 Yields:
1348 reflog.Entry objects in chronological order (oldest first)
1349 """
1350 from .reflog import read_reflog
1352 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1353 try:
1354 with open(path, "rb") as f:
1355 yield from read_reflog(f)
1356 except FileNotFoundError:
1357 return
1359 @classmethod
1360 def discover(cls, start="."):
1361 """Iterate parent directories to discover a repository.
1363 Return a Repo object for the first parent directory that looks like a
1364 Git repository.
1366 Args:
1367 start: The directory to start discovery from (defaults to '.')
1368 """
1369 remaining = True
1370 path = os.path.abspath(start)
1371 while remaining:
1372 try:
1373 return cls(path)
1374 except NotGitRepository:
1375 path, remaining = os.path.split(path)
1376 raise NotGitRepository(
1377 "No git repository was found at {path}".format(**dict(path=start))
1378 )
1380 def controldir(self):
1381 """Return the path of the control directory."""
1382 return self._controldir
1384 def commondir(self):
1385 """Return the path of the common directory.
1387 For a main working tree, it is identical to controldir().
1389 For a linked working tree, it is the control directory of the
1390 main working tree.
1391 """
1392 return self._commondir
1394 def _determine_file_mode(self):
1395 """Probe the file-system to determine whether permissions can be trusted.
1397 Returns: True if permissions can be trusted, False otherwise.
1398 """
1399 fname = os.path.join(self.path, ".probe-permissions")
1400 with open(fname, "w") as f:
1401 f.write("")
1403 st1 = os.lstat(fname)
1404 try:
1405 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1406 except PermissionError:
1407 return False
1408 st2 = os.lstat(fname)
1410 os.unlink(fname)
1412 mode_differs = st1.st_mode != st2.st_mode
1413 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1415 return mode_differs and st2_has_exec
1417 def _determine_symlinks(self):
1418 """Probe the filesystem to determine whether symlinks can be created.
1420 Returns: True if symlinks can be created, False otherwise.
1421 """
1422 # TODO(jelmer): Actually probe disk / look at filesystem
1423 return sys.platform != "win32"
1425 def _put_named_file(self, path, contents) -> None:
1426 """Write a file to the control dir with the given name and contents.
1428 Args:
1429 path: The path to the file, relative to the control dir.
1430 contents: A string to write to the file.
1431 """
1432 path = path.lstrip(os.path.sep)
1433 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1434 f.write(contents)
1436 def _del_named_file(self, path) -> None:
1437 try:
1438 os.unlink(os.path.join(self.controldir(), path))
1439 except FileNotFoundError:
1440 return
1442 def get_named_file(self, path, basedir=None):
1443 """Get a file from the control dir with a specific name.
1445 Although the filename should be interpreted as a filename relative to
1446 the control dir in a disk-based Repo, the object returned need not be
1447 pointing to a file in that location.
1449 Args:
1450 path: The path to the file, relative to the control dir.
1451 basedir: Optional argument that specifies an alternative to the
1452 control dir.
1453 Returns: An open file object, or None if the file does not exist.
1454 """
1455 # TODO(dborowitz): sanitize filenames, since this is used directly by
1456 # the dumb web serving code.
1457 if basedir is None:
1458 basedir = self.controldir()
1459 path = path.lstrip(os.path.sep)
1460 try:
1461 return open(os.path.join(basedir, path), "rb")
1462 except FileNotFoundError:
1463 return None
1465 def index_path(self):
1466 """Return path to the index file."""
1467 return os.path.join(self.controldir(), INDEX_FILENAME)
1469 def open_index(self) -> "Index":
1470 """Open the index for this repository.
1472 Raises:
1473 NoIndexPresent: If no index is present
1474 Returns: The matching `Index`
1475 """
1476 from .index import Index
1478 if not self.has_index():
1479 raise NoIndexPresent
1481 # Check for manyFiles feature configuration
1482 config = self.get_config_stack()
1483 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1484 skip_hash = False
1485 index_version = None
1487 if many_files:
1488 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1489 try:
1490 index_version_str = config.get(b"index", b"version")
1491 index_version = int(index_version_str)
1492 except KeyError:
1493 index_version = 4 # Default to version 4 for manyFiles
1494 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1495 else:
1496 # Check for explicit index settings
1497 try:
1498 index_version_str = config.get(b"index", b"version")
1499 index_version = int(index_version_str)
1500 except KeyError:
1501 index_version = None
1502 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1504 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
1506 def has_index(self) -> bool:
1507 """Check if an index is present."""
1508 # Bare repos must never have index files; non-bare repos may have a
1509 # missing index file, which is treated as empty.
1510 return not self.bare
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Args:
          fs_paths: List of paths, relative to the repository path

        Raises:
          ValueError: If any path is absolute.
        """
        root_path_bytes = os.fsencode(self.path)

        # Accept a single path or an iterable of paths.
        if isinstance(fs_paths, (str, bytes, os.PathLike)):
            fs_paths = [fs_paths]
        fs_paths = list(fs_paths)

        from .index import (
            _fs_to_tree_path,
            blob_from_path_and_stat,
            index_entry_from_directory,
            index_entry_from_stat,
        )

        index = self.open_index()
        blob_normalizer = self.get_blob_normalizer()
        for fs_path in fs_paths:
            if not isinstance(fs_path, bytes):
                fs_path = os.fsencode(fs_path)
            if os.path.isabs(fs_path):
                raise ValueError(
                    f"path {fs_path!r} should be relative to "
                    "repository root, not absolute"
                )
            tree_path = _fs_to_tree_path(fs_path)
            full_path = os.path.join(root_path_bytes, fs_path)
            try:
                st = os.lstat(full_path)
            except OSError:
                # File no longer exists
                try:
                    del index[tree_path]
                except KeyError:
                    pass  # already removed
            else:
                if stat.S_ISDIR(st.st_mode):
                    # Directories only stage an entry in special cases (e.g.
                    # submodules); otherwise they are dropped from the index.
                    entry = index_entry_from_directory(st, full_path)
                    if entry:
                        index[tree_path] = entry
                    else:
                        try:
                            del index[tree_path]
                        except KeyError:
                            pass
                elif not stat.S_ISREG(st.st_mode) and not stat.S_ISLNK(st.st_mode):
                    # Sockets, fifos, devices etc. cannot be staged.
                    try:
                        del index[tree_path]
                    except KeyError:
                        pass
                else:
                    # Regular file or symlink: normalize, store the blob, and
                    # record an index entry pointing at it.
                    blob = blob_from_path_and_stat(full_path, st)
                    blob = blob_normalizer.checkin_normalize(blob, fs_path)
                    self.object_store.add_object(blob)
                    index[tree_path] = index_entry_from_stat(st, blob.id)
        index.write()
    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific files in the index.

        Args:
          fs_paths: a list of files to unstage,
            relative to the repository path.

        Raises:
          KeyError: If a path is neither in HEAD's tree nor in the index.
        """
        from .index import IndexEntry, _fs_to_tree_path

        index = self.open_index()
        try:
            tree_id = self[b"HEAD"].tree
        except KeyError:
            # no head mean no commit in the repo: unstaging is simply
            # removing the entries from the index.
            for fs_path in fs_paths:
                tree_path = _fs_to_tree_path(fs_path)
                del index[tree_path]
            index.write()
            return

        for fs_path in fs_paths:
            tree_path = _fs_to_tree_path(fs_path)
            try:
                tree = self.object_store[tree_id]
                assert isinstance(tree, Tree)
                tree_entry = tree.lookup_path(self.object_store.__getitem__, tree_path)
            except KeyError:
                # if tree_entry didn't exist, this file was being added, so
                # remove index entry
                try:
                    del index[tree_path]
                    continue
                except KeyError as exc:
                    raise KeyError(f"file '{tree_path.decode()}' not in index") from exc

            # Stat the working-tree file (if any) so the restored index entry
            # carries its device/inode/ownership metadata.
            st = None
            try:
                st = os.lstat(os.path.join(self.path, fs_path))
            except FileNotFoundError:
                pass

            # Rebuild the index entry from the HEAD tree entry; timestamps
            # come from the HEAD commit so the entry looks unmodified.
            index_entry = IndexEntry(
                ctime=(self[b"HEAD"].commit_time, 0),
                mtime=(self[b"HEAD"].commit_time, 0),
                dev=st.st_dev if st else 0,
                ino=st.st_ino if st else 0,
                mode=tree_entry[0],
                uid=st.st_uid if st else 0,
                gid=st.st_gid if st else 0,
                size=len(self[tree_entry[1]].data),
                sha=tree_entry[1],
                flags=0,
                extended_flags=0,
            )

            index[tree_path] = index_entry
        index.write()
    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)

        Raises:
          ValueError: If both ``bare`` and ``checkout`` are requested.
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Checkout defaults to on for non-bare clones.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Record this repository as the "origin" remote in the clone.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Mirror branches under refs/remotes/<origin>/ and copy tags.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.reset_index()
            except BaseException:
                # Clean up the half-initialized clone before re-raising.
                target.close()
                raise
        except BaseException:
            if mkdir:
                import shutil

                # Remove the directory we created so a failed clone leaves
                # nothing behind.
                shutil.rmtree(target_path)
            raise
        return target
1730 def reset_index(self, tree: Optional[bytes] = None):
1731 """Reset the index back to a specific tree.
1733 Args:
1734 tree: Tree SHA to reset to, None for current HEAD tree.
1735 """
1736 from .index import (
1737 build_index_from_tree,
1738 symlink,
1739 validate_path_element_default,
1740 validate_path_element_hfs,
1741 validate_path_element_ntfs,
1742 )
1744 if tree is None:
1745 head = self[b"HEAD"]
1746 if isinstance(head, Tag):
1747 _cls, obj = head.object
1748 head = self.get_object(obj)
1749 tree = head.tree
1750 config = self.get_config()
1751 honor_filemode = config.get_boolean(b"core", b"filemode", os.name != "nt")
1752 if config.get_boolean(b"core", b"core.protectNTFS", os.name == "nt"):
1753 validate_path_element = validate_path_element_ntfs
1754 elif config.get_boolean(b"core", b"core.protectHFS", sys.platform == "darwin"):
1755 validate_path_element = validate_path_element_hfs
1756 else:
1757 validate_path_element = validate_path_element_default
1758 if config.get_boolean(b"core", b"symlinks", True):
1759 symlink_fn = symlink
1760 else:
1762 def symlink_fn(source, target) -> None: # type: ignore
1763 with open(
1764 target, "w" + ("b" if isinstance(source, bytes) else "")
1765 ) as f:
1766 f.write(source)
1768 blob_normalizer = self.get_blob_normalizer()
1769 return build_index_from_tree(
1770 self.path,
1771 self.index_path(),
1772 self.object_store,
1773 tree,
1774 honor_filemode=honor_filemode,
1775 validate_path_element=validate_path_element,
1776 symlink_fn=symlink_fn,
1777 blob_normalizer=blob_normalizer,
1778 )
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            # Returns True when this repository's control dir matches the
            # gitdir glob pattern.
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches the directory and everything below.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            # Returns True when the currently checked-out branch matches.
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1849 def get_worktree_config(self) -> "ConfigFile":
1850 from .config import ConfigFile
1852 path = os.path.join(self.commondir(), "config.worktree")
1853 try:
1854 # Pass condition matchers for includeIf evaluation
1855 condition_matchers = self._get_config_condition_matchers()
1856 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1857 except FileNotFoundError:
1858 cf = ConfigFile()
1859 cf.path = path
1860 return cf
1862 def get_config(self) -> "ConfigFile":
1863 """Retrieve the config object.
1865 Returns: `ConfigFile` object for the ``.git/config`` file.
1866 """
1867 from .config import ConfigFile
1869 path = os.path.join(self._commondir, "config")
1870 try:
1871 # Pass condition matchers for includeIf evaluation
1872 condition_matchers = self._get_config_condition_matchers()
1873 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1874 except FileNotFoundError:
1875 ret = ConfigFile()
1876 ret.path = path
1877 return ret
1879 def get_rebase_state_manager(self):
1880 """Get the appropriate rebase state manager for this repository.
1882 Returns: DiskRebaseStateManager instance
1883 """
1884 import os
1886 from .rebase import DiskRebaseStateManager
1888 path = os.path.join(self.controldir(), "rebase-merge")
1889 return DiskRebaseStateManager(path)
1891 def get_description(self):
1892 """Retrieve the description of this repository.
1894 Returns: A string describing the repository or None.
1895 """
1896 path = os.path.join(self._controldir, "description")
1897 try:
1898 with GitFile(path, "rb") as f:
1899 return f.read()
1900 except FileNotFoundError:
1901 return None
1903 def __repr__(self) -> str:
1904 return f"<Repo at {self.path!r}>"
1906 def set_description(self, description) -> None:
1907 """Set the description for this repository.
1909 Args:
1910 description: Text to set as description for this repository.
1911 """
1912 self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Creates the base directory layout under ``controldir``, sets up the
        object store, points HEAD at the default branch and writes the
        initial control files.

        Args:
          path: Repository root (equals ``controldir`` for bare repos).
          controldir: Control directory to populate.
          bare: Whether the new repository is bare.
          object_store: Optional pre-created object store.
          config: Optional config used to look up init.defaultBranch.
          default_branch: Branch name for HEAD; falls back to config, then
            to DEFAULT_BRANCH.
          symlinks: Symlink support setting passed through to _init_files.
          format: Repository format version passed through to _init_files.
        Returns: The opened `Repo` instance.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                # Honor init.defaultBranch when configured.
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
1950 @classmethod
1951 def init(
1952 cls,
1953 path: Union[str, bytes, os.PathLike],
1954 *,
1955 mkdir: bool = False,
1956 config=None,
1957 default_branch=None,
1958 symlinks: Optional[bool] = None,
1959 format: Optional[int] = None,
1960 ) -> "Repo":
1961 """Create a new repository.
1963 Args:
1964 path: Path in which to create the repository
1965 mkdir: Whether to create the directory
1966 format: Repository format version (defaults to 0)
1967 Returns: `Repo` instance
1968 """
1969 path = os.fspath(path)
1970 if isinstance(path, bytes):
1971 path = os.fsdecode(path)
1972 if mkdir:
1973 os.mkdir(path)
1974 controldir = os.path.join(path, CONTROLDIR)
1975 os.mkdir(controldir)
1976 _set_filesystem_hidden(controldir)
1977 return cls._init_maybe_bare(
1978 path,
1979 controldir,
1980 False,
1981 config=config,
1982 default_branch=default_branch,
1983 symlinks=symlinks,
1984 format=format,
1985 )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            # Default worktree identifier is the directory's basename.
            identifier = os.path.basename(path)
        main_worktreesdir = os.path.join(main_repo.controldir(), WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The worktree's .git is a *file* pointing at its control dir under
        # the main repository's worktrees/ directory.
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Back-pointer to the worktree's .git file.
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # Common dir is the main repository's control dir, two levels up.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the worktree's HEAD at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(path)
        # Populate the new working tree from HEAD.
        r.reset_index()
        return r
2034 @classmethod
2035 def init_bare(
2036 cls,
2037 path: Union[str, bytes, os.PathLike],
2038 *,
2039 mkdir=False,
2040 object_store=None,
2041 config=None,
2042 default_branch=None,
2043 format: Optional[int] = None,
2044 ):
2045 """Create a new bare repository.
2047 ``path`` should already exist and be an empty directory.
2049 Args:
2050 path: Path to create bare repository in
2051 format: Repository format version (defaults to 0)
2052 Returns: a `Repo` instance
2053 """
2054 path = os.fspath(path)
2055 if isinstance(path, bytes):
2056 path = os.fsdecode(path)
2057 if mkdir:
2058 os.mkdir(path)
2059 return cls._init_maybe_bare(
2060 path,
2061 path,
2062 True,
2063 object_store=object_store,
2064 config=config,
2065 default_branch=default_branch,
2066 format=format,
2067 )
2069 create = init_bare
2071 def close(self) -> None:
2072 """Close any files opened by this repository."""
2073 self.object_store.close()
2075 def __enter__(self):
2076 return self
2078 def __exit__(self, exc_type, exc_val, exc_tb):
2079 self.close()
    def get_blob_normalizer(self):
        """Return a BlobNormalizer object.

        Returns a TreeBlobNormalizer bound to HEAD's tree when HEAD exists,
        otherwise a plain BlobNormalizer.
        """
        # TODO Parse the git attributes files
        git_attributes = {}
        config_stack = self.get_config_stack()
        try:
            head_sha = self.refs[b"HEAD"]
            # Peel tags to get the underlying commit
            _, obj = peel_sha(self.object_store, head_sha)
            tree = obj.tree
            return TreeBlobNormalizer(
                config_stack,
                git_attributes,
                self.object_store,
                tree,
            )
        except KeyError:
            # No HEAD (e.g. empty repository): fall back to a normalizer
            # that does not consult any tree.
            return BlobNormalizer(config_stack, git_attributes)
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the commit it references.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Tree or blob is missing/invalid; skip tree-level attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2157 def _sparse_checkout_file_path(self) -> str:
2158 """Return the path of the sparse-checkout file in this repo's control dir."""
2159 return os.path.join(self.controldir(), "info", "sparse-checkout")
2161 def configure_for_cone_mode(self) -> None:
2162 """Ensure the repository is configured for cone-mode sparse-checkout."""
2163 config = self.get_config()
2164 config.set((b"core",), b"sparseCheckout", b"true")
2165 config.set((b"core",), b"sparseCheckoutCone", b"true")
2166 config.write_to_path()
2168 def infer_cone_mode(self) -> bool:
2169 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2170 config = self.get_config()
2171 try:
2172 sc_cone = config.get((b"core",), b"sparseCheckoutCone")
2173 return sc_cone == b"true"
2174 except KeyError:
2175 # If core.sparseCheckoutCone is not set, default to False
2176 return False
2178 def get_sparse_checkout_patterns(self) -> list[str]:
2179 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2181 Returns:
2182 A list of patterns. Returns an empty list if the file is missing.
2183 """
2184 path = self._sparse_checkout_file_path()
2185 try:
2186 with open(path, encoding="utf-8") as f:
2187 return [line.strip() for line in f if line.strip()]
2188 except FileNotFoundError:
2189 return []
2191 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
2192 """Write the given sparse-checkout patterns into info/sparse-checkout.
2194 Creates the info/ directory if it does not exist.
2196 Args:
2197 patterns: A list of gitignore-style patterns to store.
2198 """
2199 info_dir = os.path.join(self.controldir(), "info")
2200 os.makedirs(info_dir, exist_ok=True)
2202 path = self._sparse_checkout_file_path()
2203 with open(path, "w", encoding="utf-8") as f:
2204 for pat in patterns:
2205 f.write(pat + "\n")
2207 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
2208 """Write the given cone-mode directory patterns into info/sparse-checkout.
2210 For each directory to include, add an inclusion line that "undoes" the prior
2211 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2212 Never add the same line twice.
2213 """
2214 patterns = ["/*", "!/*/"]
2215 if dirs:
2216 for d in dirs:
2217 d = d.strip("/")
2218 line = f"/{d}/"
2219 if d and line not in patterns:
2220 patterns.append(line)
2221 self.set_sparse_checkout_patterns(patterns)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

    def _append_reflog(self, *args) -> None:
        """Record one ref update in the in-memory reflog list."""
        self._reflog.append(args)

    def set_description(self, description) -> None:
        """Set the repository description (kept in memory only)."""
        self._description = description

    def get_description(self):
        """Return the repository description, or None if unset."""
        return self._description

    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be created.

        Returns: True if symlinks are supported, False otherwise.
        """
        return sys.platform != "win32"

    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: Bytes to write to the file.
        """
        self._named_files[path] = contents

    def _del_named_file(self, path) -> None:
        """Delete a named file from the control dir; a no-op if it is absent."""
        self._named_files.pop(path, None)

    def get_named_file(self, path, basedir=None):
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        contents = self._named_files.get(path, None)
        if contents is None:
            return None
        return BytesIO(contents)

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

    @classmethod
    def init_bare(cls, objects, refs, format: Optional[int] = None):
        """Create a new bare repository in memory.

        Args:
          objects: Objects for the new repository, as iterable
          refs: Refs as dictionary, mapping names to object SHA1s
          format: Repository format version (defaults to 0)
        """
        ret = cls()
        for obj in objects:
            ret.object_store.add_object(obj)
        for refname, sha in refs.items():
            ret.refs.add_if_new(refname, sha)
        ret._init_files(bare=True, format=format)
        return ret