Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable, Iterator
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 TypeVar,
46 Union,
47)
49if TYPE_CHECKING:
50 # There are no circular imports here, but we try to defer imports as long
51 # as possible to reduce start-up time for anything that doesn't need
52 # these imports.
53 from .attrs import GitAttributes
54 from .config import ConditionMatcher, ConfigFile, StackedConfig
55 from .index import Index
56 from .line_ending import BlobNormalizer
57 from .notes import Notes
58 from .object_store import BaseObjectStore, GraphWalker, UnpackedObject
59 from .rebase import RebaseStateManager
60 from .walk import Walker
61 from .worktree import WorkTree
63from . import replace_me
64from .errors import (
65 NoIndexPresent,
66 NotBlobError,
67 NotCommitError,
68 NotGitRepository,
69 NotTagError,
70 NotTreeError,
71 RefFormatError,
72)
73from .file import GitFile
74from .hooks import (
75 CommitMsgShellHook,
76 Hook,
77 PostCommitShellHook,
78 PostReceiveShellHook,
79 PreCommitShellHook,
80)
81from .object_store import (
82 DiskObjectStore,
83 MemoryObjectStore,
84 MissingObjectFinder,
85 ObjectStoreGraphWalker,
86 PackBasedObjectStore,
87 find_shallow,
88 peel_sha,
89)
90from .objects import (
91 Blob,
92 Commit,
93 ObjectID,
94 ShaFile,
95 Tag,
96 Tree,
97 check_hexsha,
98 valid_hexsha,
99)
100from .pack import generate_unpacked_objects
101from .refs import (
102 ANNOTATED_TAG_SUFFIX, # noqa: F401
103 LOCAL_BRANCH_PREFIX,
104 LOCAL_TAG_PREFIX, # noqa: F401
105 SYMREF, # noqa: F401
106 DictRefsContainer,
107 DiskRefsContainer,
108 InfoRefsContainer, # noqa: F401
109 Ref,
110 RefsContainer,
111 _set_default_branch,
112 _set_head,
113 _set_origin_head,
114 check_ref_format, # noqa: F401
115 is_per_worktree_ref,
116 read_packed_refs, # noqa: F401
117 read_packed_refs_with_peeled, # noqa: F401
118 serialize_refs,
119 write_packed_refs, # noqa: F401
120)
# Name of the control directory inside a working tree.
CONTROLDIR = ".git"
# Subdirectory of the control dir holding loose and packed objects.
OBJECTDIR = "objects"

# Generic ShaFile subtype used by BaseRepo._get_object() to narrow returns.
T = TypeVar("T", bound="ShaFile")

# Layout names inside the control directory.
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories created inside a fresh control directory on init.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch that HEAD points at in a newly initialized repository.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """Raised when a user identity string is not of the form 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Record the identity string that failed validation.

        Args:
          identity: The offending identity value.
        """
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Raised when no default user identity can be derived from the host."""
158# TODO(jelmer): Cache?
def _get_default_identity() -> tuple[str, str]:
    """Determine a default (fullname, email) identity from the host.

    The username is taken from the first of the LOGNAME, USER, LNAME and
    USERNAME environment variables that is set. The full name comes from the
    GECOS field of the password database entry when available, falling back
    to the username. The email is $EMAIL if set, otherwise
    username@hostname.

    Returns: Tuple of (fullname, email) strings
    Raises:
      DefaultIdentityNotFound: If no username could be determined
    """
    import socket

    for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
        username = os.environ.get(name)
        if username:
            break
    else:
        username = None

    try:
        import pwd
    except ImportError:
        fullname = None
    else:
        try:
            entry = pwd.getpwuid(os.getuid())  # type: ignore
        except KeyError:
            fullname = None
        else:
            # Bug fix: struct_passwd's field is named "pw_gecos", not
            # "gecos" -- the old getattr(entry, "gecos", None) was always
            # None, so the GECOS-derived full name was never used.
            if getattr(entry, "pw_gecos", None):
                fullname = entry.pw_gecos.split(",")[0]
            else:
                fullname = None
            if username is None:
                username = entry.pw_name
    if not fullname:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        fullname = username
    email = os.environ.get("EMAIL")
    if email is None:
        if username is None:
            raise DefaultIdentityNotFound("no username found")
        email = f"{username}@{socket.gethostname()}"
    return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    if user is None or email is None:
        # Only probe the host for a default identity when something is
        # actually missing. The previous unconditional call could raise
        # DefaultIdentityNotFound (and hit pwd/env) even when the
        # configuration fully specified the identity.
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    # Normalize an already-bracketed email so we never emit "<<x>>".
    if email.startswith(b"<") and email.endswith(b">"):
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring
    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    decoded = identity.decode("utf-8", "replace")
    try:
        _, mail_part = identity.split(b" <", 1)
    except ValueError as exc:
        # No " <" separator at all -- cannot be 'name <email>'.
        raise InvalidUserIdentity(decoded) from exc
    if b">" not in mail_part:
        # Opening bracket without a closing one.
        raise InvalidUserIdentity(decoded)
    if b"\0" in identity or b"\n" in identity:
        # NUL and newline would corrupt the commit object encoding.
        raise InvalidUserIdentity(decoded)
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    result: dict[bytes, list[bytes]] = {}
    for entry in graftpoints:
        fields = entry.split(None, 1)
        commit = fields[0]
        parents = fields[1].split() if len(fields) == 2 else []

        # Every SHA on the line must be well-formed hex.
        for sha in [commit, *parents]:
            check_hexsha(sha, "Invalid graftpoint")

        result[commit] = parents
    return result
def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint

    """
    # A commit with no parents serializes to just its own SHA.
    lines = [
        b" ".join([commit, *parents]) if parents else commit
        for commit, parents in graftpoints.items()
    ]
    return b"\n".join(lines)
