Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable, Iterator
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 TypeVar,
46 Union,
47)
49if TYPE_CHECKING:
50 # There are no circular imports here, but we try to defer imports as long
51 # as possible to reduce start-up time for anything that doesn't need
52 # these imports.
53 from .attrs import GitAttributes
54 from .config import ConditionMatcher, ConfigFile, StackedConfig
55 from .index import Index
56 from .line_ending import BlobNormalizer
57 from .notes import Notes
58 from .object_store import BaseObjectStore, GraphWalker, UnpackedObject
59 from .rebase import RebaseStateManager
60 from .walk import Walker
61 from .worktree import WorkTree
63from . import replace_me
64from .errors import (
65 NoIndexPresent,
66 NotBlobError,
67 NotCommitError,
68 NotGitRepository,
69 NotTagError,
70 NotTreeError,
71 RefFormatError,
72)
73from .file import GitFile
74from .hooks import (
75 CommitMsgShellHook,
76 Hook,
77 PostCommitShellHook,
78 PostReceiveShellHook,
79 PreCommitShellHook,
80)
81from .object_store import (
82 DiskObjectStore,
83 MemoryObjectStore,
84 MissingObjectFinder,
85 ObjectStoreGraphWalker,
86 PackBasedObjectStore,
87 find_shallow,
88 peel_sha,
89)
90from .objects import (
91 Blob,
92 Commit,
93 ObjectID,
94 ShaFile,
95 Tag,
96 Tree,
97 check_hexsha,
98 valid_hexsha,
99)
100from .pack import generate_unpacked_objects
101from .refs import (
102 ANNOTATED_TAG_SUFFIX, # noqa: F401
103 LOCAL_BRANCH_PREFIX,
104 LOCAL_TAG_PREFIX, # noqa: F401
105 SYMREF, # noqa: F401
106 DictRefsContainer,
107 DiskRefsContainer,
108 InfoRefsContainer, # noqa: F401
109 Ref,
110 RefsContainer,
111 _set_default_branch,
112 _set_head,
113 _set_origin_head,
114 check_ref_format, # noqa: F401
115 read_packed_refs, # noqa: F401
116 read_packed_refs_with_peeled, # noqa: F401
117 serialize_refs,
118 write_packed_refs, # noqa: F401
119)
# Name of the control directory and the objects directory inside it.
CONTROLDIR = ".git"
OBJECTDIR = "objects"

# TypeVar used by BaseRepo._get_object so the return type matches the
# requested ShaFile subclass.
T = TypeVar("T", bound="ShaFile")

# Well-known file/directory names inside the control directory.
REFSDIR = "refs"
REFSDIR_TAGS = "tags"
REFSDIR_HEADS = "heads"
INDEX_FILENAME = "index"
COMMONDIR = "commondir"
GITDIR = "gitdir"
WORKTREES = "worktrees"

# Directories (relative to the control dir) created when initializing a
# new repository on disk.
BASE_DIRECTORIES = [
    ["branches"],
    [REFSDIR],
    [REFSDIR, REFSDIR_TAGS],
    [REFSDIR, REFSDIR_HEADS],
    ["hooks"],
    ["info"],
]

# Branch name used for HEAD in newly-initialized repositories.
DEFAULT_BRANCH = b"master"
class InvalidUserIdentity(Exception):
    """User identity is not of the format 'user <email>'."""

    def __init__(self, identity: str) -> None:
        """Initialize InvalidUserIdentity exception.

        Args:
          identity: The malformed identity string.
        """
        # Pass the identity to Exception so str(exc) carries the offending
        # value instead of being empty.
        super().__init__(identity)
        self.identity = identity
class DefaultIdentityNotFound(Exception):
    """Default identity could not be determined.

    Raised when no username can be found in the environment or the
    password database while building a fallback commit identity.
    """
157# TODO(jelmer): Cache?
158def _get_default_identity() -> tuple[str, str]:
159 import socket
161 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
162 username = os.environ.get(name)
163 if username:
164 break
165 else:
166 username = None
168 try:
169 import pwd
170 except ImportError:
171 fullname = None
172 else:
173 try:
174 entry = pwd.getpwuid(os.getuid()) # type: ignore
175 except KeyError:
176 fullname = None
177 else:
178 if getattr(entry, "gecos", None):
179 fullname = entry.pw_gecos.split(",")[0]
180 else:
181 fullname = None
182 if username is None:
183 username = entry.pw_name
184 if not fullname:
185 if username is None:
186 raise DefaultIdentityNotFound("no username found")
187 fullname = username
188 email = os.environ.get("EMAIL")
189 if email is None:
190 if username is None:
191 raise DefaultIdentityNotFound("no username found")
192 email = f"{username}@{socket.gethostname()}"
193 return (fullname, email)
def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
    """Determine the identity to use for new commits.

    If kind is set, this first checks
    GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.

    If those variables are not set, then it will fall back
    to reading the user.name and user.email settings from
    the specified configuration.

    If that also fails, then it will fall back to using
    the current users' identity as obtained from the host
    system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).

    Args:
      config: Configuration stack to read from
      kind: Optional kind to return identity for,
        usually either "AUTHOR" or "COMMITTER".

    Returns:
      A user identity

    Raises:
      DefaultIdentityNotFound: If no name/email is configured anywhere and
        the host system yields no usable identity.
    """
    user: Optional[bytes] = None
    email: Optional[bytes] = None
    if kind:
        user_uc = os.environ.get("GIT_" + kind + "_NAME")
        if user_uc is not None:
            user = user_uc.encode("utf-8")
        email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
        if email_uc is not None:
            email = email_uc.encode("utf-8")
    if user is None:
        try:
            user = config.get(("user",), "name")
        except KeyError:
            user = None
    if email is None:
        try:
            email = config.get(("user",), "email")
        except KeyError:
            email = None
    # Fix: only probe the host system when something is still missing.
    # Calling _get_default_identity() unconditionally could raise
    # DefaultIdentityNotFound even though the environment/configuration
    # already provided a complete identity.
    if user is None or email is None:
        default_user, default_email = _get_default_identity()
        if user is None:
            user = default_user.encode("utf-8")
        if email is None:
            email = default_email.encode("utf-8")
    if email.startswith(b"<") and email.endswith(b">"):
        # Tolerate configurations that wrap the address in angle brackets.
        email = email[1:-1]
    return user + b" <" + email + b">"
def check_user_identity(identity: bytes) -> None:
    """Verify that a user identity is formatted correctly.

    Args:
      identity: User identity bytestring

    Raises:
      InvalidUserIdentity: Raised when identity is invalid
    """
    try:
        _name, rest = identity.split(b" <", 1)
    except ValueError as exc:
        # No " <" separator at all: cannot even be split into name/email.
        raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
    # The email part must be closed, and the identity as a whole must not
    # smuggle in NUL bytes or newlines (which would corrupt commit headers).
    malformed = (b">" not in rest) or (b"\0" in identity) or (b"\n" in identity)
    if malformed:
        raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
def parse_graftpoints(
    graftpoints: Iterable[bytes],
) -> dict[bytes, list[bytes]]:
    """Convert a list of graftpoints into a dict.

    Args:
      graftpoints: Iterator of graftpoint lines

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    Resulting dictionary is:
        <commit sha1>: [<parent sha1>*]

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    result: dict[bytes, list[bytes]] = {}
    for entry in graftpoints:
        # First token is the grafted commit; any remaining tokens are its
        # replacement parents.
        fields = entry.split()
        commit, parents = fields[0], fields[1:]
        # Every sha on the line must be a valid hex sha.
        for sha in fields:
            check_hexsha(sha, "Invalid graftpoint")
        result[commit] = parents
    return result
def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
    """Convert a dictionary of grafts into string.

    The graft dictionary is:
        <commit sha1>: [<parent sha1>*]

    Each line is formatted as:
        <commit sha1> <parent sha1> [<parent sha1>]*

    https://git.wiki.kernel.org/index.php/GraftPoint
    """
    # Joining the commit sha with its (possibly empty) parent list produces
    # "<commit> <p1> <p2> ..." for grafts with parents and bare "<commit>"
    # otherwise, so a single expression covers both cases.
    return b"\n".join(
        b" ".join([commit, *parents]) for commit, parents in graftpoints.items()
    )
def _set_filesystem_hidden(path: str) -> None:
    """Mark path as to be hidden if supported by platform and filesystem.

    On win32 uses SetFileAttributesW api:
    <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>

    On other platforms this is currently a no-op.

    Args:
      path: Path to mark as hidden.
    """
    if sys.platform == "win32":
        import ctypes
        from ctypes.wintypes import BOOL, DWORD, LPCWSTR

        # FILE_ATTRIBUTE_HIDDEN constant from the Windows API.
        FILE_ATTRIBUTE_HIDDEN = 2
        # Build a typed function pointer for kernel32!SetFileAttributesW
        # so ctypes marshals the arguments correctly.
        SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
            ("SetFileAttributesW", ctypes.windll.kernel32)
        )

        if isinstance(path, bytes):
            # LPCWSTR requires str; decode filesystem-encoded bytes.
            path = os.fsdecode(path)
        # A zero return value signals failure; failure to hide is treated
        # as non-fatal here.
        if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
            pass  # Could raise or log `ctypes.WinError()` here

    # Could implement other platform specific filesystem hiding here
class ParentsProvider:
    """Provider for commit parent information.

    Resolves commit parents while honouring graftpoints and shallow
    commits, using the object store's commit graph when one is available.
    """

    def __init__(
        self,
        store: "BaseObjectStore",
        grafts: Optional[dict] = None,
        shallows: Iterable[bytes] = (),
    ) -> None:
        """Initialize ParentsProvider.

        Args:
          store: Object store to use
          grafts: Graft information (commit sha -> list of parent shas)
          shallows: Shallow commit SHAs
        """
        self.store = store
        # Fix: avoid a mutable default argument; each instance gets its own
        # dict instead of sharing (and potentially mutating) a module-wide one.
        self.grafts = grafts if grafts is not None else {}
        self.shallows = set(shallows)

        # Get commit graph once at initialization for performance
        self.commit_graph = store.get_commit_graph()

    def get_parents(
        self, commit_id: bytes, commit: Optional["Commit"] = None
    ) -> list[bytes]:
        """Get parents for a commit using the parents provider.

        Args:
          commit_id: SHA of the commit to look up
          commit: Optional pre-loaded commit object matching commit_id

        Returns:
          List of parent commit SHAs; empty for shallow commits.
        """
        # Grafts override the real parents entirely.
        try:
            return self.grafts[commit_id]
        except KeyError:
            pass
        # Shallow commits have their history cut off.
        if commit_id in self.shallows:
            return []

        # Try to use commit graph for faster parent lookup
        if self.commit_graph:
            parents = self.commit_graph.get_parents(commit_id)
            if parents is not None:
                return parents

        # Fallback to reading the commit object
        if commit is None:
            obj = self.store[commit_id]
            assert isinstance(obj, Commit)
            commit = obj
        return commit.parents
class BaseRepo:
    """Base class for a git repository.

    This base class is meant to be used for Repository implementations that e.g.
    work on top of a different transport than a standard filesystem path.

    Attributes:
      object_store: Dictionary-like object for accessing
        the objects
      refs: Dictionary-like object with the refs in this
        repository
    """

    def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
        """Open a repository.

        This shouldn't be called directly, but rather through one of the
        base classes, such as MemoryRepo or Repo.

        Args:
          object_store: Object store to use
          refs: Refs container to use
        """
        self.object_store = object_store
        self.refs = refs

        # Grafted commit sha -> replacement parent shas.
        self._graftpoints: dict[bytes, list[bytes]] = {}
        # Hook name (e.g. "pre-commit") -> Hook implementation.
        self.hooks: dict[str, Hook] = {}

    def _determine_file_mode(self) -> bool:
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        raise NotImplementedError(self._determine_file_mode)

    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # For now, just mimic the old behaviour
        return sys.platform != "win32"

    def _init_files(
        self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
    ) -> None:
        """Initialize a default set of named files.

        Writes the ``description``, ``config`` and ``info/exclude`` files
        into the control directory.

        Args:
          bare: Whether the repository is bare (no working tree)
          symlinks: Whether symlinks are supported; probed if None and
            the repository is not bare
          format: Repository format version (0 or 1; defaults to 0)
        """
        from .config import ConfigFile

        self._put_named_file("description", b"Unnamed repository")
        f = BytesIO()
        cf = ConfigFile()
        if format is None:
            format = 0
        if format not in (0, 1):
            raise ValueError(f"Unsupported repository format version: {format}")
        cf.set("core", "repositoryformatversion", str(format))
        if self._determine_file_mode():
            cf.set("core", "filemode", True)
        else:
            cf.set("core", "filemode", False)

        if symlinks is None and not bare:
            symlinks = self._determine_symlinks()

        # Only record the setting when symlinks are known to be unsupported;
        # the default (supported) is left implicit.
        if symlinks is False:
            cf.set("core", "symlinks", symlinks)

        cf.set("core", "bare", bare)
        cf.set("core", "logallrefupdates", True)
        cf.write_to_file(f)
        self._put_named_file("config", f.getvalue())
        self._put_named_file(os.path.join("info", "exclude"), b"")

    def get_named_file(self, path: str) -> Optional[BinaryIO]:
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-based Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
        Returns: An open file object, or None if the file does not exist.
        """
        raise NotImplementedError(self.get_named_file)

    def _put_named_file(self, path: str, contents: bytes) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        raise NotImplementedError(self._put_named_file)

    def _del_named_file(self, path: str) -> None:
        """Delete a file in the control directory with the given name."""
        raise NotImplementedError(self._del_named_file)

    def open_index(self) -> "Index":
        """Open the index for this repository.

        Raises:
          NoIndexPresent: If no index is present
        Returns: The matching `Index`
        """
        raise NotImplementedError(self.open_index)

    def fetch(
        self,
        target: "BaseRepo",
        determine_wants: Optional[Callable] = None,
        progress: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> dict:
        """Fetch objects into another repository.

        Args:
          target: The target repository
          determine_wants: Optional function to determine what refs to
            fetch.
          progress: Optional progress function
          depth: Optional shallow fetch depth
        Returns: The local refs
        """
        if determine_wants is None:
            determine_wants = target.object_store.determine_wants_all
        count, pack_data = self.fetch_pack_data(
            determine_wants,
            target.get_graph_walker(),
            progress=progress,
            depth=depth,
        )
        target.object_store.add_pack_data(count, pack_data, progress)
        return self.get_refs()

    def fetch_pack_data(
        self,
        determine_wants: Callable,
        graph_walker: "GraphWalker",
        progress: Optional[Callable],
        *,
        get_tagged: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> tuple:
        """Fetch the pack data required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: count and iterator over pack data
        """
        missing_objects = self.find_missing_objects(
            determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
        )
        # None signals the shallow short-circuit path: send nothing.
        if missing_objects is None:
            return 0, iter([])
        remote_has = missing_objects.get_remote_has()
        object_ids = list(missing_objects)
        return len(object_ids), generate_unpacked_objects(
            self.object_store, object_ids, progress=progress, other_haves=remote_has
        )

    def find_missing_objects(
        self,
        determine_wants: Callable,
        graph_walker: "GraphWalker",
        progress: Optional[Callable],
        *,
        get_tagged: Optional[Callable] = None,
        depth: Optional[int] = None,
    ) -> Optional[MissingObjectFinder]:
        """Fetch the missing objects required for a set of revisions.

        Args:
          determine_wants: Function that takes a dictionary with heads
            and returns the list of heads to fetch.
          graph_walker: Object that can iterate over the list of revisions
            to fetch and has an "ack" method that will be called to acknowledge
            that a revision is present.
          progress: Simple progress function that will be called with
            updated progress strings.
          get_tagged: Function that returns a dict of pointed-to sha ->
            tag sha for including tags.
          depth: Shallow fetch depth
        Returns: iterator over objects, with __len__ implemented
        """
        refs = serialize_refs(self.object_store, self.get_refs())

        wants = determine_wants(refs)
        if not isinstance(wants, list):
            raise TypeError("determine_wants() did not return a list")

        current_shallow = set(getattr(graph_walker, "shallow", set()))

        if depth not in (None, 0):
            assert depth is not None
            shallow, not_shallow = find_shallow(self.object_store, wants, depth)
            # Only update if graph_walker has shallow attribute
            if hasattr(graph_walker, "shallow"):
                graph_walker.shallow.update(shallow - not_shallow)
                new_shallow = graph_walker.shallow - current_shallow
                unshallow = graph_walker.unshallow = not_shallow & current_shallow  # type: ignore[attr-defined]
                if hasattr(graph_walker, "update_shallow"):
                    graph_walker.update_shallow(new_shallow, unshallow)
        else:
            unshallow = getattr(graph_walker, "unshallow", set())

        if wants == []:
            # TODO(dborowitz): find a way to short-circuit that doesn't change
            # this interface.

            if getattr(graph_walker, "shallow", set()) or unshallow:
                # Do not send a pack in shallow short-circuit path
                return None

            class DummyMissingObjectFinder:
                """Dummy finder that returns no missing objects."""

                def get_remote_has(self) -> None:
                    """Get remote has (always returns None).

                    Returns:
                      None
                    """
                    return None

                def __len__(self) -> int:
                    return 0

                def __iter__(self) -> Iterator[tuple[bytes, Optional[bytes]]]:
                    yield from []

            return DummyMissingObjectFinder()  # type: ignore

        # If the graph walker is set up with an implementation that can
        # ACK/NAK to the wire, it will write data to the client through
        # this call as a side-effect.
        haves = self.object_store.find_common_revisions(graph_walker)

        # Deal with shallow requests separately because the haves do
        # not reflect what objects are missing
        if getattr(graph_walker, "shallow", set()) or unshallow:
            # TODO: filter the haves commits from iter_shas. the specific
            # commits aren't missing.
            haves = []

        parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)

        def get_parents(commit: Commit) -> list[bytes]:
            """Get parents for a commit using the parents provider.

            Args:
              commit: Commit object

            Returns:
              List of parent commit SHAs
            """
            return parents_provider.get_parents(commit.id, commit)

        return MissingObjectFinder(
            self.object_store,
            haves=haves,
            wants=wants,
            shallow=getattr(graph_walker, "shallow", set()),
            progress=progress,
            get_tagged=get_tagged,
            get_parents=get_parents,
        )

    def generate_pack_data(
        self,
        have: Iterable[ObjectID],
        want: Iterable[ObjectID],
        progress: Optional[Callable[[str], None]] = None,
        ofs_delta: Optional[bool] = None,
    ) -> tuple[int, Iterator["UnpackedObject"]]:
        """Generate pack data objects for a set of wants/haves.

        Args:
          have: List of SHA1s of objects that should not be sent
          want: List of SHA1s of objects that should be sent
          ofs_delta: Whether OFS deltas can be included
          progress: Optional progress reporting method

        Returns: Tuple of (object count, iterator over unpacked objects)
        """
        return self.object_store.generate_pack_data(
            have,
            want,
            shallow=self.get_shallow(),
            progress=progress,
            ofs_delta=ofs_delta,
        )

    def get_graph_walker(
        self, heads: Optional[list[ObjectID]] = None
    ) -> ObjectStoreGraphWalker:
        """Retrieve a graph walker.

        A graph walker is used by a remote repository (or proxy)
        to find out which objects are present in this repository.

        Args:
          heads: Repository heads to use (optional)
        Returns: A graph walker object
        """
        if heads is None:
            # Default to all local branch heads that actually resolve to an
            # object in the store.
            heads = [
                sha
                for sha in self.refs.as_dict(b"refs/heads").values()
                if sha in self.object_store
            ]
        parents_provider = ParentsProvider(self.object_store)
        return ObjectStoreGraphWalker(
            heads,
            parents_provider.get_parents,
            shallow=self.get_shallow(),
            update_shallow=self.update_shallow,
        )

    def get_refs(self) -> dict[bytes, bytes]:
        """Get dictionary with all refs.

        Returns: A ``dict`` mapping ref names to SHA1s
        """
        return self.refs.as_dict()

    def head(self) -> bytes:
        """Return the SHA1 pointed at by HEAD."""
        # TODO: move this method to WorkTree
        return self.refs[b"HEAD"]

    def _get_object(self, sha: bytes, cls: type[T]) -> T:
        """Retrieve an object and verify it has the expected type.

        Args:
          sha: Binary (20 byte) or hex (40 byte) SHA of the object.
          cls: Expected ShaFile subclass.

        Returns: The object, guaranteed to be an instance of ``cls``.

        Raises:
          NotCommitError, NotBlobError, NotTreeError, NotTagError: When the
            object is of a different type than requested.
        """
        assert len(sha) in (20, 40)
        ret = self.get_object(sha)
        if not isinstance(ret, cls):
            # Map the mismatch to the most specific error type available.
            if cls is Commit:
                raise NotCommitError(ret.id)
            elif cls is Blob:
                raise NotBlobError(ret.id)
            elif cls is Tree:
                raise NotTreeError(ret.id)
            elif cls is Tag:
                raise NotTagError(ret.id)
            else:
                raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
        return ret

    def get_object(self, sha: bytes) -> ShaFile:
        """Retrieve the object with the specified SHA.

        Args:
          sha: SHA to retrieve
        Returns: A ShaFile object
        Raises:
          KeyError: when the object can not be found
        """
        return self.object_store[sha]

    def parents_provider(self) -> ParentsProvider:
        """Get a parents provider for this repository.

        Returns:
          ParentsProvider instance configured with grafts and shallows
        """
        return ParentsProvider(
            self.object_store,
            grafts=self._graftpoints,
            shallows=self.get_shallow(),
        )

    def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
        """Retrieve the parents of a specific commit.

        If the specific commit is a graftpoint, the graft parents
        will be returned instead.

        Args:
          sha: SHA of the commit for which to retrieve the parents
          commit: Optional commit matching the sha
        Returns: List of parents
        """
        return self.parents_provider().get_parents(sha, commit)

    def get_config(self) -> "ConfigFile":
        """Retrieve the config object.

        Returns: `ConfigFile` object for the ``.git/config`` file.
        """
        raise NotImplementedError(self.get_config)

    def get_worktree_config(self) -> "ConfigFile":
        """Retrieve the worktree config object."""
        raise NotImplementedError(self.get_worktree_config)

    def get_description(self) -> Optional[str]:
        """Retrieve the description for this repository.

        Returns: String with the description of the repository
          as set by the user.
        """
        raise NotImplementedError(self.get_description)

    def set_description(self, description: bytes) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        raise NotImplementedError(self.set_description)

    def get_rebase_state_manager(self) -> "RebaseStateManager":
        """Get the appropriate rebase state manager for this repository.

        Returns: RebaseStateManager instance
        """
        raise NotImplementedError(self.get_rebase_state_manager)

    def get_blob_normalizer(self) -> "BlobNormalizer":
        """Return a BlobNormalizer object for checkin/checkout operations.

        Returns: BlobNormalizer instance
        """
        raise NotImplementedError(self.get_blob_normalizer)

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        raise NotImplementedError(self.get_gitattributes)

    def get_config_stack(self) -> "StackedConfig":
        """Return a config stack for this repository.

        This stack accesses the configuration for both this repository
        itself (.git/config) and the global configuration, which usually
        lives in ~/.gitconfig.

        Returns: `Config` instance for this repository
        """
        from .config import ConfigFile, StackedConfig

        local_config = self.get_config()
        backends: list[ConfigFile] = [local_config]
        # The worktree config only participates when the extension is
        # explicitly enabled in the local config.
        if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
            backends.append(self.get_worktree_config())

        backends += StackedConfig.default_backends()
        return StackedConfig(backends, writable=local_config)

    def get_shallow(self) -> set[ObjectID]:
        """Get the set of shallow commits.

        Returns: Set of shallow commits.
        """
        f = self.get_named_file("shallow")
        if f is None:
            return set()
        with f:
            return {line.strip() for line in f}

    def update_shallow(
        self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]]
    ) -> None:
        """Update the list of shallow objects.

        Args:
          new_shallow: Newly shallow objects
          new_unshallow: Newly no longer shallow objects
        """
        shallow = self.get_shallow()
        if new_shallow:
            shallow.update(new_shallow)
        if new_unshallow:
            shallow.difference_update(new_unshallow)
        if shallow:
            self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
        else:
            # An empty shallow set means the repository is no longer shallow;
            # remove the file entirely.
            self._del_named_file("shallow")

    def get_peeled(self, ref: Ref) -> ObjectID:
        """Get the peeled value of a ref.

        Args:
          ref: The refname to peel.
        Returns: The fully-peeled SHA1 of a tag object, after peeling all
          intermediate tags; if the original ref does not point to a tag,
          this will equal the original SHA1.
        """
        cached = self.refs.get_peeled(ref)
        if cached is not None:
            return cached
        return peel_sha(self.object_store, self.refs[ref])[1].id

    @property
    def notes(self) -> "Notes":
        """Access notes functionality for this repository.

        Returns:
          Notes object for accessing notes
        """
        from .notes import Notes

        return Notes(self.object_store, self.refs)

    def get_walker(self, include: Optional[list[bytes]] = None, **kwargs) -> "Walker":
        """Obtain a walker for this repository.

        Args:
          include: Iterable of SHAs of commits to include along with their
            ancestors. Defaults to [HEAD]
          **kwargs: Additional keyword arguments including:

            * exclude: Iterable of SHAs of commits to exclude along with their
              ancestors, overriding includes.
            * order: ORDER_* constant specifying the order of results.
              Anything other than ORDER_DATE may result in O(n) memory usage.
            * reverse: If True, reverse the order of output, requiring O(n)
              memory.
            * max_entries: The maximum number of entries to yield, or None for
              no limit.
            * paths: Iterable of file or subtree paths to show entries for.
            * rename_detector: diff.RenameDetector object for detecting
              renames.
            * follow: If True, follow path across renames/copies. Forces a
              default rename_detector.
            * since: Timestamp to list commits after.
            * until: Timestamp to list commits before.
            * queue_cls: A class to use for a queue of commits, supporting the
              iterator protocol. The constructor takes a single argument, the Walker.

        Returns: A `Walker` object
        """
        from .walk import Walker

        if include is None:
            include = [self.head()]

        # Route parent lookups through this repo so graftpoints and shallow
        # state are honoured during the walk.
        kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)

        return Walker(self.object_store, include, **kwargs)

    def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile":
        """Retrieve a Git object by SHA1 or ref.

        Args:
          name: A Git object SHA1 or a ref name
        Returns: A `ShaFile` object, such as a Commit or Blob
        Raises:
          KeyError: when the specified ref or object does not exist
        """
        if not isinstance(name, bytes):
            raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
        # Names of sha length are tried as object ids first, then as refs.
        if len(name) in (20, 40):
            try:
                return self.object_store[name]
            except (KeyError, ValueError):
                pass
        try:
            return self.object_store[self.refs[name]]
        except RefFormatError as exc:
            raise KeyError(name) from exc

    def __contains__(self, name: bytes) -> bool:
        """Check if a specific Git object or ref is present.

        Args:
          name: Git object SHA1 or ref name
        """
        if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
            return name in self.object_store or name in self.refs
        else:
            return name in self.refs

    def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
        """Set a ref.

        Args:
          name: ref name
          value: Ref value - either a ShaFile object, or a hex sha
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            if isinstance(value, ShaFile):
                self.refs[name] = value.id
            elif isinstance(value, bytes):
                self.refs[name] = value
            else:
                raise TypeError(value)
        else:
            raise ValueError(name)

    def __delitem__(self, name: bytes) -> None:
        """Remove a ref.

        Args:
          name: Name of the ref to remove
        """
        if name.startswith(b"refs/") or name == b"HEAD":
            del self.refs[name]
        else:
            raise ValueError(name)

    def _get_user_identity(
        self, config: "StackedConfig", kind: Optional[str] = None
    ) -> bytes:
        """Determine the identity to use for new commits.

        Deprecated: use the module-level get_user_identity() instead.
        NOTE(review): ``kind`` is accepted but not forwarded to
        get_user_identity() — confirm whether that is intentional.
        """
        warnings.warn(
            "use get_user_identity() rather than Repo._get_user_identity",
            DeprecationWarning,
        )
        return get_user_identity(config)

    def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
        """Add or modify graftpoints.

        Args:
          updated_graftpoints: Dict of commit shas to list of parent shas
        """
        # Simple validation
        for commit, parents in updated_graftpoints.items():
            for sha in [commit, *parents]:
                check_hexsha(sha, "Invalid graftpoint")

        self._graftpoints.update(updated_graftpoints)

    def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
        """Remove graftpoints.

        Args:
          to_remove: List of commit shas
        """
        # NOTE: the mutable default is only iterated, never mutated, so the
        # shared-default pitfall does not bite here.
        for sha in to_remove:
            del self._graftpoints[sha]

    def _read_heads(self, name: str) -> list[bytes]:
        """Read a file of newline-separated SHAs from the control dir.

        Args:
          name: Name of the file, relative to the control dir.

        Returns: List of stripped, non-empty lines; empty list if the file
          does not exist.
        """
        f = self.get_named_file(name)
        if f is None:
            return []
        with f:
            return [line.strip() for line in f.readlines() if line.strip()]

    def get_worktree(self) -> "WorkTree":
        """Get the working tree for this repository.

        Returns:
          WorkTree instance for performing working tree operations

        Raises:
          NotImplementedError: If the repository doesn't support working trees
        """
        raise NotImplementedError(
            "Working tree operations not supported by this repository type"
        )

    @replace_me(remove_in="0.26.0")
    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp: Optional[float] = None,
        commit_timezone: Optional[int] = None,
        author_timestamp: Optional[float] = None,
        author_timezone: Optional[int] = None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ) -> bytes:
        """Create a new commit.

        Deprecated: delegates to ``get_worktree().commit(...)``.

        If not specified, committer and author default to
        get_user_identity(..., 'COMMITTER')
        and get_user_identity(..., 'AUTHOR') respectively.

        Args:
          message: Commit message (bytes or callable that takes (repo, commit)
            and returns bytes)
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit
            timestamp)
          author_timezone: Author timestamp timezone
            (defaults to commit timestamp timezone)
          tree: SHA1 of the tree root to use (if not specified the
            current index will be committed).
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
          no_verify: Skip pre-commit and commit-msg hooks
          sign: GPG Sign the commit (bool, defaults to False,
            pass True to use default GPG key,
            pass a str containing Key ID to use a specific GPG key)

        Returns:
          New commit SHA1
        """
        return self.get_worktree().commit(
            message=message,
            committer=committer,
            author=author,
            commit_timestamp=commit_timestamp,
            commit_timezone=commit_timezone,
            author_timestamp=author_timestamp,
            author_timezone=author_timezone,
            tree=tree,
            encoding=encoding,
            ref=ref,
            merge_heads=merge_heads,
            no_verify=no_verify,
            sign=sign,
        )
def read_gitfile(f: BinaryIO) -> str:
    """Read a ``.git`` file.

    The first line of the file should start with "gitdir: "

    Args:
      f: File-like object to read from
    Returns: A path
    Raises:
      ValueError: If the file does not start with ``gitdir: ``.
    """
    cs = f.read()
    if not cs.startswith(b"gitdir: "):
        raise ValueError("Expected file to start with 'gitdir: '")
    # Only the first line carries the path; tolerate CRLF line endings
    # (previously a trailing "\r" from a Windows-written gitfile leaked
    # into the returned path).
    first_line = cs[len(b"gitdir: ") :].split(b"\n", 1)[0]
    return first_line.rstrip(b"\r\n").decode("utf-8")
class UnsupportedVersion(Exception):
    """Unsupported repository version."""

    def __init__(self, version) -> None:
        """Initialize UnsupportedVersion exception.

        Args:
          version: The unsupported repository version
        """
        # Pass a message to Exception so str() and tracebacks are
        # informative (previously str(e) was empty).
        super().__init__(f"Unsupported repository format version: {version}")
        self.version = version
class UnsupportedExtension(Exception):
    """Unsupported repository extension."""

    def __init__(self, extension) -> None:
        """Initialize UnsupportedExtension exception.

        Args:
          extension: The unsupported repository extension
        """
        # Pass a message to Exception so str() and tracebacks are
        # informative (previously str(e) was empty).
        super().__init__(f"Unsupported repository extension: {extension!r}")
        self.extension = extension
class Repo(BaseRepo):
    """A git repository backed by local disk.

    To open an existing repository, call the constructor with
    the path of the repository.

    To create a new repository, use the Repo.init class method.

    Note that a repository object may hold on to resources such
    as file handles for performance reasons; call .close() to free
    up those resources.

    Attributes:
      path: Path to the working copy (if it exists) or repository control
        directory (if the repository is bare)
      bare: Whether this is a bare repository
    """

    # Filesystem location of the working copy (or of the control directory
    # itself when the repository is bare).
    path: str
    # True when the repository has no working tree.
    bare: bool
    # On-disk object store rooted under the common dir's objects/ directory.
    object_store: DiskObjectStore
    def __init__(
        self,
        root: Union[str, bytes, os.PathLike],
        object_store: Optional[PackBasedObjectStore] = None,
        bare: Optional[bool] = None,
    ) -> None:
        """Open a repository on disk.

        Args:
          root: Path to the repository's root.
          object_store: ObjectStore to use; if omitted, we use the
            repository's default object store
          bare: True if this is a bare repository.

        Raises:
          NotGitRepository: If no git repository layout is found at ``root``.
          UnsupportedVersion: If core.repositoryformatversion is not 0 or 1.
          UnsupportedExtension: If an unrecognized extension is configured.
        """
        root = os.fspath(root)
        if isinstance(root, bytes):
            root = os.fsdecode(root)
        hidden_path = os.path.join(root, CONTROLDIR)
        if bare is None:
            # Autodetect bare-ness: a ".git" file or a ".git" dir containing
            # an object store means non-bare; objects/ and refs/ directly
            # under root means bare; anything else is not a repository.
            if os.path.isfile(hidden_path) or os.path.isdir(
                os.path.join(hidden_path, OBJECTDIR)
            ):
                bare = False
            elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
                os.path.join(root, REFSDIR)
            ):
                bare = True
            else:
                raise NotGitRepository(
                    "No git repository was found at {path}".format(**dict(path=root))
                )

        self.bare = bare
        if bare is False:
            if os.path.isfile(hidden_path):
                # ".git" is a gitfile pointing at the real control directory
                # (submodule or linked-worktree layout).
                with open(hidden_path, "rb") as f:
                    path = read_gitfile(f)
                self._controldir = os.path.join(root, path)
            else:
                self._controldir = hidden_path
        else:
            self._controldir = root
        # Linked worktrees store the path of the main repository's control
        # dir in a "commondir" file; fall back to our own control dir.
        commondir = self.get_named_file(COMMONDIR)
        if commondir is not None:
            with commondir:
                self._commondir = os.path.join(
                    self.controldir(),
                    os.fsdecode(commondir.read().rstrip(b"\r\n")),
                )
        else:
            self._commondir = self._controldir
        self.path = root

        # Initialize refs early so they're available for config condition matchers
        self.refs = DiskRefsContainer(
            self.commondir(), self._controldir, logger=self._write_reflog
        )

        # Initialize worktrees container
        from .worktree import WorkTreeContainer

        self.worktrees = WorkTreeContainer(self)

        config = self.get_config()
        try:
            repository_format_version = config.get("core", "repositoryformatversion")
            format_version = (
                0
                if repository_format_version is None
                else int(repository_format_version)
            )
        except KeyError:
            format_version = 0

        if format_version not in (0, 1):
            raise UnsupportedVersion(format_version)

        # Track extensions we encounter
        has_reftable_extension = False
        for extension, value in config.items((b"extensions",)):
            if extension.lower() == b"refstorage":
                if value == b"reftable":
                    has_reftable_extension = True
                else:
                    raise UnsupportedExtension(f"refStorage = {value.decode()}")
            elif extension.lower() not in (b"worktreeconfig",):
                raise UnsupportedExtension(extension)

        if object_store is None:
            object_store = DiskObjectStore.from_config(
                os.path.join(self.commondir(), OBJECTDIR), config
            )

        # Use reftable if extension is configured
        if has_reftable_extension:
            from .reftable import ReftableRefsContainer

            self.refs = ReftableRefsContainer(self.commondir())
            # Update worktrees container after refs change
            self.worktrees = WorkTreeContainer(self)
        BaseRepo.__init__(self, object_store, self.refs)

        # Graftpoints come from both info/grafts and the shallow file.
        self._graftpoints = {}
        graft_file = self.get_named_file(
            os.path.join("info", "grafts"), basedir=self.commondir()
        )
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))
        graft_file = self.get_named_file("shallow", basedir=self.commondir())
        if graft_file:
            with graft_file:
                self._graftpoints.update(parse_graftpoints(graft_file))

        # Shell-script hooks run from the control directory.
        self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
        self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
        self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
        self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1303 def get_worktree(self) -> "WorkTree":
1304 """Get the working tree for this repository.
1306 Returns:
1307 WorkTree instance for performing working tree operations
1308 """
1309 from .worktree import WorkTree
1311 return WorkTree(self, self.path)
1313 def _write_reflog(
1314 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1315 ) -> None:
1316 from .reflog import format_reflog_line
1318 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1319 try:
1320 os.makedirs(os.path.dirname(path))
1321 except FileExistsError:
1322 pass
1323 if committer is None:
1324 config = self.get_config_stack()
1325 committer = get_user_identity(config)
1326 check_user_identity(committer)
1327 if timestamp is None:
1328 timestamp = int(time.time())
1329 if timezone is None:
1330 timezone = 0 # FIXME
1331 with open(path, "ab") as f:
1332 f.write(
1333 format_reflog_line(
1334 old_sha, new_sha, committer, timestamp, timezone, message
1335 )
1336 + b"\n"
1337 )
1339 def read_reflog(self, ref):
1340 """Read reflog entries for a reference.
1342 Args:
1343 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1345 Yields:
1346 reflog.Entry objects in chronological order (oldest first)
1347 """
1348 from .reflog import read_reflog
1350 path = os.path.join(self.controldir(), "logs", os.fsdecode(ref))
1351 try:
1352 with open(path, "rb") as f:
1353 yield from read_reflog(f)
1354 except FileNotFoundError:
1355 return
1357 @classmethod
1358 def discover(cls, start="."):
1359 """Iterate parent directories to discover a repository.
1361 Return a Repo object for the first parent directory that looks like a
1362 Git repository.
1364 Args:
1365 start: The directory to start discovery from (defaults to '.')
1366 """
1367 remaining = True
1368 path = os.path.abspath(start)
1369 while remaining:
1370 try:
1371 return cls(path)
1372 except NotGitRepository:
1373 path, remaining = os.path.split(path)
1374 raise NotGitRepository(
1375 "No git repository was found at {path}".format(**dict(path=start))
1376 )
    def controldir(self) -> str:
        """Return the path of the control directory (e.g. ``.git``)."""
        return self._controldir
    def commondir(self) -> str:
        """Return the path of the common directory.

        For a main working tree, it is identical to controldir().

        For a linked working tree, it is the control directory of the
        main working tree.
        """
        # Set once in __init__, from the "commondir" file when present.
        return self._commondir
1392 def _determine_file_mode(self) -> bool:
1393 """Probe the file-system to determine whether permissions can be trusted.
1395 Returns: True if permissions can be trusted, False otherwise.
1396 """
1397 fname = os.path.join(self.path, ".probe-permissions")
1398 with open(fname, "w") as f:
1399 f.write("")
1401 st1 = os.lstat(fname)
1402 try:
1403 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1404 except PermissionError:
1405 return False
1406 st2 = os.lstat(fname)
1408 os.unlink(fname)
1410 mode_differs = st1.st_mode != st2.st_mode
1411 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1413 return mode_differs and st2_has_exec
    def _determine_symlinks(self) -> bool:
        """Probe the filesystem to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # TODO(jelmer): Actually probe disk / look at filesystem
        # Currently just assumes symlinks work everywhere except Windows.
        return sys.platform != "win32"