def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
    """
    if sys.platform != "win32":
        # Could implement other platform specific filesystem hiding here
        return

    import ctypes
    from ctypes.wintypes import BOOL, DWORD, LPCWSTR

    FILE_ATTRIBUTE_HIDDEN = 2
    set_file_attributes = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
        ("SetFileAttributesW", ctypes.windll.kernel32)
    )

    if isinstance(path, bytes):
        path = os.fsdecode(path)
    if not set_file_attributes(path, FILE_ATTRIBUTE_HIDDEN):
        pass  # Could raise or log `ctypes.WinError()` here
class ParentsProvider:
    """Provider for commit parent information.

    Resolves the parents of a commit, honoring grafts and shallow
    boundaries before falling back to the commit graph or the object store.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict] = None,
        shallows: Iterable[bytes] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information (commit SHA -> replacement parent SHAs)
          shallows: Shallow commit SHAs
        """
        self.store = store
        # Fix: the previous `grafts: dict = {}` default shared one mutable
        # dict across every instance created without grafts; mutating
        # provider.grafts then leaked into unrelated providers.
        self.grafts = {} if grafts is None else grafts
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional pre-loaded commit object matching commit_id
        Returns: List of parent SHAs (empty for shallow commits)
        """
        # Grafts take precedence over the recorded parents.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow commits have their history cut off.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        return commit.parents
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Graftpoints override recorded commit parents (see parse_graftpoints).
        self._graftpoints: dict[bytes, list[bytes]] = {}
        # Hooks registered by name (e.g. "pre-commit"), filled in by subclasses.
        self.hooks: dict[str, Hook] = {}

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour
        return sys.platform != "win32"

    def _init_files(
        self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
    ) -> None:
        """Initialize a default set of named files.

        Writes "description", "config" and "info/exclude" into the control
        directory via _put_named_file.

        Args:
          bare: Whether the repository is bare (no working tree)
          symlinks: Whether symlinks are supported; probed when None and
            the repository is not bare
          format: Repository format version; only 0 and 1 are supported
        Raises:
          ValueError: If format is not 0 or 1
        """
        from .config import ConfigFile

        self._put_named_file("description", b"Unnamed repository")
        f = BytesIO()
        cf = ConfigFile()
        if format is None:
            format = 0
        if format not in (0, 1):
            raise ValueError(f"Unsupported repository format version: {format}")
        cf.set("core", "repositoryformatversion", str(format))
        if self._determine_file_mode():
            cf.set("core", "filemode", True)
        else:
            cf.set("core", "filemode", False)

        if symlinks is None and not bare:
            symlinks = self._determine_symlinks()

        # Only written when symlinks are known to be unsupported; otherwise
        # the key is left out and git's default (enabled) applies.
        if symlinks is False:
            cf.set("core", "symlinks", symlinks)

        cf.set("core", "bare", bare)
        cf.set("core", "logallrefupdates", True)
        cf.write_to_file(f)
        self._put_named_file("config", f.getvalue())
        self._put_named_file(os.path.join("info", "exclude"), b"")

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name."""
        raise NotImplementedError(self._del_named_file)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

    def fetch(
        self,
        target: "BaseRepo",
        determine_wants: Optional[Callable] = None,
        progress: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> dict:
        """Fetch objects into another repository.

        Args:
          target: The target repository
          determine_wants: Optional function to determine what refs to
            fetch.
          progress: Optional progress function
          depth: Optional shallow fetch depth
        Returns: The local refs
        """
        if determine_wants is None:
            determine_wants = target.object_store.determine_wants_all
        count, pack_data = self.fetch_pack_data(
            determine_wants,
            target.get_graph_walker(),
            progress=progress,
            depth=depth,
        )
        target.object_store.add_pack_data(count, pack_data, progress)
        return self.get_refs()

    def fetch_pack_data(
        self,
        determine_wants: Callable,
        graph_walker: "GraphWalker",
        progress: Optional[Callable],
        *,
        get_tagged: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> tuple:
        """Fetch the pack data required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: count and iterator over pack data
        """
        missing_objects = self.find_missing_objects(
            determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
        )
        # None means the shallow short-circuit path: send nothing.
        if missing_objects is None:
            return 0, iter([])
        remote_has = missing_objects.get_remote_has()
        object_ids = list(missing_objects)
        return len(object_ids), generate_unpacked_objects(
            self.object_store, object_ids, progress=progress, other_haves=remote_has
        )

    def find_missing_objects(
        self,
        determine_wants: Callable,
        graph_walker: "GraphWalker",
        progress: Optional[Callable],
        *,
        get_tagged: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow  # type: ignore[attr-defined]
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            # NOTE(review): when depth is set but graph_walker lacks a
            # "shallow" attribute, `unshallow` is never bound before its use
            # below -- confirm all depth-capable walkers expose "shallow".
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                """Dummy finder that returns no missing objects."""

                def get_remote_has(self) -> None:
                    """Get remote has (always returns None).

                    Returns:
                      None
                    """
                    return None

                def __len__(self) -> int:
                    # No missing objects at all.
                    return 0

                def __iter__(self) -> Iterator[tuple[bytes, Optional[bytes]]]:
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

    def generate_pack_data(
        self,
        have: Iterable[ObjectID],
        want: Iterable[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ) -> tuple[int, Iterator["UnpackedObject"]]:
        """Generate pack data objects for a set of wants/haves.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method
        Returns: Tuple of (object count, iterator over unpacked objects)
        """
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )

    def get_graph_walker(
        self, heads: Optional[list[ObjectID]] = None
    ) -> ObjectStoreGraphWalker:
        """Retrieve a graph walker.

        A graph walker is used by a remote repository (or proxy)
        to find out which objects are present in this repository.

        Args:
          heads: Repository heads to use (optional)
        Returns: A graph walker object
        """
        if heads is None:
            # Only advertise branch heads whose objects we actually have.
            heads = [
                sha
                for sha in self.refs.as_dict(b"refs/heads").values()
                if sha in self.object_store
            ]
        parents_provider = ParentsProvider(self.object_store)
        return ObjectStoreGraphWalker(
            heads,
            parents_provider.get_parents,
            shallow=self.get_shallow(),
            update_shallow=self.update_shallow,
        )

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        # TODO: move this method to WorkTree
        return self.refs[b"HEAD"]

    def _get_object(self, sha: bytes, cls: type[T]) -> T:
        """Retrieve an object, asserting that it has the expected type.

        Args:
          sha: Binary (20 bytes) or hex (40 bytes) object SHA
          cls: Expected ShaFile subclass
        Returns: The object, narrowed to cls
        Raises:
          NotCommitError/NotBlobError/NotTreeError/NotTagError: If the
            object exists but has a different type than cls
        """
        assert len(sha) in (20, 40)
        ret = self.get_object(sha)
        if not isinstance(ret, cls):
            # Map the mismatch to the specific per-type error callers catch.
            if cls is Commit:
                raise NotCommitError(ret.id)
            elif cls is Blob:
                raise NotBlobError(ret.id)
            elif cls is Tree:
                raise NotTreeError(ret.id)
            elif cls is Tag:
                raise NotTagError(ret.id)
            else:
                raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
        return ret

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        Returns:
          ParentsProvider instance configured with grafts and shallows
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object."""
        raise NotImplementedError(self.get_worktree_config)

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)

    def get_blob_normalizer(self) -> "BlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)

    def get_config_stack(self) -> "StackedConfig":
        """Return a config stack for this repository.

        This stack accesses the configuration for both this repository
        itself (.git/config) and the global configuration, which usually
        lives in ~/.gitconfig.

        Returns: `Config` instance for this repository
        """
        from .config import ConfigFile, StackedConfig

        local_config = self.get_config()
        backends: list[ConfigFile] = [local_config]
        # The worktree config only participates when the extension is enabled.
        if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
            backends.append(self.get_worktree_config())

        backends += StackedConfig.default_backends()
        return StackedConfig(backends, writable=local_config)

    def get_shallow(self) -> set[ObjectID]:
        """Get the set of shallow commits.

        Returns: Set of shallow commits.
        """
        f = self.get_named_file("shallow")
        if f is None:
            return set()
        with f:
            # One hex SHA per line in the "shallow" control file.
            return {line.strip() for line in f}

    def update_shallow(
        self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]]
    ) -> None:
        """Update the list of shallow objects.

        Args:
          new_shallow: Newly shallow objects
          new_unshallow: Newly no longer shallow objects
        """
        shallow = self.get_shallow()
        if new_shallow:
            shallow.update(new_shallow)
        if new_unshallow:
            shallow.difference_update(new_unshallow)
        if shallow:
            self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
        else:
            # An empty shallow set means the file should not exist at all.
            self._del_named_file("shallow")

    def get_peeled(self, ref: Ref) -> ObjectID:
        """Get the peeled value of a ref.

        Args:
          ref: The refname to peel.
        Returns: The fully-peeled SHA1 of a tag object, after peeling all
          intermediate tags; if the original ref does not point to a tag,
          this will equal the original SHA1.
        """
        # Prefer the cached peeled value (e.g. from packed-refs) when present.
        cached = self.refs.get_peeled(ref)
        if cached is not None:
            return cached
        return peel_sha(self.object_store, self.refs[ref])[1].id

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        from .notes import Notes

        return Notes(self.object_store, self.refs)

    def get_walker(self, include: Optional[list[bytes]] = None, **kwargs) -> "Walker":
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
          **kwargs: Additional keyword arguments including:

            * exclude: Iterable of SHAs of commits to exclude along with their
              ancestors, overriding includes.
            * order: ORDER_* constant specifying the order of results.
              Anything other than ORDER_DATE may result in O(n) memory usage.
            * reverse: If True, reverse the order of output, requiring O(n)
              memory.
            * max_entries: The maximum number of entries to yield, or None for
              no limit.
            * paths: Iterable of file or subtree paths to show entries for.
            * rename_detector: diff.RenameDetector object for detecting
              renames.
            * follow: If True, follow path across renames/copies. Forces a
              default rename_detector.
            * since: Timestamp to list commits after.
            * until: Timestamp to list commits before.
            * queue_cls: A class to use for a queue of commits, supporting the
              iterator protocol. The constructor takes a single argument, the Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker

        if include is None:
            include = [self.head()]

        # Route parent lookup through this repo so grafts/shallows apply.
        kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)

        return Walker(self.object_store, include, **kwargs)

    def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile":
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # A 20-byte binary or 40-byte hex value may be an object SHA; try the
        # object store first, then fall back to treating it as a ref name.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            raise KeyError(name) from exc

    def __contains__(self, name: bytes) -> bool:
        """Check if a specific Git object or ref is present.

        Args:
          name: Git object SHA1 or ref name
        """
        if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
            return name in self.object_store or name in self.refs
        else:
            return name in self.refs

    def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
        """Set a ref.

        Args:
          name: ref name
          value: Ref value - either a ShaFile object, or a hex sha
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            if isinstance(value, ShaFile):
                self.refs[name] = value.id
            elif isinstance(value, bytes):
                self.refs[name] = value
            else:
                raise TypeError(value)
        else:
            raise ValueError(name)

    def __delitem__(self, name: bytes) -> None:
        """Remove a ref.

        Args:
          name: Name of the ref to remove
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            del self.refs[name]
        else:
            raise ValueError(name)

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits."""
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        # NOTE(review): `kind` is accepted but not forwarded to
        # get_user_identity() -- confirm this is intended for the
        # deprecated shim.
        return get_user_identity(config)

    def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
        """Add or modify graftpoints.

        Args:
          updated_graftpoints: Dict of commit shas to list of parent shas
        """
        # Simple validation
        for commit, parents in updated_graftpoints.items():
            for sha in [commit, *parents]:
                check_hexsha(sha, "Invalid graftpoint")

        self._graftpoints.update(updated_graftpoints)

    def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
        """Remove graftpoints.

        Args:
          to_remove: List of commit shas
        """
        # NOTE(review): mutable default argument; harmless here because the
        # list is only iterated, never mutated or stored.
        for sha in to_remove:
            del self._graftpoints[sha]

    def _read_heads(self, name: str) -> list[bytes]:
        """Read a list of SHAs from a named control file, one per line.

        Args:
          name: Name of the file relative to the control dir
        Returns: List of non-empty stripped lines; empty list if the file
          does not exist
        """
        f = self.get_named_file(name)
        if f is None:
            return []
        with f:
            return [line.strip() for line in f.readlines() if line.strip()]

    def get_worktree(self) -> "WorkTree":
        """Get the working tree for this repository.

        Returns:
          WorkTree instance for performing working tree operations

        Raises:
          NotImplementedError: If the repository doesn't support working trees
        """
        raise NotImplementedError(
            "Working tree operations not supported by this repository type"
        )

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        # Deprecated shim: the actual implementation lives on WorkTree.
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
        f: File-like object to read from
    Returns: A path (decoded as UTF-8, trailing newline stripped)
    Raises:
        ValueError: If the contents do not begin with ``gitdir: ``.
    """
    prefix = b"gitdir: "
    contents = f.read()
    if not contents.startswith(prefix):
        raise ValueError("Expected file to start with 'gitdir: '")
    return contents[len(prefix) :].rstrip(b"\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Raised when a repository format version is not supported."""

    def __init__(self, version) -> None:
        """Record the offending repository format version.

        Args:
            version: The unsupported repository version
        """
        # Stored so callers can inspect which version was encountered.
        self.version = version
class UnsupportedExtension(Exception):
    """Raised when a repository extension is not supported."""

    def __init__(self, extension) -> None:
        """Record the offending repository extension.

        Args:
            extension: The unsupported repository extension
        """
        # Stored so callers can inspect which extension was encountered.
        self.extension = extension
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
        path: Path to the working copy (if it exists) or repository control
            directory (if the repository is bare)
        bare: Whether this is a bare repository
    """

    # Path to the working copy, or to the control dir for bare repositories.
    path: str
    # True if this repository has no working tree.
    bare: bool
    # Disk-backed object store (set up in __init__/BaseRepo.__init__).
    object_store: DiskObjectStore
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
            root: Path to the repository's root.
            object_store: ObjectStore to use; if omitted, we use the
                repository's default object store
            bare: True if this is a bare repository; None to autodetect.

        Raises:
            NotGitRepository: If no repository layout is found at ``root``.
            UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
            UnsupportedExtension: If an unknown extensions.* key is configured.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect: a .git file/dir under root means non-bare; an
            # objects/ plus refs/ directly under root means bare.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # .git is a gitfile pointing at the real control dir
                # (linked worktree / submodule layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            # A "commondir" file redirects shared state to the main worktree's
            # control directory.
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())

        # Initialize filter context as None, will be created lazily
        self.filter_context = None
    def get_worktree(self) -> "WorkTree":
        """Get the working tree for this repository.

        Returns:
            WorkTree instance rooted at this repository's working-copy path
        """
        # Imported lazily to avoid an import cycle with dulwich.worktree.
        from .worktree import WorkTree

        return WorkTree(self, self.path)
    def _write_reflog(
        self, ref, old_sha, new_sha, committer, timestamp, timezone, message
    ) -> None:
        """Append one entry to the reflog for *ref*.

        Used as the logger callback of the refs container.

        Args:
            ref: Ref name (bytes) whose log is updated
            old_sha: Previous sha
            new_sha: New sha
            committer: Committer identity; defaults to the configured user
                identity when None
            timestamp: Entry timestamp; defaults to the current time when None
            timezone: Timezone offset; defaults to 0 (UTC) when None
            message: Reflog message
        """
        from .reflog import format_reflog_line

        path = self._reflog_path(ref)
        try:
            # Ensure the parent directory (e.g. logs/refs/heads) exists.
            os.makedirs(os.path.dirname(path))
        except FileExistsError:
            pass
        if committer is None:
            config = self.get_config_stack()
            committer = get_user_identity(config)
        check_user_identity(committer)
        if timestamp is None:
            timestamp = int(time.time())
        if timezone is None:
            timezone = 0  # FIXME
        # Append-only: reflog entries are never rewritten here.
        with open(path, "ab") as f:
            f.write(
                format_reflog_line(
                    old_sha, new_sha, committer, timestamp, timezone, message
                )
                + b"\n"
            )
1343 def _reflog_path(self, ref: bytes) -> str:
1344 if ref.startswith((b"main-worktree/", b"worktrees/")):
1345 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1347 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1348 return os.path.join(base, "logs", os.fsdecode(ref))
1350 def read_reflog(self, ref):
1351 """Read reflog entries for a reference.
1353 Args:
1354 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1356 Yields:
1357 reflog.Entry objects in chronological order (oldest first)
1358 """
1359 from .reflog import read_reflog
1361 path = self._reflog_path(ref)
1362 try:
1363 with open(path, "rb") as f:
1364 yield from read_reflog(f)
1365 except FileNotFoundError:
1366 return
1368 @classmethod
1369 def discover(cls, start="."):
1370 """Iterate parent directories to discover a repository.
1372 Return a Repo object for the first parent directory that looks like a
1373 Git repository.
1375 Args:
1376 start: The directory to start discovery from (defaults to '.')
1377 """
1378 remaining = True
1379 path = os.path.abspath(start)
1380 while remaining:
1381 try:
1382 return cls(path)
1383 except NotGitRepository:
1384 path, remaining = os.path.split(path)
1385 raise NotGitRepository(
1386 "No git repository was found at {path}".format(**dict(path=start))
1387 )
    def controldir(self) -> str:
        """Return the path of the control directory.

        Returns: Path to the ``.git`` directory (or the repository root for a
            bare repository), as resolved in ``__init__``.
        """
        return self._controldir
    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.

        Returns: Path to the directory holding shared state (objects, refs).
        """
        return self._commondir
1403 def _determine_file_mode(self) -> bool:
1404 """Probe the file-system to determine whether permissions can be trusted.
1406 Returns: True if permissions can be trusted, False otherwise.
1407 """
1408 fname = os.path.join(self.path, ".probe-permissions")
1409 with open(fname, "w") as f:
1410 f.write("")
1412 st1 = os.lstat(fname)
1413 try:
1414 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1415 except PermissionError:
1416 return False
1417 st2 = os.lstat(fname)
1419 os.unlink(fname)
1421 mode_differs = st1.st_mode != st2.st_mode
1422 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1424 return mode_differs and st2_has_exec
1426 def _determine_symlinks(self) -> bool:
1427 """Probe the filesystem to determine whether symlinks can be created.
1429 Returns: True if symlinks can be created, False otherwise.
1430 """
1431 # TODO(jelmer): Actually probe disk / look at filesystem
1432 return sys.platform != "win32"
1434 def _put_named_file(self, path: str, contents: bytes) -> None:
1435 """Write a file to the control dir with the given name and contents.
1437 Args:
1438 path: The path to the file, relative to the control dir.
1439 contents: A string to write to the file.
1440 """
1441 path = path.lstrip(os.path.sep)
1442 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1443 f.write(contents)
1445 def _del_named_file(self, path: str) -> None:
1446 try:
1447 os.unlink(os.path.join(self.controldir(), path))
1448 except FileNotFoundError:
1449 return
1451 def get_named_file(self, path, basedir=None):
1452 """Get a file from the control dir with a specific name.
1454 Although the filename should be interpreted as a filename relative to
1455 the control dir in a disk-based Repo, the object returned need not be
1456 pointing to a file in that location.
1458 Args:
1459 path: The path to the file, relative to the control dir.
1460 basedir: Optional argument that specifies an alternative to the
1461 control dir.
1462 Returns: An open file object, or None if the file does not exist.
1463 """
1464 # TODO(dborowitz): sanitize filenames, since this is used directly by
1465 # the dumb web serving code.
1466 if basedir is None:
1467 basedir = self.controldir()
1468 path = path.lstrip(os.path.sep)
1469 try:
1470 return open(os.path.join(basedir, path), "rb")
1471 except FileNotFoundError:
1472 return None
    def index_path(self):
        """Return path to the index file.

        Returns: Path of ``<controldir>/index``.
        """
        return os.path.join(self.controldir(), INDEX_FILENAME)
1478 def open_index(self) -> "Index":
1479 """Open the index for this repository.
1481 Raises:
1482 NoIndexPresent: If no index is present
1483 Returns: The matching `Index`
1484 """
1485 from .index import Index
1487 if not self.has_index():
1488 raise NoIndexPresent
1490 # Check for manyFiles feature configuration
1491 config = self.get_config_stack()
1492 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1493 skip_hash = False
1494 index_version = None
1496 if many_files:
1497 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1498 try:
1499 index_version_str = config.get(b"index", b"version")
1500 index_version = int(index_version_str)
1501 except KeyError:
1502 index_version = 4 # Default to version 4 for manyFiles
1503 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1504 else:
1505 # Check for explicit index settings
1506 try:
1507 index_version_str = config.get(b"index", b"version")
1508 index_version = int(index_version_str)
1509 except KeyError:
1510 index_version = None
1511 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1513 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
    def has_index(self) -> bool:
        """Check if an index is present.

        Returns: True for non-bare repositories (a missing index file is
            treated as an empty index), False for bare repositories.
        """
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare
    @replace_me(remove_in="0.26.0")
    def stage(
        self,
        fs_paths: Union[
            str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
        ],
    ) -> None:
        """Stage a set of paths.

        Deprecated: delegates verbatim to ``WorkTree.stage``.

        Args:
            fs_paths: List of paths, relative to the repository path
        """
        return self.get_worktree().stage(fs_paths)
    @replace_me(remove_in="0.26.0")
    def unstage(self, fs_paths: list[str]) -> None:
        """Unstage specific file in the index.

        Deprecated: delegates verbatim to ``WorkTree.unstage``.

        Args:
            fs_paths: a list of files to unstage,
                relative to the repository path.
        """
        return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
            target_path: Target path
            mkdir: Create the target directory
            bare: Whether to create a bare repository
            checkout: Whether or not to check-out HEAD after cloning
            origin: Base name for refs in target repository
                cloned from this repository
            branch: Optional branch or tag to be used as HEAD in the new repository
                instead of this repository's HEAD.
            progress: Optional progress function
            depth: Depth at which to fetch
            symlinks: Symlinks setting (default to autodetect)

        Raises:
            ValueError: If both ``bare`` and ``checkout`` are requested.
        Returns: Created repository as `Repo`
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Non-bare clones check out HEAD by default.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Configure the origin remote in the target repository.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Inner cleanup: release the target's resources before the
                # outer handler (possibly) removes the directory.
                target.close()
                raise
        except BaseException:
            # Outer cleanup: only remove the directory if we created it.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
    @replace_me(remove_in="0.26.0")
    def reset_index(self, tree: Optional[bytes] = None):
        """Reset the index back to a specific tree.

        Deprecated: delegates verbatim to ``WorkTree.reset_index``.

        Args:
            tree: Tree SHA to reset to, None for current HEAD tree.
        """
        return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Supports the ``gitdir:``, ``gitdir/i:`` and ``onbranch:`` prefixes
        used by git's conditional-include mechanism.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match this repository's control dir against a gitdir pattern.

            Args:
                pattern: Pattern to match against
                case_sensitive: Whether to match case-sensitively

            Returns:
                True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns match anywhere in the path.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash matches the directory and everything below.
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match the currently checked-out branch against a pattern.

            Args:
                pattern: Pattern to match against

            Returns:
                True if current branch matches pattern; False when HEAD is
                detached or cannot be resolved.
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1735 def get_worktree_config(self) -> "ConfigFile":
1736 """Get the worktree-specific config.
1738 Returns:
1739 ConfigFile object for the worktree config
1740 """
1741 from .config import ConfigFile
1743 path = os.path.join(self.commondir(), "config.worktree")
1744 try:
1745 # Pass condition matchers for includeIf evaluation
1746 condition_matchers = self._get_config_condition_matchers()
1747 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1748 except FileNotFoundError:
1749 cf = ConfigFile()
1750 cf.path = path
1751 return cf
1753 def get_config(self) -> "ConfigFile":
1754 """Retrieve the config object.
1756 Returns: `ConfigFile` object for the ``.git/config`` file.
1757 """
1758 from .config import ConfigFile
1760 path = os.path.join(self._commondir, "config")
1761 try:
1762 # Pass condition matchers for includeIf evaluation
1763 condition_matchers = self._get_config_condition_matchers()
1764 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1765 except FileNotFoundError:
1766 ret = ConfigFile()
1767 ret.path = path
1768 return ret
1770 def get_rebase_state_manager(self):
1771 """Get the appropriate rebase state manager for this repository.
1773 Returns: DiskRebaseStateManager instance
1774 """
1775 import os
1777 from .rebase import DiskRebaseStateManager
1779 path = os.path.join(self.controldir(), "rebase-merge")
1780 return DiskRebaseStateManager(path)
1782 def get_description(self):
1783 """Retrieve the description of this repository.
1785 Returns: A string describing the repository or None.
1786 """
1787 path = os.path.join(self._controldir, "description")
1788 try:
1789 with GitFile(path, "rb") as f:
1790 return f.read()
1791 except FileNotFoundError:
1792 return None
1794 def __repr__(self) -> str:
1795 """Return string representation of this repository."""
1796 return f"<Repo at {self.path!r}>"
    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
            description: Text to set as description for this repository;
                written as bytes to the ``description`` control file via
                ``_put_named_file``.
        """
        self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization used by ``init`` and ``init_bare``.

        Creates the base control-dir layout, the object store, the symbolic
        HEAD ref, and the initial control files.

        Args:
            path: Repository root path
            controldir: Control directory (same as ``path`` for bare repos)
            bare: Whether the repository is bare
            object_store: Object store to use; created on disk when None
            config: Configuration used to look up init.defaultBranch
            default_branch: Initial branch name; falls back to config, then
                DEFAULT_BRANCH
            symlinks: Whether to support symlinks (autodetect when None)
            format: Repository format version
        Returns: The newly-created repository instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                # Respect the user's configured init.defaultBranch.
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
    @classmethod
    def init(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
            path: Path in which to create the repository
            mkdir: Whether to create the directory
            config: Configuration object
            default_branch: Default branch name
            symlinks: Whether to support symlinks
            format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Non-bare layout: control files live under <path>/.git.
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        # Mark .git as hidden where the platform supports it (e.g. Windows).
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
            format=format,
        )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Sets up the ``.git`` gitfile in the new working tree plus the
        ``worktrees/<identifier>`` control directory (gitdir, commondir and
        HEAD files) in the main repository, then checks out the index.

        Args:
            path: Path in which to create the working tree.
            main_repo: Main repository to reference
            identifier: Worktree identifier (defaults to the directory's
                basename)
            mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The worktree's .git is a file pointing back at its control dir.
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        # commondir points from worktrees/<id>/ back to the main control dir.
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the worktree at the main repository's current HEAD.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r
    @classmethod
    def init_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir=False,
        object_store=None,
        config=None,
        default_branch=None,
        format: Optional[int] = None,
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
            path: Path to create bare repository in
            mkdir: Whether to create the directory
            object_store: Object store to use
            config: Configuration object
            default_branch: Default branch name
            format: Repository format version (defaults to 0)
        Returns: a `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Bare layout: the repository root doubles as the control dir.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
            format=format,
        )

    # Backwards-compatible alias for init_bare.
    create = init_bare
1972 def close(self) -> None:
1973 """Close any files opened by this repository."""
1974 self.object_store.close()
1975 # Clean up filter context if it was created
1976 if self.filter_context is not None:
1977 self.filter_context.close()
1978 self.filter_context = None
    def __enter__(self):
        """Enter context manager.

        Returns: This repository instance.
        """
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and close repository.

        Exceptions are not suppressed (implicitly returns None).
        """
        self.close()
1988 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
1989 """Read .gitattributes file from working tree.
1991 Returns:
1992 Dictionary mapping file patterns to attributes
1993 """
1994 gitattributes = {}
1995 gitattributes_path = os.path.join(self.path, ".gitattributes")
1997 if os.path.exists(gitattributes_path):
1998 with open(gitattributes_path, "rb") as f:
1999 for line in f:
2000 line = line.strip()
2001 if not line or line.startswith(b"#"):
2002 continue
2004 parts = line.split()
2005 if len(parts) < 2:
2006 continue
2008 pattern = parts[0]
2009 attrs = {}
2011 for attr in parts[1:]:
2012 if attr.startswith(b"-"):
2013 # Unset attribute
2014 attrs[attr[1:]] = b"false"
2015 elif b"=" in attr:
2016 # Set to value
2017 key, value = attr.split(b"=", 1)
2018 attrs[key] = value
2019 else:
2020 # Set attribute
2021 attrs[attr] = b"true"
2023 gitattributes[pattern] = attrs
2025 return gitattributes
    def get_blob_normalizer(self):
        """Return a BlobNormalizer object.

        Returns: FilterBlobNormalizer built from the current config stack and
            gitattributes, sharing this repository's (lazily created)
            FilterContext.
        """
        from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry

        # Get fresh configuration and GitAttributes
        config_stack = self.get_config_stack()
        git_attributes = self.get_gitattributes()

        # Lazily create FilterContext if needed
        if self.filter_context is None:
            filter_registry = FilterRegistry(config_stack, self)
            self.filter_context = FilterContext(filter_registry)
        else:
            # Refresh the context with current config to handle config changes
            self.filter_context.refresh_config(config_stack)

        # Return a new FilterBlobNormalizer with the context
        return FilterBlobNormalizer(
            config_stack, git_attributes, filter_context=self.filter_context
        )
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Sources are combined in order: the given tree (or HEAD's tree),
        ``.git/info/attributes``, then the working-tree ``.gitattributes``.
        Later sources are appended after earlier ones.

        Args:
            tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
            GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the commit it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing or non-tree object: treat as no attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
    @replace_me(remove_in="0.26.0")
    def _sparse_checkout_file_path(self) -> str:
        """Return the path of the sparse-checkout file in this repo's control dir.

        Deprecated: delegates to ``WorkTree._sparse_checkout_file_path``.
        """
        return self.get_worktree()._sparse_checkout_file_path()
    @replace_me(remove_in="0.26.0")
    def configure_for_cone_mode(self) -> None:
        """Ensure the repository is configured for cone-mode sparse-checkout.

        Deprecated: delegates to ``WorkTree.configure_for_cone_mode``.
        """
        return self.get_worktree().configure_for_cone_mode()
    @replace_me(remove_in="0.26.0")
    def infer_cone_mode(self) -> bool:
        """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False.

        Deprecated: delegates to ``WorkTree.infer_cone_mode``.
        """
        return self.get_worktree().infer_cone_mode()
2130 @replace_me(remove_in="0.26.0")
2131 def get_sparse_checkout_patterns(self) -> list[str]:
2132 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2134 Returns:
2135 A list of patterns. Returns an empty list if the file is missing.
2136 """
2137 return self.get_worktree().get_sparse_checkout_patterns()
2139 @replace_me(remove_in="0.26.0")
2140 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
2141 """Write the given sparse-checkout patterns into info/sparse-checkout.
2143 Creates the info/ directory if it does not exist.
2145 Args:
2146 patterns: A list of gitignore-style patterns to store.
2147 """
2148 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2150 @replace_me(remove_in="0.26.0")
2151 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
2152 """Write the given cone-mode directory patterns into info/sparse-checkout.
2154 For each directory to include, add an inclusion line that "undoes" the prior
2155 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2156 Never add the same line twice.
2157 """
2158 return self.get_worktree().set_cone_mode_patterns(dirs)
2161class MemoryRepo(BaseRepo):
2162 """Repo that stores refs, objects, and named files in memory.
2164 MemoryRepos are always bare: they have no working tree and no index, since
2165 those have a stronger dependency on the filesystem.
2166 """
2168 def __init__(self) -> None:
2169 """Create a new repository in memory."""
2170 from .config import ConfigFile
2172 self._reflog: list[Any] = []
2173 refs_container = DictRefsContainer({}, logger=self._append_reflog)
2174 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) # type: ignore[arg-type]
2175 self._named_files: dict[str, bytes] = {}
2176 self.bare = True
2177 self._config = ConfigFile()
2178 self._description = None
2179 self.filter_context = None
    def _append_reflog(self, *args) -> None:
        """Record a reflog entry in memory.

        Used as the logger callback of the DictRefsContainer created in
        __init__; each call's arguments are appended as one tuple.
        """
        self._reflog.append(args)
    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description
        """
        # Stored verbatim on the instance; MemoryRepo does not write a
        # "description" control file like a disk-backed repository would.
        self._description = description
    def get_description(self):
        """Get the description of this repository.

        Returns:
          Repository description as set via set_description, or None if
          no description has been set.
        """
        return self._description
2200 def _determine_file_mode(self):
2201 """Probe the file-system to determine whether permissions can be trusted.
2203 Returns: True if permissions can be trusted, False otherwise.
2204 """
2205 return sys.platform != "win32"
    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be used.

        Returns: True if symlinks can be used, False otherwise.
        """
        # NOTE(review): the previous docstring duplicated
        # _determine_file_mode's wording about "permissions"; this method
        # answers the symlink question (same platform check, per the code).
        return sys.platform != "win32"
    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        # Emulates a control-dir file by storing the contents in memory.
        self._named_files[path] = contents
2223 def _del_named_file(self, path) -> None:
2224 try:
2225 del self._named_files[path]
2226 except KeyError:
2227 pass
    def get_named_file(self, path, basedir=None):
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-backed Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
          basedir: Optional base directory for the path (unused here)
        Returns: An open file object, or None if the file does not exist.
        """
        contents = self._named_files.get(path, None)
        if contents is None:
            return None
        # Wrap the stored bytes so callers get a file-like object, matching
        # the disk-backed implementation's contract.
        return BytesIO(contents)
2246 def open_index(self) -> "Index":
2247 """Fail to open index for this repo, since it is bare.
2249 Raises:
2250 NoIndexPresent: Raised when no index is present
2251 """
2252 raise NoIndexPresent
    def get_config(self):
        """Retrieve the config object.

        Returns: the in-memory `ConfigFile` object created in __init__.
        """
        return self._config
2261 def get_rebase_state_manager(self):
2262 """Get the appropriate rebase state manager for this repository.
2264 Returns: MemoryRebaseStateManager instance
2265 """
2266 from .rebase import MemoryRebaseStateManager
2268 return MemoryRebaseStateManager(self)
2270 def get_blob_normalizer(self):
2271 """Return a BlobNormalizer object for checkin/checkout operations."""
2272 from .filters import FilterBlobNormalizer, FilterContext, FilterRegistry
2274 # Get fresh configuration and GitAttributes
2275 config_stack = self.get_config_stack()
2276 git_attributes = self.get_gitattributes()
2278 # Lazily create FilterContext if needed
2279 if self.filter_context is None:
2280 filter_registry = FilterRegistry(config_stack, self)
2281 self.filter_context = FilterContext(filter_registry)
2282 else:
2283 # Refresh the context with current config to handle config changes
2284 self.filter_context.refresh_config(config_stack)
2286 # Return a new FilterBlobNormalizer with the context
2287 return FilterBlobNormalizer(
2288 config_stack, git_attributes, filter_context=self.filter_context
2289 )
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Ignored; an in-memory repo has no attribute files to read.
        Returns: An empty GitAttributes instance.
        """
        from .attrs import GitAttributes

        # Memory repos don't have working trees or gitattributes files
        # Return empty GitAttributes
        return GitAttributes([])
2299 def close(self) -> None:
2300 """Close any resources opened by this repository."""
2301 # Clean up filter context if it was created
2302 if self.filter_context is not None:
2303 self.filter_context.close()
2304 self.filter_context = None
2306 def do_commit(
2307 self,
2308 message: Optional[bytes] = None,
2309 committer: Optional[bytes] = None,
2310 author: Optional[bytes] = None,
2311 commit_timestamp=None,
2312 commit_timezone=None,
2313 author_timestamp=None,
2314 author_timezone=None,
2315 tree: Optional[ObjectID] = None,
2316 encoding: Optional[bytes] = None,
2317 ref: Optional[Ref] = b"HEAD",
2318 merge_heads: Optional[list[ObjectID]] = None,
2319 no_verify: bool = False,
2320 sign: bool = False,
2321 ):
2322 """Create a new commit.
2324 This is a simplified implementation for in-memory repositories that
2325 doesn't support worktree operations or hooks.
2327 Args:
2328 message: Commit message
2329 committer: Committer fullname
2330 author: Author fullname
2331 commit_timestamp: Commit timestamp (defaults to now)
2332 commit_timezone: Commit timestamp timezone (defaults to GMT)
2333 author_timestamp: Author timestamp (defaults to commit timestamp)
2334 author_timezone: Author timestamp timezone (defaults to commit timezone)
2335 tree: SHA1 of the tree root to use
2336 encoding: Encoding
2337 ref: Optional ref to commit to (defaults to current branch).
2338 If None, creates a dangling commit without updating any ref.
2339 merge_heads: Merge heads
2340 no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
2341 sign: GPG Sign the commit (ignored for MemoryRepo)
2343 Returns:
2344 New commit SHA1
2345 """
2346 import time
2348 from .objects import Commit
2350 if tree is None:
2351 raise ValueError("tree must be specified for MemoryRepo")
2353 c = Commit()
2354 if len(tree) != 40:
2355 raise ValueError("tree must be a 40-byte hex sha string")
2356 c.tree = tree
2358 config = self.get_config_stack()
2359 if merge_heads is None:
2360 merge_heads = []
2361 if committer is None:
2362 committer = get_user_identity(config, kind="COMMITTER")
2363 check_user_identity(committer)
2364 c.committer = committer
2365 if commit_timestamp is None:
2366 commit_timestamp = time.time()
2367 c.commit_time = int(commit_timestamp)
2368 if commit_timezone is None:
2369 commit_timezone = 0
2370 c.commit_timezone = commit_timezone
2371 if author is None:
2372 author = get_user_identity(config, kind="AUTHOR")
2373 c.author = author
2374 check_user_identity(author)
2375 if author_timestamp is None:
2376 author_timestamp = commit_timestamp
2377 c.author_time = int(author_timestamp)
2378 if author_timezone is None:
2379 author_timezone = commit_timezone
2380 c.author_timezone = author_timezone
2381 if encoding is None:
2382 try:
2383 encoding = config.get(("i18n",), "commitEncoding")
2384 except KeyError:
2385 pass
2386 if encoding is not None:
2387 c.encoding = encoding
2389 # Handle message (for MemoryRepo, we don't support callable messages)
2390 if callable(message):
2391 message = message(self, c)
2392 if message is None:
2393 raise ValueError("Message callback returned None")
2395 if message is None:
2396 raise ValueError("No commit message specified")
2398 c.message = message
2400 if ref is None:
2401 # Create a dangling commit
2402 c.parents = merge_heads
2403 self.object_store.add_object(c)
2404 else:
2405 try:
2406 old_head = self.refs[ref]
2407 c.parents = [old_head, *merge_heads]
2408 self.object_store.add_object(c)
2409 ok = self.refs.set_if_equals(
2410 ref,
2411 old_head,
2412 c.id,
2413 message=b"commit: " + message,
2414 committer=committer,
2415 timestamp=commit_timestamp,
2416 timezone=commit_timezone,
2417 )
2418 except KeyError:
2419 c.parents = merge_heads
2420 self.object_store.add_object(c)
2421 ok = self.refs.add_if_new(
2422 ref,
2423 c.id,
2424 message=b"commit: " + message,
2425 committer=committer,
2426 timestamp=commit_timestamp,
2427 timezone=commit_timezone,
2428 )
2429 if not ok:
2430 from .errors import CommitError
2432 raise CommitError(f"{ref!r} changed during commit")
2434 return c.id
2436 @classmethod
2437 def init_bare(cls, objects, refs, format: Optional[int] = None):
2438 """Create a new bare repository in memory.
2440 Args:
2441 objects: Objects for the new repository,
2442 as iterable
2443 refs: Refs as dictionary, mapping names
2444 to object SHA1s
2445 format: Repository format version (defaults to 0)
2446 """
2447 ret = cls()
2448 for obj in objects:
2449 ret.object_store.add_object(obj)
2450 for refname, sha in refs.items():
2451 ret.refs.add_if_new(refname, sha)
2452 ret._init_files(bare=True, format=format)
2453 return ret