1423 def _put_named_file(self, path: str, contents: bytes) -> None:
1424 """Write a file to the control dir with the given name and contents.
1426 Args:
1427 path: The path to the file, relative to the control dir.
1428 contents: A string to write to the file.
1429 """
1430 path = path.lstrip(os.path.sep)
1431 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1432 f.write(contents)
1434 def _del_named_file(self, path: str) -> None:
1435 try:
1436 os.unlink(os.path.join(self.controldir(), path))
1437 except FileNotFoundError:
1438 return
1440 def get_named_file(self, path, basedir=None):
1441 """Get a file from the control dir with a specific name.
1443 Although the filename should be interpreted as a filename relative to
1444 the control dir in a disk-based Repo, the object returned need not be
1445 pointing to a file in that location.
1447 Args:
1448 path: The path to the file, relative to the control dir.
1449 basedir: Optional argument that specifies an alternative to the
1450 control dir.
1451 Returns: An open file object, or None if the file does not exist.
1452 """
1453 # TODO(dborowitz): sanitize filenames, since this is used directly by
1454 # the dumb web serving code.
1455 if basedir is None:
1456 basedir = self.controldir()
1457 path = path.lstrip(os.path.sep)
1458 try:
1459 return open(os.path.join(basedir, path), "rb")
1460 except FileNotFoundError:
1461 return None
    def index_path(self):
        """Return path to the index file (under the control directory)."""
        return os.path.join(self.controldir(), INDEX_FILENAME)
1467 def open_index(self) -> "Index":
1468 """Open the index for this repository.
1470 Raises:
1471 NoIndexPresent: If no index is present
1472 Returns: The matching `Index`
1473 """
1474 from .index import Index
1476 if not self.has_index():
1477 raise NoIndexPresent
1479 # Check for manyFiles feature configuration
1480 config = self.get_config_stack()
1481 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1482 skip_hash = False
1483 index_version = None
1485 if many_files:
1486 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1487 try:
1488 index_version_str = config.get(b"index", b"version")
1489 index_version = int(index_version_str)
1490 except KeyError:
1491 index_version = 4 # Default to version 4 for manyFiles
1492 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1493 else:
1494 # Check for explicit index settings
1495 try:
1496 index_version_str = config.get(b"index", b"version")
1497 index_version = int(index_version_str)
1498 except KeyError:
1499 index_version = None
1500 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1502 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
    def has_index(self) -> bool:
        """Check if an index is present."""
        # Bare repos must never have index files; non-bare repos may have a
        # missing index file, which is treated as empty.
        return not self.bare
1510 @replace_me(remove_in="0.26.0")
1511 def stage(
1512 self,
1513 fs_paths: Union[
1514 str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
1515 ],
1516 ) -> None:
1517 """Stage a set of paths.
1519 Args:
1520 fs_paths: List of paths, relative to the repository path
1521 """
1522 return self.get_worktree().stage(fs_paths)
1524 @replace_me(remove_in="0.26.0")
1525 def unstage(self, fs_paths: list[str]) -> None:
1526 """Unstage specific file in the index.
1528 Args:
1529 fs_paths: a list of files to unstage,
1530 relative to the repository path.
1531 """
1532 return self.get_worktree().unstage(fs_paths)
    def clone(
        self,
        target_path,
        *,
        mkdir=True,
        bare=False,
        origin=b"origin",
        checkout=None,
        branch=None,
        progress=None,
        depth: Optional[int] = None,
        symlinks=None,
    ) -> "Repo":
        """Clone this repository.

        Args:
          target_path: Target path
          mkdir: Create the target directory
          bare: Whether to create a bare repository
          checkout: Whether or not to check-out HEAD after cloning
          origin: Base name for refs in target repository
            cloned from this repository
          branch: Optional branch or tag to be used as HEAD in the new repository
            instead of this repository's HEAD.
          progress: Optional progress function
          depth: Depth at which to fetch
          symlinks: Symlinks setting (default to autodetect)
        Returns: Created repository as `Repo`

        Raises:
          ValueError: If both ``bare`` and ``checkout`` are requested.
        """
        encoded_path = os.fsencode(self.path)

        if mkdir:
            os.mkdir(target_path)

        try:
            if not bare:
                target = Repo.init(target_path, symlinks=symlinks)
                if checkout is None:
                    # Default to checking out HEAD for non-bare clones.
                    checkout = True
            else:
                if checkout:
                    raise ValueError("checkout and bare are incompatible")
                target = Repo.init_bare(target_path)

            try:
                # Register this repository as the clone's "origin" remote.
                target_config = target.get_config()
                target_config.set((b"remote", origin), b"url", encoded_path)
                target_config.set(
                    (b"remote", origin),
                    b"fetch",
                    b"+refs/heads/*:refs/remotes/" + origin + b"/*",
                )
                target_config.write_to_path()

                ref_message = b"clone: from " + encoded_path
                self.fetch(target, depth=depth)
                # Copy branches under refs/remotes/<origin>/ and tags verbatim.
                target.refs.import_refs(
                    b"refs/remotes/" + origin,
                    self.refs.as_dict(b"refs/heads"),
                    message=ref_message,
                )
                target.refs.import_refs(
                    b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
                )

                head_chain, origin_sha = self.refs.follow(b"HEAD")
                origin_head = head_chain[-1] if head_chain else None
                if origin_sha and not origin_head:
                    # set detached HEAD
                    target.refs[b"HEAD"] = origin_sha
                else:
                    _set_origin_head(target.refs, origin, origin_head)
                head_ref = _set_default_branch(
                    target.refs, origin, origin_head, branch, ref_message
                )

                # Update target head
                if head_ref:
                    head = _set_head(target.refs, head_ref, ref_message)
                else:
                    head = None

                if checkout and head is not None:
                    target.get_worktree().reset_index()
            except BaseException:
                # Close the partially initialized clone before re-raising.
                target.close()
                raise
        except BaseException:
            # Only remove the target directory if we created it ourselves.
            if mkdir:
                import shutil

                shutil.rmtree(target_path)
            raise
        return target
1629 @replace_me(remove_in="0.26.0")
1630 def reset_index(self, tree: Optional[bytes] = None):
1631 """Reset the index back to a specific tree.
1633 Args:
1634 tree: Tree SHA to reset to, None for current HEAD tree.
1635 """
1636 return self.get_worktree().reset_index(tree)
    def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
        """Get condition matchers for includeIf conditions.

        Returns a dict of condition prefix to matcher function.
        """
        from pathlib import Path

        from .config import ConditionMatcher, match_glob_pattern

        # Add gitdir matchers
        def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
            """Match gitdir against a pattern.

            Args:
              pattern: Pattern to match against
              case_sensitive: Whether to match case-sensitively

            Returns:
              True if gitdir matches pattern
            """
            # Handle relative patterns (starting with ./)
            if pattern.startswith("./"):
                # Can't handle relative patterns without config directory context
                return False

            # Normalize repository path
            try:
                repo_path = str(Path(self._controldir).resolve())
            except (OSError, ValueError):
                return False

            # Expand ~ in pattern and normalize
            pattern = os.path.expanduser(pattern)

            # Normalize pattern following Git's rules
            pattern = pattern.replace("\\", "/")
            if not pattern.startswith(("~/", "./", "/", "**")):
                # Check for Windows absolute path (e.g. "C:" drive prefix).
                if len(pattern) >= 2 and pattern[1] == ":":
                    pass
                else:
                    # Bare patterns implicitly match at any depth.
                    pattern = "**/" + pattern
            if pattern.endswith("/"):
                # A trailing slash means "this directory and everything below".
                pattern = pattern + "**"

            # Use the existing _match_gitdir_pattern function
            from .config import _match_gitdir_pattern

            pattern_bytes = pattern.encode("utf-8", errors="replace")
            repo_path_bytes = repo_path.encode("utf-8", errors="replace")

            return _match_gitdir_pattern(
                repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
            )

        # Add onbranch matcher
        def match_onbranch(pattern: str) -> bool:
            """Match current branch against a pattern.

            Args:
              pattern: Pattern to match against

            Returns:
              True if current branch matches pattern
            """
            try:
                # Get the current branch using refs
                ref_chain, _ = self.refs.follow(b"HEAD")
                head_ref = ref_chain[-1]  # Get the final resolved ref
            except KeyError:
                # No HEAD (e.g. empty repository): nothing matches.
                pass
            else:
                if head_ref and head_ref.startswith(b"refs/heads/"):
                    # Extract branch name from ref
                    branch = head_ref[11:].decode("utf-8", errors="replace")
                    return match_glob_pattern(branch, pattern)
            return False

        matchers: dict[str, ConditionMatcher] = {
            "onbranch:": match_onbranch,
            "gitdir:": lambda pattern: match_gitdir(pattern, True),
            "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
        }

        return matchers
1724 def get_worktree_config(self) -> "ConfigFile":
1725 """Get the worktree-specific config.
1727 Returns:
1728 ConfigFile object for the worktree config
1729 """
1730 from .config import ConfigFile
1732 path = os.path.join(self.commondir(), "config.worktree")
1733 try:
1734 # Pass condition matchers for includeIf evaluation
1735 condition_matchers = self._get_config_condition_matchers()
1736 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1737 except FileNotFoundError:
1738 cf = ConfigFile()
1739 cf.path = path
1740 return cf
1742 def get_config(self) -> "ConfigFile":
1743 """Retrieve the config object.
1745 Returns: `ConfigFile` object for the ``.git/config`` file.
1746 """
1747 from .config import ConfigFile
1749 path = os.path.join(self._commondir, "config")
1750 try:
1751 # Pass condition matchers for includeIf evaluation
1752 condition_matchers = self._get_config_condition_matchers()
1753 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1754 except FileNotFoundError:
1755 ret = ConfigFile()
1756 ret.path = path
1757 return ret
1759 def get_rebase_state_manager(self):
1760 """Get the appropriate rebase state manager for this repository.
1762 Returns: DiskRebaseStateManager instance
1763 """
1764 import os
1766 from .rebase import DiskRebaseStateManager
1768 path = os.path.join(self.controldir(), "rebase-merge")
1769 return DiskRebaseStateManager(path)
1771 def get_description(self):
1772 """Retrieve the description of this repository.
1774 Returns: A string describing the repository or None.
1775 """
1776 path = os.path.join(self._controldir, "description")
1777 try:
1778 with GitFile(path, "rb") as f:
1779 return f.read()
1780 except FileNotFoundError:
1781 return None
1783 def __repr__(self) -> str:
1784 """Return string representation of this repository."""
1785 return f"<Repo at {self.path!r}>"
    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description for this repository.
        """
        # Stored as the "description" file in the control directory.
        self._put_named_file("description", description)
    @classmethod
    def _init_maybe_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        controldir: Union[str, bytes, os.PathLike],
        bare,
        object_store=None,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ):
        """Shared initialization for bare and non-bare repositories.

        Creates the base directory layout and object store under
        ``controldir``, opens the repository, points HEAD at the default
        branch and writes the initial control files.

        Args:
          path: Repository root (working copy, or control dir when bare).
          controldir: Directory to hold the control files.
          bare: Whether the new repository is bare.
          object_store: Optional pre-made object store.
          config: Optional config used to look up init.defaultBranch.
          default_branch: Branch name for HEAD; overrides config lookup.
          symlinks: Whether to support symlinks (autodetect when None).
          format: Repository format version (defaults to 0).

        Returns: The opened repository instance.
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        controldir = os.fspath(controldir)
        if isinstance(controldir, bytes):
            controldir = os.fsdecode(controldir)
        for d in BASE_DIRECTORIES:
            os.mkdir(os.path.join(controldir, *d))
        if object_store is None:
            object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
        ret = cls(path, bare=bare, object_store=object_store)
        if default_branch is None:
            if config is None:
                from .config import StackedConfig

                config = StackedConfig.default()
            try:
                # Honor init.defaultBranch when configured.
                default_branch = config.get("init", "defaultBranch")
            except KeyError:
                default_branch = DEFAULT_BRANCH
        ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
        ret._init_files(bare=bare, symlinks=symlinks, format=format)
        return ret
    @classmethod
    def init(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir: bool = False,
        config=None,
        default_branch=None,
        symlinks: Optional[bool] = None,
        format: Optional[int] = None,
    ) -> "Repo":
        """Create a new repository.

        Args:
          path: Path in which to create the repository
          mkdir: Whether to create the directory
          config: Configuration object
          default_branch: Default branch name
          symlinks: Whether to support symlinks
          format: Repository format version (defaults to 0)
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Non-bare layout: control files live in <path>/.git.
        controldir = os.path.join(path, CONTROLDIR)
        os.mkdir(controldir)
        _set_filesystem_hidden(controldir)
        return cls._init_maybe_bare(
            path,
            controldir,
            False,
            config=config,
            default_branch=default_branch,
            symlinks=symlinks,
            format=format,
        )
    @classmethod
    def _init_new_working_directory(
        cls,
        path: Union[str, bytes, os.PathLike],
        main_repo,
        identifier=None,
        mkdir=False,
    ):
        """Create a new working directory linked to a repository.

        Args:
          path: Path in which to create the working tree.
          main_repo: Main repository to reference
          identifier: Worktree identifier
          mkdir: Whether to create the directory
        Returns: `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        if identifier is None:
            # Default the worktree name to the directory's basename.
            identifier = os.path.basename(path)
        # Ensure we use absolute path for the worktree control directory
        main_controldir = os.path.abspath(main_repo.controldir())
        main_worktreesdir = os.path.join(main_controldir, WORKTREES)
        worktree_controldir = os.path.join(main_worktreesdir, identifier)
        gitdirfile = os.path.join(path, CONTROLDIR)
        # The worktree's ".git" is a gitfile pointing at its control dir
        # under <main>/.git/worktrees/<identifier>.
        with open(gitdirfile, "wb") as f:
            f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
        try:
            os.mkdir(main_worktreesdir)
        except FileExistsError:
            pass
        try:
            os.mkdir(worktree_controldir)
        except FileExistsError:
            pass
        # Cross-link the control dir back to the worktree ("gitdir") and to
        # the shared control dir of the main repository ("commondir").
        with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
            f.write(os.fsencode(gitdirfile) + b"\n")
        with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
            f.write(b"../..\n")
        # Start the worktree's HEAD at the main repository's current head.
        with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
            f.write(main_repo.head() + b"\n")
        r = cls(os.path.normpath(path))
        r.get_worktree().reset_index()
        return r
    @classmethod
    def init_bare(
        cls,
        path: Union[str, bytes, os.PathLike],
        *,
        mkdir=False,
        object_store=None,
        config=None,
        default_branch=None,
        format: Optional[int] = None,
    ):
        """Create a new bare repository.

        ``path`` should already exist and be an empty directory.

        Args:
          path: Path to create bare repository in
          mkdir: Whether to create the directory
          object_store: Object store to use
          config: Configuration object
          default_branch: Default branch name
          format: Repository format version (defaults to 0)
        Returns: a `Repo` instance
        """
        path = os.fspath(path)
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        if mkdir:
            os.mkdir(path)
        # Bare layout: the repository root *is* the control directory.
        return cls._init_maybe_bare(
            path,
            path,
            True,
            object_store=object_store,
            config=config,
            default_branch=default_branch,
            format=format,
        )

    # Backwards-compatible alias for init_bare.
    create = init_bare
    def close(self) -> None:
        """Close any files opened by this repository."""
        # Delegates to the object store, which holds the open file handles.
        self.object_store.close()
    def __enter__(self):
        """Enter context manager; returns the repository itself."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and close repository."""
        # Exceptions are not suppressed (implicitly returns None).
        self.close()
1973 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
1974 """Read .gitattributes file from working tree.
1976 Returns:
1977 Dictionary mapping file patterns to attributes
1978 """
1979 gitattributes = {}
1980 gitattributes_path = os.path.join(self.path, ".gitattributes")
1982 if os.path.exists(gitattributes_path):
1983 with open(gitattributes_path, "rb") as f:
1984 for line in f:
1985 line = line.strip()
1986 if not line or line.startswith(b"#"):
1987 continue
1989 parts = line.split()
1990 if len(parts) < 2:
1991 continue
1993 pattern = parts[0]
1994 attrs = {}
1996 for attr in parts[1:]:
1997 if attr.startswith(b"-"):
1998 # Unset attribute
1999 attrs[attr[1:]] = b"false"
2000 elif b"=" in attr:
2001 # Set to value
2002 key, value = attr.split(b"=", 1)
2003 attrs[key] = value
2004 else:
2005 # Set attribute
2006 attrs[attr] = b"true"
2008 gitattributes[pattern] = attrs
2010 return gitattributes
2012 def get_blob_normalizer(self):
2013 """Return a BlobNormalizer object."""
2014 from .filters import FilterBlobNormalizer, FilterRegistry
2016 # Get proper GitAttributes object
2017 git_attributes = self.get_gitattributes()
2018 config_stack = self.get_config_stack()
2020 # Create FilterRegistry with repo reference
2021 filter_registry = FilterRegistry(config_stack, self)
2023 # Return FilterBlobNormalizer which handles all filters including line endings
2024 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Patterns are collected, in order, from the tree's .gitattributes,
        .git/info/attributes, and the working directory's .gitattributes.

        Args:
          tree: Tree SHA to read .gitattributes from (defaults to HEAD)

        Returns:
          GitAttributes object that can be used to match paths
        """
        from .attrs import (
            GitAttributes,
            Pattern,
            parse_git_attributes,
        )

        patterns = []

        # Read system gitattributes (TODO: implement this)
        # Read global gitattributes (TODO: implement this)

        # Read repository .gitattributes from index/tree
        if tree is None:
            try:
                # Try to get from HEAD
                head = self[b"HEAD"]
                if isinstance(head, Tag):
                    # Peel an annotated tag down to the object it points at.
                    _cls, obj = head.object
                    head = self.get_object(obj)
                assert isinstance(head, Commit)
                tree = head.tree
            except KeyError:
                # No HEAD, no attributes from tree
                pass

        if tree is not None:
            try:
                tree_obj = self[tree]
                assert isinstance(tree_obj, Tree)
                if b".gitattributes" in tree_obj:
                    _, attrs_sha = tree_obj[b".gitattributes"]
                    attrs_blob = self[attrs_sha]
                    if isinstance(attrs_blob, Blob):
                        attrs_data = BytesIO(attrs_blob.data)
                        for pattern_bytes, attrs in parse_git_attributes(attrs_data):
                            pattern = Pattern(pattern_bytes)
                            patterns.append((pattern, attrs))
            except (KeyError, NotTreeError):
                # Missing or non-tree object: no tree-level attributes.
                pass

        # Read .git/info/attributes
        info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
        if os.path.exists(info_attrs_path):
            with open(info_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        # Read .gitattributes from working directory (if it exists)
        working_attrs_path = os.path.join(self.path, ".gitattributes")
        if os.path.exists(working_attrs_path):
            with open(working_attrs_path, "rb") as f:
                for pattern_bytes, attrs in parse_git_attributes(f):
                    pattern = Pattern(pattern_bytes)
                    patterns.append((pattern, attrs))

        return GitAttributes(patterns)
2093 @replace_me(remove_in="0.26.0")
2094 def _sparse_checkout_file_path(self) -> str:
2095 """Return the path of the sparse-checkout file in this repo's control dir."""
2096 return self.get_worktree()._sparse_checkout_file_path()
2098 @replace_me(remove_in="0.26.0")
2099 def configure_for_cone_mode(self) -> None:
2100 """Ensure the repository is configured for cone-mode sparse-checkout."""
2101 return self.get_worktree().configure_for_cone_mode()
2103 @replace_me(remove_in="0.26.0")
2104 def infer_cone_mode(self) -> bool:
2105 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2106 return self.get_worktree().infer_cone_mode()
2108 @replace_me(remove_in="0.26.0")
2109 def get_sparse_checkout_patterns(self) -> list[str]:
2110 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2112 Returns:
2113 A list of patterns. Returns an empty list if the file is missing.
2114 """
2115 return self.get_worktree().get_sparse_checkout_patterns()
2117 @replace_me(remove_in="0.26.0")
2118 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
2119 """Write the given sparse-checkout patterns into info/sparse-checkout.
2121 Creates the info/ directory if it does not exist.
2123 Args:
2124 patterns: A list of gitignore-style patterns to store.
2125 """
2126 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2128 @replace_me(remove_in="0.26.0")
2129 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
2130 """Write the given cone-mode directory patterns into info/sparse-checkout.
2132 For each directory to include, add an inclusion line that "undoes" the prior
2133 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2134 Never add the same line twice.
2135 """
2136 return self.get_worktree().set_cone_mode_patterns(dirs)
class MemoryRepo(BaseRepo):
    """Repo that stores refs, objects, and named files in memory.

    MemoryRepos are always bare: they have no working tree and no index, since
    those have a stronger dependency on the filesystem.
    """

    def __init__(self) -> None:
        """Create a new repository in memory."""
        from .config import ConfigFile

        # Reflog entries are recorded as the raw argument tuples passed to
        # _append_reflog by the refs container.
        self._reflog: list[Any] = []
        refs_container = DictRefsContainer({}, logger=self._append_reflog)
        BaseRepo.__init__(self, MemoryObjectStore(), refs_container)  # type: ignore[arg-type]
        # Control-dir files (e.g. "config", "description"), keyed by
        # path relative to the control dir.
        self._named_files: dict[str, bytes] = {}
        self.bare = True
        self._config = ConfigFile()
        self._description = None

    def _append_reflog(self, *args) -> None:
        """Record a reflog entry (as the raw argument tuple) in memory."""
        self._reflog.append(args)

    def set_description(self, description) -> None:
        """Set the description for this repository.

        Args:
          description: Text to set as description
        """
        self._description = description

    def get_description(self):
        """Get the description of this repository.

        Returns:
          Repository description as bytes
        """
        return self._description

    def _determine_file_mode(self):
        """Probe the file-system to determine whether permissions can be trusted.

        Returns: True if permissions can be trusted, False otherwise.
        """
        return sys.platform != "win32"

    def _determine_symlinks(self):
        """Probe the file-system to determine whether symlinks can be created.

        Returns: True if symlinks can be created, False otherwise.
        """
        # Mirrors _determine_file_mode: assume symlink support everywhere
        # except native Windows.
        return sys.platform != "win32"

    def _put_named_file(self, path, contents) -> None:
        """Write a file to the control dir with the given name and contents.

        Args:
          path: The path to the file, relative to the control dir.
          contents: A string to write to the file.
        """
        self._named_files[path] = contents

    def _del_named_file(self, path) -> None:
        """Delete a named file from the control dir; missing files are ignored."""
        try:
            del self._named_files[path]
        except KeyError:
            pass

    def get_named_file(self, path, basedir=None):
        """Get a file from the control dir with a specific name.

        Although the filename should be interpreted as a filename relative to
        the control dir in a disk-backed Repo, the object returned need not be
        pointing to a file in that location.

        Args:
          path: The path to the file, relative to the control dir.
          basedir: Optional base directory for the path (unused for memory repos)
        Returns: An open file object, or None if the file does not exist.
        """
        contents = self._named_files.get(path, None)
        if contents is None:
            return None
        return BytesIO(contents)

    def open_index(self) -> "Index":
        """Fail to open index for this repo, since it is bare.

        Raises:
          NoIndexPresent: Raised when no index is present
        """
        raise NoIndexPresent

    def get_config(self):
        """Retrieve the config object.

        Returns: `ConfigFile` object.
        """
        return self._config

    def get_rebase_state_manager(self):
        """Get the appropriate rebase state manager for this repository.

        Returns: MemoryRebaseStateManager instance
        """
        from .rebase import MemoryRebaseStateManager

        return MemoryRebaseStateManager(self)

    def get_blob_normalizer(self):
        """Return a BlobNormalizer object for checkin/checkout operations."""
        from .filters import FilterBlobNormalizer, FilterRegistry

        # Get GitAttributes object
        git_attributes = self.get_gitattributes()
        config_stack = self.get_config_stack()

        # Create FilterRegistry with repo reference
        filter_registry = FilterRegistry(config_stack, self)

        # Return FilterBlobNormalizer which handles all filters
        return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)

    def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
        """Read gitattributes for the repository.

        Memory repos don't have working trees or gitattributes files, so this
        always returns an empty GitAttributes.
        """
        from .attrs import GitAttributes

        return GitAttributes([])

    def do_commit(
        self,
        message: Optional[bytes] = None,
        committer: Optional[bytes] = None,
        author: Optional[bytes] = None,
        commit_timestamp=None,
        commit_timezone=None,
        author_timestamp=None,
        author_timezone=None,
        tree: Optional[ObjectID] = None,
        encoding: Optional[bytes] = None,
        ref: Optional[Ref] = b"HEAD",
        merge_heads: Optional[list[ObjectID]] = None,
        no_verify: bool = False,
        sign: bool = False,
    ):
        """Create a new commit.

        This is a simplified implementation for in-memory repositories that
        doesn't support worktree operations or hooks.

        Args:
          message: Commit message as bytes, or a callable taking
            ``(repo, commit)`` and returning the message bytes
          committer: Committer fullname
          author: Author fullname
          commit_timestamp: Commit timestamp (defaults to now)
          commit_timezone: Commit timestamp timezone (defaults to GMT)
          author_timestamp: Author timestamp (defaults to commit timestamp)
          author_timezone: Author timestamp timezone (defaults to commit timezone)
          tree: SHA1 of the tree root to use
          encoding: Encoding
          ref: Optional ref to commit to (defaults to current branch).
            If None, creates a dangling commit without updating any ref.
          merge_heads: Merge heads
          no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
          sign: GPG Sign the commit (ignored for MemoryRepo)

        Returns:
          New commit SHA1

        Raises:
          ValueError: If tree is missing or malformed, or no message was given.
        """
        from .objects import Commit

        if tree is None:
            raise ValueError("tree must be specified for MemoryRepo")

        c = Commit()
        if len(tree) != 40:
            raise ValueError("tree must be a 40-byte hex sha string")
        c.tree = tree

        config = self.get_config_stack()
        if merge_heads is None:
            merge_heads = []
        if committer is None:
            committer = get_user_identity(config, kind="COMMITTER")
        check_user_identity(committer)
        c.committer = committer
        if commit_timestamp is None:
            # Module-level `time` import; no need to re-import locally.
            commit_timestamp = time.time()
        c.commit_time = int(commit_timestamp)
        if commit_timezone is None:
            commit_timezone = 0
        c.commit_timezone = commit_timezone
        if author is None:
            author = get_user_identity(config, kind="AUTHOR")
        # Validate before assigning, mirroring the committer handling above.
        check_user_identity(author)
        c.author = author
        if author_timestamp is None:
            author_timestamp = commit_timestamp
        c.author_time = int(author_timestamp)
        if author_timezone is None:
            author_timezone = commit_timezone
        c.author_timezone = author_timezone
        if encoding is None:
            try:
                encoding = config.get(("i18n",), "commitEncoding")
            except KeyError:
                pass
        if encoding is not None:
            c.encoding = encoding

        # A callable message is invoked with the repo and the (not yet stored)
        # commit object, and must return the message bytes.
        if callable(message):
            message = message(self, c)
            if message is None:
                raise ValueError("Message callback returned None")

        if message is None:
            raise ValueError("No commit message specified")

        c.message = message

        if ref is None:
            # Create a dangling commit
            c.parents = merge_heads
            self.object_store.add_object(c)
        else:
            try:
                old_head = self.refs[ref]
                c.parents = [old_head, *merge_heads]
                self.object_store.add_object(c)
                ok = self.refs.set_if_equals(
                    ref,
                    old_head,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            except KeyError:
                # Ref did not exist yet; create it if nobody else has.
                c.parents = merge_heads
                self.object_store.add_object(c)
                ok = self.refs.add_if_new(
                    ref,
                    c.id,
                    message=b"commit: " + message,
                    committer=committer,
                    timestamp=commit_timestamp,
                    timezone=commit_timezone,
                )
            if not ok:
                from .errors import CommitError

                raise CommitError(f"{ref!r} changed during commit")

        return c.id

    @classmethod
    def init_bare(cls, objects, refs, format: Optional[int] = None):
        """Create a new bare repository in memory.

        Args:
          objects: Objects for the new repository,
            as iterable
          refs: Refs as dictionary, mapping names
            to object SHA1s
          format: Repository format version (defaults to 0)
        """
        ret = cls()
        for obj in objects:
            ret.object_store.add_object(obj)
        for refname, sha in refs.items():
            ret.refs.add_if_new(refname, sha)
        ret._init_files(bare=True, format=format)
        return ret