Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/dulwich/repo.py: 39%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# repo.py -- For dealing with git repositories.
2# Copyright (C) 2007 James Westby <jw+debian@jameswestby.net>
3# Copyright (C) 2008-2013 Jelmer Vernooij <jelmer@jelmer.uk>
4#
5# SPDX-License-Identifier: Apache-2.0 OR GPL-2.0-or-later
6# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
7# General Public License as published by the Free Software Foundation; version 2.0
8# or (at your option) any later version. You can redistribute it and/or
9# modify it under the terms of either of these two licenses.
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17# You should have received a copy of the licenses; if not, see
18# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
19# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
20# License, Version 2.0.
21#
24"""Repository access.
26This module contains the base class for git repositories
27(BaseRepo) and an implementation which uses a repository on
28local disk (Repo).
30"""
32import os
33import stat
34import sys
35import time
36import warnings
37from collections.abc import Iterable, Iterator
38from io import BytesIO
39from typing import (
40 TYPE_CHECKING,
41 Any,
42 BinaryIO,
43 Callable,
44 Optional,
45 TypeVar,
46 Union,
47)
49if TYPE_CHECKING:
50 # There are no circular imports here, but we try to defer imports as long
51 # as possible to reduce start-up time for anything that doesn't need
52 # these imports.
53 from .attrs import GitAttributes
54 from .config import ConditionMatcher, ConfigFile, StackedConfig
55 from .index import Index
56 from .line_ending import BlobNormalizer
57 from .notes import Notes
58 from .object_store import BaseObjectStore, GraphWalker, UnpackedObject
59 from .rebase import RebaseStateManager
60 from .walk import Walker
61 from .worktree import WorkTree
63from . import replace_me
64from .errors import (
65 NoIndexPresent,
66 NotBlobError,
67 NotCommitError,
68 NotGitRepository,
69 NotTagError,
70 NotTreeError,
71 RefFormatError,
72)
73from .file import GitFile
74from .hooks import (
75 CommitMsgShellHook,
76 Hook,
77 PostCommitShellHook,
78 PostReceiveShellHook,
79 PreCommitShellHook,
80)
81from .object_store import (
82 DiskObjectStore,
83 MemoryObjectStore,
84 MissingObjectFinder,
85 ObjectStoreGraphWalker,
86 PackBasedObjectStore,
87 find_shallow,
88 peel_sha,
89)
90from .objects import (
91 Blob,
92 Commit,
93 ObjectID,
94 ShaFile,
95 Tag,
96 Tree,
97 check_hexsha,
98 valid_hexsha,
99)
100from .pack import generate_unpacked_objects
101from .refs import (
102 ANNOTATED_TAG_SUFFIX, # noqa: F401
103 LOCAL_BRANCH_PREFIX,
104 LOCAL_TAG_PREFIX, # noqa: F401
105 SYMREF, # noqa: F401
106 DictRefsContainer,
107 DiskRefsContainer,
108 InfoRefsContainer, # noqa: F401
109 Ref,
110 RefsContainer,
111 _set_default_branch,
112 _set_head,
113 _set_origin_head,
114 check_ref_format, # noqa: F401
115 is_per_worktree_ref,
116 read_packed_refs, # noqa: F401
117 read_packed_refs_with_peeled, # noqa: F401
118 serialize_refs,
119 write_packed_refs, # noqa: F401
120)
122CONTROLDIR = ".git"
123OBJECTDIR = "objects"
125T = TypeVar("T", bound="ShaFile")
126REFSDIR = "refs"
127REFSDIR_TAGS = "tags"
128REFSDIR_HEADS = "heads"
129INDEX_FILENAME = "index"
130COMMONDIR = "commondir"
131GITDIR = "gitdir"
132WORKTREES = "worktrees"
134BASE_DIRECTORIES = [
135 ["branches"],
136 [REFSDIR],
137 [REFSDIR, REFSDIR_TAGS],
138 [REFSDIR, REFSDIR_HEADS],
139 ["hooks"],
140 ["info"],
141]
143DEFAULT_BRANCH = b"master"
146class InvalidUserIdentity(Exception):
147 """User identity is not of the format 'user <email>'."""
149 def __init__(self, identity: str) -> None:
150 """Initialize InvalidUserIdentity exception."""
151 self.identity = identity
154class DefaultIdentityNotFound(Exception):
155 """Default identity could not be determined."""
158# TODO(jelmer): Cache?
159def _get_default_identity() -> tuple[str, str]:
160 import socket
162 for name in ("LOGNAME", "USER", "LNAME", "USERNAME"):
163 username = os.environ.get(name)
164 if username:
165 break
166 else:
167 username = None
169 try:
170 import pwd
171 except ImportError:
172 fullname = None
173 else:
174 try:
175 entry = pwd.getpwuid(os.getuid()) # type: ignore
176 except KeyError:
177 fullname = None
178 else:
179 if getattr(entry, "gecos", None):
180 fullname = entry.pw_gecos.split(",")[0]
181 else:
182 fullname = None
183 if username is None:
184 username = entry.pw_name
185 if not fullname:
186 if username is None:
187 raise DefaultIdentityNotFound("no username found")
188 fullname = username
189 email = os.environ.get("EMAIL")
190 if email is None:
191 if username is None:
192 raise DefaultIdentityNotFound("no username found")
193 email = f"{username}@{socket.gethostname()}"
194 return (fullname, email)
197def get_user_identity(config: "StackedConfig", kind: Optional[str] = None) -> bytes:
198 """Determine the identity to use for new commits.
200 If kind is set, this first checks
201 GIT_${KIND}_NAME and GIT_${KIND}_EMAIL.
203 If those variables are not set, then it will fall back
204 to reading the user.name and user.email settings from
205 the specified configuration.
207 If that also fails, then it will fall back to using
208 the current users' identity as obtained from the host
209 system (e.g. the gecos field, $EMAIL, $USER@$(hostname -f).
211 Args:
212 config: Configuration stack to read from
213 kind: Optional kind to return identity for,
214 usually either "AUTHOR" or "COMMITTER".
216 Returns:
217 A user identity
218 """
219 user: Optional[bytes] = None
220 email: Optional[bytes] = None
221 if kind:
222 user_uc = os.environ.get("GIT_" + kind + "_NAME")
223 if user_uc is not None:
224 user = user_uc.encode("utf-8")
225 email_uc = os.environ.get("GIT_" + kind + "_EMAIL")
226 if email_uc is not None:
227 email = email_uc.encode("utf-8")
228 if user is None:
229 try:
230 user = config.get(("user",), "name")
231 except KeyError:
232 user = None
233 if email is None:
234 try:
235 email = config.get(("user",), "email")
236 except KeyError:
237 email = None
238 default_user, default_email = _get_default_identity()
239 if user is None:
240 user = default_user.encode("utf-8")
241 if email is None:
242 email = default_email.encode("utf-8")
243 if email.startswith(b"<") and email.endswith(b">"):
244 email = email[1:-1]
245 return user + b" <" + email + b">"
248def check_user_identity(identity: bytes) -> None:
249 """Verify that a user identity is formatted correctly.
251 Args:
252 identity: User identity bytestring
253 Raises:
254 InvalidUserIdentity: Raised when identity is invalid
255 """
256 try:
257 fst, snd = identity.split(b" <", 1)
258 except ValueError as exc:
259 raise InvalidUserIdentity(identity.decode("utf-8", "replace")) from exc
260 if b">" not in snd:
261 raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
262 if b"\0" in identity or b"\n" in identity:
263 raise InvalidUserIdentity(identity.decode("utf-8", "replace"))
266def parse_graftpoints(
267 graftpoints: Iterable[bytes],
268) -> dict[bytes, list[bytes]]:
269 """Convert a list of graftpoints into a dict.
271 Args:
272 graftpoints: Iterator of graftpoint lines
274 Each line is formatted as:
275 <commit sha1> <parent sha1> [<parent sha1>]*
277 Resulting dictionary is:
278 <commit sha1>: [<parent sha1>*]
280 https://git.wiki.kernel.org/index.php/GraftPoint
281 """
282 grafts = {}
283 for line in graftpoints:
284 raw_graft = line.split(None, 1)
286 commit = raw_graft[0]
287 if len(raw_graft) == 2:
288 parents = raw_graft[1].split()
289 else:
290 parents = []
292 for sha in [commit, *parents]:
293 check_hexsha(sha, "Invalid graftpoint")
295 grafts[commit] = parents
296 return grafts
299def serialize_graftpoints(graftpoints: dict[bytes, list[bytes]]) -> bytes:
300 """Convert a dictionary of grafts into string.
302 The graft dictionary is:
303 <commit sha1>: [<parent sha1>*]
305 Each line is formatted as:
306 <commit sha1> <parent sha1> [<parent sha1>]*
308 https://git.wiki.kernel.org/index.php/GraftPoint
310 """
311 graft_lines = []
312 for commit, parents in graftpoints.items():
313 if parents:
314 graft_lines.append(commit + b" " + b" ".join(parents))
315 else:
316 graft_lines.append(commit)
317 return b"\n".join(graft_lines)
320def _set_filesystem_hidden(path: str) -> None:
321 """Mark path as to be hidden if supported by platform and filesystem.
323 On win32 uses SetFileAttributesW api:
324 <https://docs.microsoft.com/windows/desktop/api/fileapi/nf-fileapi-setfileattributesw>
325 """
326 if sys.platform == "win32":
327 import ctypes
328 from ctypes.wintypes import BOOL, DWORD, LPCWSTR
330 FILE_ATTRIBUTE_HIDDEN = 2
331 SetFileAttributesW = ctypes.WINFUNCTYPE(BOOL, LPCWSTR, DWORD)(
332 ("SetFileAttributesW", ctypes.windll.kernel32)
333 )
335 if isinstance(path, bytes):
336 path = os.fsdecode(path)
337 if not SetFileAttributesW(path, FILE_ATTRIBUTE_HIDDEN):
338 pass # Could raise or log `ctypes.WinError()` here
340 # Could implement other platform specific filesystem hiding here
343class ParentsProvider:
344 """Provider for commit parent information."""
346 def __init__(
347 self,
348 store: "BaseObjectStore",
349 grafts: dict = {},
350 shallows: Iterable[bytes] = [],
351 ) -> None:
352 """Initialize ParentsProvider.
354 Args:
355 store: Object store to use
356 grafts: Graft information
357 shallows: Shallow commit SHAs
358 """
359 self.store = store
360 self.grafts = grafts
361 self.shallows = set(shallows)
363 # Get commit graph once at initialization for performance
364 self.commit_graph = store.get_commit_graph()
366 def get_parents(
367 self, commit_id: bytes, commit: Optional[Commit] = None
368 ) -> list[bytes]:
369 """Get parents for a commit using the parents provider."""
370 try:
371 return self.grafts[commit_id]
372 except KeyError:
373 pass
374 if commit_id in self.shallows:
375 return []
377 # Try to use commit graph for faster parent lookup
378 if self.commit_graph:
379 parents = self.commit_graph.get_parents(commit_id)
380 if parents is not None:
381 return parents
383 # Fallback to reading the commit object
384 if commit is None:
385 obj = self.store[commit_id]
386 assert isinstance(obj, Commit)
387 commit = obj
388 return commit.parents
391class BaseRepo:
392 """Base class for a git repository.
394 This base class is meant to be used for Repository implementations that e.g.
395 work on top of a different transport than a standard filesystem path.
397 Attributes:
398 object_store: Dictionary-like object for accessing
399 the objects
400 refs: Dictionary-like object with the refs in this
401 repository
402 """
404 def __init__(self, object_store: PackBasedObjectStore, refs: RefsContainer) -> None:
405 """Open a repository.
407 This shouldn't be called directly, but rather through one of the
408 base classes, such as MemoryRepo or Repo.
410 Args:
411 object_store: Object store to use
412 refs: Refs container to use
413 """
414 self.object_store = object_store
415 self.refs = refs
417 self._graftpoints: dict[bytes, list[bytes]] = {}
418 self.hooks: dict[str, Hook] = {}
420 def _determine_file_mode(self) -> bool:
421 """Probe the file-system to determine whether permissions can be trusted.
423 Returns: True if permissions can be trusted, False otherwise.
424 """
425 raise NotImplementedError(self._determine_file_mode)
427 def _determine_symlinks(self) -> bool:
428 """Probe the filesystem to determine whether symlinks can be created.
430 Returns: True if symlinks can be created, False otherwise.
431 """
432 # For now, just mimic the old behaviour
433 return sys.platform != "win32"
435 def _init_files(
436 self, bare: bool, symlinks: Optional[bool] = None, format: Optional[int] = None
437 ) -> None:
438 """Initialize a default set of named files."""
439 from .config import ConfigFile
441 self._put_named_file("description", b"Unnamed repository")
442 f = BytesIO()
443 cf = ConfigFile()
444 if format is None:
445 format = 0
446 if format not in (0, 1):
447 raise ValueError(f"Unsupported repository format version: {format}")
448 cf.set("core", "repositoryformatversion", str(format))
449 if self._determine_file_mode():
450 cf.set("core", "filemode", True)
451 else:
452 cf.set("core", "filemode", False)
454 if symlinks is None and not bare:
455 symlinks = self._determine_symlinks()
457 if symlinks is False:
458 cf.set("core", "symlinks", symlinks)
460 cf.set("core", "bare", bare)
461 cf.set("core", "logallrefupdates", True)
462 cf.write_to_file(f)
463 self._put_named_file("config", f.getvalue())
464 self._put_named_file(os.path.join("info", "exclude"), b"")
466 def get_named_file(self, path: str) -> Optional[BinaryIO]:
467 """Get a file from the control dir with a specific name.
469 Although the filename should be interpreted as a filename relative to
470 the control dir in a disk-based Repo, the object returned need not be
471 pointing to a file in that location.
473 Args:
474 path: The path to the file, relative to the control dir.
475 Returns: An open file object, or None if the file does not exist.
476 """
477 raise NotImplementedError(self.get_named_file)
479 def _put_named_file(self, path: str, contents: bytes) -> None:
480 """Write a file to the control dir with the given name and contents.
482 Args:
483 path: The path to the file, relative to the control dir.
484 contents: A string to write to the file.
485 """
486 raise NotImplementedError(self._put_named_file)
488 def _del_named_file(self, path: str) -> None:
489 """Delete a file in the control directory with the given name."""
490 raise NotImplementedError(self._del_named_file)
492 def open_index(self) -> "Index":
493 """Open the index for this repository.
495 Raises:
496 NoIndexPresent: If no index is present
497 Returns: The matching `Index`
498 """
499 raise NotImplementedError(self.open_index)
501 def fetch(
502 self,
503 target: "BaseRepo",
504 determine_wants: Optional[Callable] = None,
505 progress: Optional[Callable] = None,
506 depth: Optional[int] = None,
507 ) -> dict:
508 """Fetch objects into another repository.
510 Args:
511 target: The target repository
512 determine_wants: Optional function to determine what refs to
513 fetch.
514 progress: Optional progress function
515 depth: Optional shallow fetch depth
516 Returns: The local refs
517 """
518 if determine_wants is None:
519 determine_wants = target.object_store.determine_wants_all
520 count, pack_data = self.fetch_pack_data(
521 determine_wants,
522 target.get_graph_walker(),
523 progress=progress,
524 depth=depth,
525 )
526 target.object_store.add_pack_data(count, pack_data, progress)
527 return self.get_refs()
529 def fetch_pack_data(
530 self,
531 determine_wants: Callable,
532 graph_walker: "GraphWalker",
533 progress: Optional[Callable],
534 *,
535 get_tagged: Optional[Callable] = None,
536 depth: Optional[int] = None,
537 ) -> tuple:
538 """Fetch the pack data required for a set of revisions.
540 Args:
541 determine_wants: Function that takes a dictionary with heads
542 and returns the list of heads to fetch.
543 graph_walker: Object that can iterate over the list of revisions
544 to fetch and has an "ack" method that will be called to acknowledge
545 that a revision is present.
546 progress: Simple progress function that will be called with
547 updated progress strings.
548 get_tagged: Function that returns a dict of pointed-to sha ->
549 tag sha for including tags.
550 depth: Shallow fetch depth
551 Returns: count and iterator over pack data
552 """
553 missing_objects = self.find_missing_objects(
554 determine_wants, graph_walker, progress, get_tagged=get_tagged, depth=depth
555 )
556 if missing_objects is None:
557 return 0, iter([])
558 remote_has = missing_objects.get_remote_has()
559 object_ids = list(missing_objects)
560 return len(object_ids), generate_unpacked_objects(
561 self.object_store, object_ids, progress=progress, other_haves=remote_has
562 )
564 def find_missing_objects(
565 self,
566 determine_wants: Callable,
567 graph_walker: "GraphWalker",
568 progress: Optional[Callable],
569 *,
570 get_tagged: Optional[Callable] = None,
571 depth: Optional[int] = None,
572 ) -> Optional[MissingObjectFinder]:
573 """Fetch the missing objects required for a set of revisions.
575 Args:
576 determine_wants: Function that takes a dictionary with heads
577 and returns the list of heads to fetch.
578 graph_walker: Object that can iterate over the list of revisions
579 to fetch and has an "ack" method that will be called to acknowledge
580 that a revision is present.
581 progress: Simple progress function that will be called with
582 updated progress strings.
583 get_tagged: Function that returns a dict of pointed-to sha ->
584 tag sha for including tags.
585 depth: Shallow fetch depth
586 Returns: iterator over objects, with __len__ implemented
587 """
588 refs = serialize_refs(self.object_store, self.get_refs())
590 wants = determine_wants(refs)
591 if not isinstance(wants, list):
592 raise TypeError("determine_wants() did not return a list")
594 current_shallow = set(getattr(graph_walker, "shallow", set()))
596 if depth not in (None, 0):
597 assert depth is not None
598 shallow, not_shallow = find_shallow(self.object_store, wants, depth)
599 # Only update if graph_walker has shallow attribute
600 if hasattr(graph_walker, "shallow"):
601 graph_walker.shallow.update(shallow - not_shallow)
602 new_shallow = graph_walker.shallow - current_shallow
603 unshallow = graph_walker.unshallow = not_shallow & current_shallow # type: ignore[attr-defined]
604 if hasattr(graph_walker, "update_shallow"):
605 graph_walker.update_shallow(new_shallow, unshallow)
606 else:
607 unshallow = getattr(graph_walker, "unshallow", set())
609 if wants == []:
610 # TODO(dborowitz): find a way to short-circuit that doesn't change
611 # this interface.
613 if getattr(graph_walker, "shallow", set()) or unshallow:
614 # Do not send a pack in shallow short-circuit path
615 return None
617 class DummyMissingObjectFinder:
618 """Dummy finder that returns no missing objects."""
620 def get_remote_has(self) -> None:
621 """Get remote has (always returns None).
623 Returns:
624 None
625 """
626 return None
628 def __len__(self) -> int:
629 return 0
631 def __iter__(self) -> Iterator[tuple[bytes, Optional[bytes]]]:
632 yield from []
634 return DummyMissingObjectFinder() # type: ignore
636 # If the graph walker is set up with an implementation that can
637 # ACK/NAK to the wire, it will write data to the client through
638 # this call as a side-effect.
639 haves = self.object_store.find_common_revisions(graph_walker)
641 # Deal with shallow requests separately because the haves do
642 # not reflect what objects are missing
643 if getattr(graph_walker, "shallow", set()) or unshallow:
644 # TODO: filter the haves commits from iter_shas. the specific
645 # commits aren't missing.
646 haves = []
648 parents_provider = ParentsProvider(self.object_store, shallows=current_shallow)
650 def get_parents(commit: Commit) -> list[bytes]:
651 """Get parents for a commit using the parents provider.
653 Args:
654 commit: Commit object
656 Returns:
657 List of parent commit SHAs
658 """
659 return parents_provider.get_parents(commit.id, commit)
661 return MissingObjectFinder(
662 self.object_store,
663 haves=haves,
664 wants=wants,
665 shallow=getattr(graph_walker, "shallow", set()),
666 progress=progress,
667 get_tagged=get_tagged,
668 get_parents=get_parents,
669 )
671 def generate_pack_data(
672 self,
673 have: Iterable[ObjectID],
674 want: Iterable[ObjectID],
675 progress: Optional[Callable[[str], None]] = None,
676 ofs_delta: Optional[bool] = None,
677 ) -> tuple[int, Iterator["UnpackedObject"]]:
678 """Generate pack data objects for a set of wants/haves.
680 Args:
681 have: List of SHA1s of objects that should not be sent
682 want: List of SHA1s of objects that should be sent
683 ofs_delta: Whether OFS deltas can be included
684 progress: Optional progress reporting method
685 """
686 return self.object_store.generate_pack_data(
687 have,
688 want,
689 shallow=self.get_shallow(),
690 progress=progress,
691 ofs_delta=ofs_delta,
692 )
694 def get_graph_walker(
695 self, heads: Optional[list[ObjectID]] = None
696 ) -> ObjectStoreGraphWalker:
697 """Retrieve a graph walker.
699 A graph walker is used by a remote repository (or proxy)
700 to find out which objects are present in this repository.
702 Args:
703 heads: Repository heads to use (optional)
704 Returns: A graph walker object
705 """
706 if heads is None:
707 heads = [
708 sha
709 for sha in self.refs.as_dict(b"refs/heads").values()
710 if sha in self.object_store
711 ]
712 parents_provider = ParentsProvider(self.object_store)
713 return ObjectStoreGraphWalker(
714 heads,
715 parents_provider.get_parents,
716 shallow=self.get_shallow(),
717 update_shallow=self.update_shallow,
718 )
720 def get_refs(self) -> dict[bytes, bytes]:
721 """Get dictionary with all refs.
723 Returns: A ``dict`` mapping ref names to SHA1s
724 """
725 return self.refs.as_dict()
727 def head(self) -> bytes:
728 """Return the SHA1 pointed at by HEAD."""
729 # TODO: move this method to WorkTree
730 return self.refs[b"HEAD"]
732 def _get_object(self, sha: bytes, cls: type[T]) -> T:
733 assert len(sha) in (20, 40)
734 ret = self.get_object(sha)
735 if not isinstance(ret, cls):
736 if cls is Commit:
737 raise NotCommitError(ret.id)
738 elif cls is Blob:
739 raise NotBlobError(ret.id)
740 elif cls is Tree:
741 raise NotTreeError(ret.id)
742 elif cls is Tag:
743 raise NotTagError(ret.id)
744 else:
745 raise Exception(f"Type invalid: {ret.type_name!r} != {cls.type_name!r}")
746 return ret
748 def get_object(self, sha: bytes) -> ShaFile:
749 """Retrieve the object with the specified SHA.
751 Args:
752 sha: SHA to retrieve
753 Returns: A ShaFile object
754 Raises:
755 KeyError: when the object can not be found
756 """
757 return self.object_store[sha]
759 def parents_provider(self) -> ParentsProvider:
760 """Get a parents provider for this repository.
762 Returns:
763 ParentsProvider instance configured with grafts and shallows
764 """
765 return ParentsProvider(
766 self.object_store,
767 grafts=self._graftpoints,
768 shallows=self.get_shallow(),
769 )
771 def get_parents(self, sha: bytes, commit: Optional[Commit] = None) -> list[bytes]:
772 """Retrieve the parents of a specific commit.
774 If the specific commit is a graftpoint, the graft parents
775 will be returned instead.
777 Args:
778 sha: SHA of the commit for which to retrieve the parents
779 commit: Optional commit matching the sha
780 Returns: List of parents
781 """
782 return self.parents_provider().get_parents(sha, commit)
784 def get_config(self) -> "ConfigFile":
785 """Retrieve the config object.
787 Returns: `ConfigFile` object for the ``.git/config`` file.
788 """
789 raise NotImplementedError(self.get_config)
791 def get_worktree_config(self) -> "ConfigFile":
792 """Retrieve the worktree config object."""
793 raise NotImplementedError(self.get_worktree_config)
795 def get_description(self) -> Optional[str]:
796 """Retrieve the description for this repository.
798 Returns: String with the description of the repository
799 as set by the user.
800 """
801 raise NotImplementedError(self.get_description)
803 def set_description(self, description: bytes) -> None:
804 """Set the description for this repository.
806 Args:
807 description: Text to set as description for this repository.
808 """
809 raise NotImplementedError(self.set_description)
811 def get_rebase_state_manager(self) -> "RebaseStateManager":
812 """Get the appropriate rebase state manager for this repository.
814 Returns: RebaseStateManager instance
815 """
816 raise NotImplementedError(self.get_rebase_state_manager)
818 def get_blob_normalizer(self) -> "BlobNormalizer":
819 """Return a BlobNormalizer object for checkin/checkout operations.
821 Returns: BlobNormalizer instance
822 """
823 raise NotImplementedError(self.get_blob_normalizer)
825 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
826 """Read gitattributes for the repository.
828 Args:
829 tree: Tree SHA to read .gitattributes from (defaults to HEAD)
831 Returns:
832 GitAttributes object that can be used to match paths
833 """
834 raise NotImplementedError(self.get_gitattributes)
836 def get_config_stack(self) -> "StackedConfig":
837 """Return a config stack for this repository.
839 This stack accesses the configuration for both this repository
840 itself (.git/config) and the global configuration, which usually
841 lives in ~/.gitconfig.
843 Returns: `Config` instance for this repository
844 """
845 from .config import ConfigFile, StackedConfig
847 local_config = self.get_config()
848 backends: list[ConfigFile] = [local_config]
849 if local_config.get_boolean((b"extensions",), b"worktreeconfig", False):
850 backends.append(self.get_worktree_config())
852 backends += StackedConfig.default_backends()
853 return StackedConfig(backends, writable=local_config)
855 def get_shallow(self) -> set[ObjectID]:
856 """Get the set of shallow commits.
858 Returns: Set of shallow commits.
859 """
860 f = self.get_named_file("shallow")
861 if f is None:
862 return set()
863 with f:
864 return {line.strip() for line in f}
866 def update_shallow(
867 self, new_shallow: Optional[set[bytes]], new_unshallow: Optional[set[bytes]]
868 ) -> None:
869 """Update the list of shallow objects.
871 Args:
872 new_shallow: Newly shallow objects
873 new_unshallow: Newly no longer shallow objects
874 """
875 shallow = self.get_shallow()
876 if new_shallow:
877 shallow.update(new_shallow)
878 if new_unshallow:
879 shallow.difference_update(new_unshallow)
880 if shallow:
881 self._put_named_file("shallow", b"".join([sha + b"\n" for sha in shallow]))
882 else:
883 self._del_named_file("shallow")
885 def get_peeled(self, ref: Ref) -> ObjectID:
886 """Get the peeled value of a ref.
888 Args:
889 ref: The refname to peel.
890 Returns: The fully-peeled SHA1 of a tag object, after peeling all
891 intermediate tags; if the original ref does not point to a tag,
892 this will equal the original SHA1.
893 """
894 cached = self.refs.get_peeled(ref)
895 if cached is not None:
896 return cached
897 return peel_sha(self.object_store, self.refs[ref])[1].id
899 @property
900 def notes(self) -> "Notes":
901 """Access notes functionality for this repository.
903 Returns:
904 Notes object for accessing notes
905 """
906 from .notes import Notes
908 return Notes(self.object_store, self.refs)
910 def get_walker(self, include: Optional[list[bytes]] = None, **kwargs) -> "Walker":
911 """Obtain a walker for this repository.
913 Args:
914 include: Iterable of SHAs of commits to include along with their
915 ancestors. Defaults to [HEAD]
916 **kwargs: Additional keyword arguments including:
918 * exclude: Iterable of SHAs of commits to exclude along with their
919 ancestors, overriding includes.
920 * order: ORDER_* constant specifying the order of results.
921 Anything other than ORDER_DATE may result in O(n) memory usage.
922 * reverse: If True, reverse the order of output, requiring O(n)
923 memory.
924 * max_entries: The maximum number of entries to yield, or None for
925 no limit.
926 * paths: Iterable of file or subtree paths to show entries for.
927 * rename_detector: diff.RenameDetector object for detecting
928 renames.
929 * follow: If True, follow path across renames/copies. Forces a
930 default rename_detector.
931 * since: Timestamp to list commits after.
932 * until: Timestamp to list commits before.
933 * queue_cls: A class to use for a queue of commits, supporting the
934 iterator protocol. The constructor takes a single argument, the Walker.
936 Returns: A `Walker` object
937 """
938 from .walk import Walker
940 if include is None:
941 include = [self.head()]
943 kwargs["get_parents"] = lambda commit: self.get_parents(commit.id, commit)
945 return Walker(self.object_store, include, **kwargs)
947 def __getitem__(self, name: Union[ObjectID, Ref]) -> "ShaFile":
948 """Retrieve a Git object by SHA1 or ref.
950 Args:
951 name: A Git object SHA1 or a ref name
952 Returns: A `ShaFile` object, such as a Commit or Blob
953 Raises:
954 KeyError: when the specified ref or object does not exist
955 """
956 if not isinstance(name, bytes):
957 raise TypeError(f"'name' must be bytestring, not {type(name).__name__:.80}")
958 if len(name) in (20, 40):
959 try:
960 return self.object_store[name]
961 except (KeyError, ValueError):
962 pass
963 try:
964 return self.object_store[self.refs[name]]
965 except RefFormatError as exc:
966 raise KeyError(name) from exc
968 def __contains__(self, name: bytes) -> bool:
969 """Check if a specific Git object or ref is present.
971 Args:
972 name: Git object SHA1 or ref name
973 """
974 if len(name) == 20 or (len(name) == 40 and valid_hexsha(name)):
975 return name in self.object_store or name in self.refs
976 else:
977 return name in self.refs
979 def __setitem__(self, name: bytes, value: Union[ShaFile, bytes]) -> None:
980 """Set a ref.
982 Args:
983 name: ref name
984 value: Ref value - either a ShaFile object, or a hex sha
985 """
986 if name.startswith(b"refs/") or name == b"HEAD":
987 if isinstance(value, ShaFile):
988 self.refs[name] = value.id
989 elif isinstance(value, bytes):
990 self.refs[name] = value
991 else:
992 raise TypeError(value)
993 else:
994 raise ValueError(name)
996 def __delitem__(self, name: bytes) -> None:
997 """Remove a ref.
999 Args:
1000 name: Name of the ref to remove
1001 """
1002 if name.startswith(b"refs/") or name == b"HEAD":
1003 del self.refs[name]
1004 else:
1005 raise ValueError(name)
1007 def _get_user_identity(
1008 self, config: "StackedConfig", kind: Optional[str] = None
1009 ) -> bytes:
1010 """Determine the identity to use for new commits."""
1011 warnings.warn(
1012 "use get_user_identity() rather than Repo._get_user_identity",
1013 DeprecationWarning,
1014 )
1015 return get_user_identity(config)
1017 def _add_graftpoints(self, updated_graftpoints: dict[bytes, list[bytes]]) -> None:
1018 """Add or modify graftpoints.
1020 Args:
1021 updated_graftpoints: Dict of commit shas to list of parent shas
1022 """
1023 # Simple validation
1024 for commit, parents in updated_graftpoints.items():
1025 for sha in [commit, *parents]:
1026 check_hexsha(sha, "Invalid graftpoint")
1028 self._graftpoints.update(updated_graftpoints)
1030 def _remove_graftpoints(self, to_remove: list[bytes] = []) -> None:
1031 """Remove graftpoints.
1033 Args:
1034 to_remove: List of commit shas
1035 """
1036 for sha in to_remove:
1037 del self._graftpoints[sha]
1039 def _read_heads(self, name: str) -> list[bytes]:
1040 f = self.get_named_file(name)
1041 if f is None:
1042 return []
1043 with f:
1044 return [line.strip() for line in f.readlines() if line.strip()]
1046 def get_worktree(self) -> "WorkTree":
1047 """Get the working tree for this repository.
1049 Returns:
1050 WorkTree instance for performing working tree operations
1052 Raises:
1053 NotImplementedError: If the repository doesn't support working trees
1054 """
1055 raise NotImplementedError(
1056 "Working tree operations not supported by this repository type"
1057 )
1059 @replace_me(remove_in="0.26.0")
1060 def do_commit(
1061 self,
1062 message: Optional[bytes] = None,
1063 committer: Optional[bytes] = None,
1064 author: Optional[bytes] = None,
1065 commit_timestamp: Optional[float] = None,
1066 commit_timezone: Optional[int] = None,
1067 author_timestamp: Optional[float] = None,
1068 author_timezone: Optional[int] = None,
1069 tree: Optional[ObjectID] = None,
1070 encoding: Optional[bytes] = None,
1071 ref: Optional[Ref] = b"HEAD",
1072 merge_heads: Optional[list[ObjectID]] = None,
1073 no_verify: bool = False,
1074 sign: bool = False,
1075 ) -> bytes:
1076 """Create a new commit.
1078 If not specified, committer and author default to
1079 get_user_identity(..., 'COMMITTER')
1080 and get_user_identity(..., 'AUTHOR') respectively.
1082 Args:
1083 message: Commit message (bytes or callable that takes (repo, commit)
1084 and returns bytes)
1085 committer: Committer fullname
1086 author: Author fullname
1087 commit_timestamp: Commit timestamp (defaults to now)
1088 commit_timezone: Commit timestamp timezone (defaults to GMT)
1089 author_timestamp: Author timestamp (defaults to commit
1090 timestamp)
1091 author_timezone: Author timestamp timezone
1092 (defaults to commit timestamp timezone)
1093 tree: SHA1 of the tree root to use (if not specified the
1094 current index will be committed).
1095 encoding: Encoding
1096 ref: Optional ref to commit to (defaults to current branch).
1097 If None, creates a dangling commit without updating any ref.
1098 merge_heads: Merge heads (defaults to .git/MERGE_HEAD)
1099 no_verify: Skip pre-commit and commit-msg hooks
1100 sign: GPG Sign the commit (bool, defaults to False,
1101 pass True to use default GPG key,
1102 pass a str containing Key ID to use a specific GPG key)
1104 Returns:
1105 New commit SHA1
1106 """
1107 return self.get_worktree().commit(
1108 message=message,
1109 committer=committer,
1110 author=author,
1111 commit_timestamp=commit_timestamp,
1112 commit_timezone=commit_timezone,
1113 author_timestamp=author_timestamp,
1114 author_timezone=author_timezone,
1115 tree=tree,
1116 encoding=encoding,
1117 ref=ref,
1118 merge_heads=merge_heads,
1119 no_verify=no_verify,
1120 sign=sign,
1121 )
1124def read_gitfile(f: BinaryIO) -> str:
1125 """Read a ``.git`` file.
1127 The first line of the file should start with "gitdir: "
1129 Args:
1130 f: File-like object to read from
1131 Returns: A path
1132 """
1133 cs = f.read()
1134 if not cs.startswith(b"gitdir: "):
1135 raise ValueError("Expected file to start with 'gitdir: '")
1136 return cs[len(b"gitdir: ") :].rstrip(b"\n").decode("utf-8")
1139class UnsupportedVersion(Exception):
1140 """Unsupported repository version."""
1142 def __init__(self, version) -> None:
1143 """Initialize UnsupportedVersion exception.
1145 Args:
1146 version: The unsupported repository version
1147 """
1148 self.version = version
1151class UnsupportedExtension(Exception):
1152 """Unsupported repository extension."""
1154 def __init__(self, extension) -> None:
1155 """Initialize UnsupportedExtension exception.
1157 Args:
1158 extension: The unsupported repository extension
1159 """
1160 self.extension = extension
1163class Repo(BaseRepo):
1164 """A git repository backed by local disk.
1166 To open an existing repository, call the constructor with
1167 the path of the repository.
1169 To create a new repository, use the Repo.init class method.
1171 Note that a repository object may hold on to resources such
1172 as file handles for performance reasons; call .close() to free
1173 up those resources.
1175 Attributes:
1176 path: Path to the working copy (if it exists) or repository control
1177 directory (if the repository is bare)
1178 bare: Whether this is a bare repository
1179 """
1181 path: str
1182 bare: bool
1183 object_store: DiskObjectStore
1185 def __init__(
1186 self,
1187 root: Union[str, bytes, os.PathLike],
1188 object_store: Optional[PackBasedObjectStore] = None,
1189 bare: Optional[bool] = None,
1190 ) -> None:
1191 """Open a repository on disk.
1193 Args:
1194 root: Path to the repository's root.
1195 object_store: ObjectStore to use; if omitted, we use the
1196 repository's default object store
1197 bare: True if this is a bare repository.
1198 """
1199 root = os.fspath(root)
1200 if isinstance(root, bytes):
1201 root = os.fsdecode(root)
1202 hidden_path = os.path.join(root, CONTROLDIR)
1203 if bare is None:
1204 if os.path.isfile(hidden_path) or os.path.isdir(
1205 os.path.join(hidden_path, OBJECTDIR)
1206 ):
1207 bare = False
1208 elif os.path.isdir(os.path.join(root, OBJECTDIR)) and os.path.isdir(
1209 os.path.join(root, REFSDIR)
1210 ):
1211 bare = True
1212 else:
1213 raise NotGitRepository(
1214 "No git repository was found at {path}".format(**dict(path=root))
1215 )
1217 self.bare = bare
1218 if bare is False:
1219 if os.path.isfile(hidden_path):
1220 with open(hidden_path, "rb") as f:
1221 path = read_gitfile(f)
1222 self._controldir = os.path.join(root, path)
1223 else:
1224 self._controldir = hidden_path
1225 else:
1226 self._controldir = root
1227 commondir = self.get_named_file(COMMONDIR)
1228 if commondir is not None:
1229 with commondir:
1230 self._commondir = os.path.join(
1231 self.controldir(),
1232 os.fsdecode(commondir.read().rstrip(b"\r\n")),
1233 )
1234 else:
1235 self._commondir = self._controldir
1236 self.path = root
1238 # Initialize refs early so they're available for config condition matchers
1239 self.refs = DiskRefsContainer(
1240 self.commondir(), self._controldir, logger=self._write_reflog
1241 )
1243 # Initialize worktrees container
1244 from .worktree import WorkTreeContainer
1246 self.worktrees = WorkTreeContainer(self)
1248 config = self.get_config()
1249 try:
1250 repository_format_version = config.get("core", "repositoryformatversion")
1251 format_version = (
1252 0
1253 if repository_format_version is None
1254 else int(repository_format_version)
1255 )
1256 except KeyError:
1257 format_version = 0
1259 if format_version not in (0, 1):
1260 raise UnsupportedVersion(format_version)
1262 # Track extensions we encounter
1263 has_reftable_extension = False
1264 for extension, value in config.items((b"extensions",)):
1265 if extension.lower() == b"refstorage":
1266 if value == b"reftable":
1267 has_reftable_extension = True
1268 else:
1269 raise UnsupportedExtension(f"refStorage = {value.decode()}")
1270 elif extension.lower() not in (b"worktreeconfig",):
1271 raise UnsupportedExtension(extension)
1273 if object_store is None:
1274 object_store = DiskObjectStore.from_config(
1275 os.path.join(self.commondir(), OBJECTDIR), config
1276 )
1278 # Use reftable if extension is configured
1279 if has_reftable_extension:
1280 from .reftable import ReftableRefsContainer
1282 self.refs = ReftableRefsContainer(self.commondir())
1283 # Update worktrees container after refs change
1284 self.worktrees = WorkTreeContainer(self)
1285 BaseRepo.__init__(self, object_store, self.refs)
1287 self._graftpoints = {}
1288 graft_file = self.get_named_file(
1289 os.path.join("info", "grafts"), basedir=self.commondir()
1290 )
1291 if graft_file:
1292 with graft_file:
1293 self._graftpoints.update(parse_graftpoints(graft_file))
1294 graft_file = self.get_named_file("shallow", basedir=self.commondir())
1295 if graft_file:
1296 with graft_file:
1297 self._graftpoints.update(parse_graftpoints(graft_file))
1299 self.hooks["pre-commit"] = PreCommitShellHook(self.path, self.controldir())
1300 self.hooks["commit-msg"] = CommitMsgShellHook(self.controldir())
1301 self.hooks["post-commit"] = PostCommitShellHook(self.controldir())
1302 self.hooks["post-receive"] = PostReceiveShellHook(self.controldir())
1304 def get_worktree(self) -> "WorkTree":
1305 """Get the working tree for this repository.
1307 Returns:
1308 WorkTree instance for performing working tree operations
1309 """
1310 from .worktree import WorkTree
1312 return WorkTree(self, self.path)
1314 def _write_reflog(
1315 self, ref, old_sha, new_sha, committer, timestamp, timezone, message
1316 ) -> None:
1317 from .reflog import format_reflog_line
1319 path = self._reflog_path(ref)
1320 try:
1321 os.makedirs(os.path.dirname(path))
1322 except FileExistsError:
1323 pass
1324 if committer is None:
1325 config = self.get_config_stack()
1326 committer = get_user_identity(config)
1327 check_user_identity(committer)
1328 if timestamp is None:
1329 timestamp = int(time.time())
1330 if timezone is None:
1331 timezone = 0 # FIXME
1332 with open(path, "ab") as f:
1333 f.write(
1334 format_reflog_line(
1335 old_sha, new_sha, committer, timestamp, timezone, message
1336 )
1337 + b"\n"
1338 )
1340 def _reflog_path(self, ref: bytes) -> str:
1341 if ref.startswith((b"main-worktree/", b"worktrees/")):
1342 raise NotImplementedError(f"refs {ref.decode()} are not supported")
1344 base = self.controldir() if is_per_worktree_ref(ref) else self.commondir()
1345 return os.path.join(base, "logs", os.fsdecode(ref))
1347 def read_reflog(self, ref):
1348 """Read reflog entries for a reference.
1350 Args:
1351 ref: Reference name (e.g. b'HEAD', b'refs/heads/master')
1353 Yields:
1354 reflog.Entry objects in chronological order (oldest first)
1355 """
1356 from .reflog import read_reflog
1358 path = self._reflog_path(ref)
1359 try:
1360 with open(path, "rb") as f:
1361 yield from read_reflog(f)
1362 except FileNotFoundError:
1363 return
1365 @classmethod
1366 def discover(cls, start="."):
1367 """Iterate parent directories to discover a repository.
1369 Return a Repo object for the first parent directory that looks like a
1370 Git repository.
1372 Args:
1373 start: The directory to start discovery from (defaults to '.')
1374 """
1375 remaining = True
1376 path = os.path.abspath(start)
1377 while remaining:
1378 try:
1379 return cls(path)
1380 except NotGitRepository:
1381 path, remaining = os.path.split(path)
1382 raise NotGitRepository(
1383 "No git repository was found at {path}".format(**dict(path=start))
1384 )
1386 def controldir(self) -> str:
1387 """Return the path of the control directory."""
1388 return self._controldir
1390 def commondir(self) -> str:
1391 """Return the path of the common directory.
1393 For a main working tree, it is identical to controldir().
1395 For a linked working tree, it is the control directory of the
1396 main working tree.
1397 """
1398 return self._commondir
1400 def _determine_file_mode(self) -> bool:
1401 """Probe the file-system to determine whether permissions can be trusted.
1403 Returns: True if permissions can be trusted, False otherwise.
1404 """
1405 fname = os.path.join(self.path, ".probe-permissions")
1406 with open(fname, "w") as f:
1407 f.write("")
1409 st1 = os.lstat(fname)
1410 try:
1411 os.chmod(fname, st1.st_mode ^ stat.S_IXUSR)
1412 except PermissionError:
1413 return False
1414 st2 = os.lstat(fname)
1416 os.unlink(fname)
1418 mode_differs = st1.st_mode != st2.st_mode
1419 st2_has_exec = (st2.st_mode & stat.S_IXUSR) != 0
1421 return mode_differs and st2_has_exec
1423 def _determine_symlinks(self) -> bool:
1424 """Probe the filesystem to determine whether symlinks can be created.
1426 Returns: True if symlinks can be created, False otherwise.
1427 """
1428 # TODO(jelmer): Actually probe disk / look at filesystem
1429 return sys.platform != "win32"
1431 def _put_named_file(self, path: str, contents: bytes) -> None:
1432 """Write a file to the control dir with the given name and contents.
1434 Args:
1435 path: The path to the file, relative to the control dir.
1436 contents: A string to write to the file.
1437 """
1438 path = path.lstrip(os.path.sep)
1439 with GitFile(os.path.join(self.controldir(), path), "wb") as f:
1440 f.write(contents)
1442 def _del_named_file(self, path: str) -> None:
1443 try:
1444 os.unlink(os.path.join(self.controldir(), path))
1445 except FileNotFoundError:
1446 return
1448 def get_named_file(self, path, basedir=None):
1449 """Get a file from the control dir with a specific name.
1451 Although the filename should be interpreted as a filename relative to
1452 the control dir in a disk-based Repo, the object returned need not be
1453 pointing to a file in that location.
1455 Args:
1456 path: The path to the file, relative to the control dir.
1457 basedir: Optional argument that specifies an alternative to the
1458 control dir.
1459 Returns: An open file object, or None if the file does not exist.
1460 """
1461 # TODO(dborowitz): sanitize filenames, since this is used directly by
1462 # the dumb web serving code.
1463 if basedir is None:
1464 basedir = self.controldir()
1465 path = path.lstrip(os.path.sep)
1466 try:
1467 return open(os.path.join(basedir, path), "rb")
1468 except FileNotFoundError:
1469 return None
1471 def index_path(self):
1472 """Return path to the index file."""
1473 return os.path.join(self.controldir(), INDEX_FILENAME)
1475 def open_index(self) -> "Index":
1476 """Open the index for this repository.
1478 Raises:
1479 NoIndexPresent: If no index is present
1480 Returns: The matching `Index`
1481 """
1482 from .index import Index
1484 if not self.has_index():
1485 raise NoIndexPresent
1487 # Check for manyFiles feature configuration
1488 config = self.get_config_stack()
1489 many_files = config.get_boolean(b"feature", b"manyFiles", False)
1490 skip_hash = False
1491 index_version = None
1493 if many_files:
1494 # When feature.manyFiles is enabled, set index.version=4 and index.skipHash=true
1495 try:
1496 index_version_str = config.get(b"index", b"version")
1497 index_version = int(index_version_str)
1498 except KeyError:
1499 index_version = 4 # Default to version 4 for manyFiles
1500 skip_hash = config.get_boolean(b"index", b"skipHash", True)
1501 else:
1502 # Check for explicit index settings
1503 try:
1504 index_version_str = config.get(b"index", b"version")
1505 index_version = int(index_version_str)
1506 except KeyError:
1507 index_version = None
1508 skip_hash = config.get_boolean(b"index", b"skipHash", False)
1510 return Index(self.index_path(), skip_hash=skip_hash, version=index_version)
1512 def has_index(self) -> bool:
1513 """Check if an index is present."""
1514 # Bare repos must never have index files; non-bare repos may have a
1515 # missing index file, which is treated as empty.
1516 return not self.bare
1518 @replace_me(remove_in="0.26.0")
1519 def stage(
1520 self,
1521 fs_paths: Union[
1522 str, bytes, os.PathLike, Iterable[Union[str, bytes, os.PathLike]]
1523 ],
1524 ) -> None:
1525 """Stage a set of paths.
1527 Args:
1528 fs_paths: List of paths, relative to the repository path
1529 """
1530 return self.get_worktree().stage(fs_paths)
1532 @replace_me(remove_in="0.26.0")
1533 def unstage(self, fs_paths: list[str]) -> None:
1534 """Unstage specific file in the index.
1536 Args:
1537 fs_paths: a list of files to unstage,
1538 relative to the repository path.
1539 """
1540 return self.get_worktree().unstage(fs_paths)
1542 def clone(
1543 self,
1544 target_path,
1545 *,
1546 mkdir=True,
1547 bare=False,
1548 origin=b"origin",
1549 checkout=None,
1550 branch=None,
1551 progress=None,
1552 depth: Optional[int] = None,
1553 symlinks=None,
1554 ) -> "Repo":
1555 """Clone this repository.
1557 Args:
1558 target_path: Target path
1559 mkdir: Create the target directory
1560 bare: Whether to create a bare repository
1561 checkout: Whether or not to check-out HEAD after cloning
1562 origin: Base name for refs in target repository
1563 cloned from this repository
1564 branch: Optional branch or tag to be used as HEAD in the new repository
1565 instead of this repository's HEAD.
1566 progress: Optional progress function
1567 depth: Depth at which to fetch
1568 symlinks: Symlinks setting (default to autodetect)
1569 Returns: Created repository as `Repo`
1570 """
1571 encoded_path = os.fsencode(self.path)
1573 if mkdir:
1574 os.mkdir(target_path)
1576 try:
1577 if not bare:
1578 target = Repo.init(target_path, symlinks=symlinks)
1579 if checkout is None:
1580 checkout = True
1581 else:
1582 if checkout:
1583 raise ValueError("checkout and bare are incompatible")
1584 target = Repo.init_bare(target_path)
1586 try:
1587 target_config = target.get_config()
1588 target_config.set((b"remote", origin), b"url", encoded_path)
1589 target_config.set(
1590 (b"remote", origin),
1591 b"fetch",
1592 b"+refs/heads/*:refs/remotes/" + origin + b"/*",
1593 )
1594 target_config.write_to_path()
1596 ref_message = b"clone: from " + encoded_path
1597 self.fetch(target, depth=depth)
1598 target.refs.import_refs(
1599 b"refs/remotes/" + origin,
1600 self.refs.as_dict(b"refs/heads"),
1601 message=ref_message,
1602 )
1603 target.refs.import_refs(
1604 b"refs/tags", self.refs.as_dict(b"refs/tags"), message=ref_message
1605 )
1607 head_chain, origin_sha = self.refs.follow(b"HEAD")
1608 origin_head = head_chain[-1] if head_chain else None
1609 if origin_sha and not origin_head:
1610 # set detached HEAD
1611 target.refs[b"HEAD"] = origin_sha
1612 else:
1613 _set_origin_head(target.refs, origin, origin_head)
1614 head_ref = _set_default_branch(
1615 target.refs, origin, origin_head, branch, ref_message
1616 )
1618 # Update target head
1619 if head_ref:
1620 head = _set_head(target.refs, head_ref, ref_message)
1621 else:
1622 head = None
1624 if checkout and head is not None:
1625 target.get_worktree().reset_index()
1626 except BaseException:
1627 target.close()
1628 raise
1629 except BaseException:
1630 if mkdir:
1631 import shutil
1633 shutil.rmtree(target_path)
1634 raise
1635 return target
1637 @replace_me(remove_in="0.26.0")
1638 def reset_index(self, tree: Optional[bytes] = None):
1639 """Reset the index back to a specific tree.
1641 Args:
1642 tree: Tree SHA to reset to, None for current HEAD tree.
1643 """
1644 return self.get_worktree().reset_index(tree)
1646 def _get_config_condition_matchers(self) -> dict[str, "ConditionMatcher"]:
1647 """Get condition matchers for includeIf conditions.
1649 Returns a dict of condition prefix to matcher function.
1650 """
1651 from pathlib import Path
1653 from .config import ConditionMatcher, match_glob_pattern
1655 # Add gitdir matchers
1656 def match_gitdir(pattern: str, case_sensitive: bool = True) -> bool:
1657 """Match gitdir against a pattern.
1659 Args:
1660 pattern: Pattern to match against
1661 case_sensitive: Whether to match case-sensitively
1663 Returns:
1664 True if gitdir matches pattern
1665 """
1666 # Handle relative patterns (starting with ./)
1667 if pattern.startswith("./"):
1668 # Can't handle relative patterns without config directory context
1669 return False
1671 # Normalize repository path
1672 try:
1673 repo_path = str(Path(self._controldir).resolve())
1674 except (OSError, ValueError):
1675 return False
1677 # Expand ~ in pattern and normalize
1678 pattern = os.path.expanduser(pattern)
1680 # Normalize pattern following Git's rules
1681 pattern = pattern.replace("\\", "/")
1682 if not pattern.startswith(("~/", "./", "/", "**")):
1683 # Check for Windows absolute path
1684 if len(pattern) >= 2 and pattern[1] == ":":
1685 pass
1686 else:
1687 pattern = "**/" + pattern
1688 if pattern.endswith("/"):
1689 pattern = pattern + "**"
1691 # Use the existing _match_gitdir_pattern function
1692 from .config import _match_gitdir_pattern
1694 pattern_bytes = pattern.encode("utf-8", errors="replace")
1695 repo_path_bytes = repo_path.encode("utf-8", errors="replace")
1697 return _match_gitdir_pattern(
1698 repo_path_bytes, pattern_bytes, ignorecase=not case_sensitive
1699 )
1701 # Add onbranch matcher
1702 def match_onbranch(pattern: str) -> bool:
1703 """Match current branch against a pattern.
1705 Args:
1706 pattern: Pattern to match against
1708 Returns:
1709 True if current branch matches pattern
1710 """
1711 try:
1712 # Get the current branch using refs
1713 ref_chain, _ = self.refs.follow(b"HEAD")
1714 head_ref = ref_chain[-1] # Get the final resolved ref
1715 except KeyError:
1716 pass
1717 else:
1718 if head_ref and head_ref.startswith(b"refs/heads/"):
1719 # Extract branch name from ref
1720 branch = head_ref[11:].decode("utf-8", errors="replace")
1721 return match_glob_pattern(branch, pattern)
1722 return False
1724 matchers: dict[str, ConditionMatcher] = {
1725 "onbranch:": match_onbranch,
1726 "gitdir:": lambda pattern: match_gitdir(pattern, True),
1727 "gitdir/i:": lambda pattern: match_gitdir(pattern, False),
1728 }
1730 return matchers
1732 def get_worktree_config(self) -> "ConfigFile":
1733 """Get the worktree-specific config.
1735 Returns:
1736 ConfigFile object for the worktree config
1737 """
1738 from .config import ConfigFile
1740 path = os.path.join(self.commondir(), "config.worktree")
1741 try:
1742 # Pass condition matchers for includeIf evaluation
1743 condition_matchers = self._get_config_condition_matchers()
1744 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1745 except FileNotFoundError:
1746 cf = ConfigFile()
1747 cf.path = path
1748 return cf
1750 def get_config(self) -> "ConfigFile":
1751 """Retrieve the config object.
1753 Returns: `ConfigFile` object for the ``.git/config`` file.
1754 """
1755 from .config import ConfigFile
1757 path = os.path.join(self._commondir, "config")
1758 try:
1759 # Pass condition matchers for includeIf evaluation
1760 condition_matchers = self._get_config_condition_matchers()
1761 return ConfigFile.from_path(path, condition_matchers=condition_matchers)
1762 except FileNotFoundError:
1763 ret = ConfigFile()
1764 ret.path = path
1765 return ret
1767 def get_rebase_state_manager(self):
1768 """Get the appropriate rebase state manager for this repository.
1770 Returns: DiskRebaseStateManager instance
1771 """
1772 import os
1774 from .rebase import DiskRebaseStateManager
1776 path = os.path.join(self.controldir(), "rebase-merge")
1777 return DiskRebaseStateManager(path)
1779 def get_description(self):
1780 """Retrieve the description of this repository.
1782 Returns: A string describing the repository or None.
1783 """
1784 path = os.path.join(self._controldir, "description")
1785 try:
1786 with GitFile(path, "rb") as f:
1787 return f.read()
1788 except FileNotFoundError:
1789 return None
1791 def __repr__(self) -> str:
1792 """Return string representation of this repository."""
1793 return f"<Repo at {self.path!r}>"
1795 def set_description(self, description) -> None:
1796 """Set the description for this repository.
1798 Args:
1799 description: Text to set as description for this repository.
1800 """
1801 self._put_named_file("description", description)
1803 @classmethod
1804 def _init_maybe_bare(
1805 cls,
1806 path: Union[str, bytes, os.PathLike],
1807 controldir: Union[str, bytes, os.PathLike],
1808 bare,
1809 object_store=None,
1810 config=None,
1811 default_branch=None,
1812 symlinks: Optional[bool] = None,
1813 format: Optional[int] = None,
1814 ):
1815 path = os.fspath(path)
1816 if isinstance(path, bytes):
1817 path = os.fsdecode(path)
1818 controldir = os.fspath(controldir)
1819 if isinstance(controldir, bytes):
1820 controldir = os.fsdecode(controldir)
1821 for d in BASE_DIRECTORIES:
1822 os.mkdir(os.path.join(controldir, *d))
1823 if object_store is None:
1824 object_store = DiskObjectStore.init(os.path.join(controldir, OBJECTDIR))
1825 ret = cls(path, bare=bare, object_store=object_store)
1826 if default_branch is None:
1827 if config is None:
1828 from .config import StackedConfig
1830 config = StackedConfig.default()
1831 try:
1832 default_branch = config.get("init", "defaultBranch")
1833 except KeyError:
1834 default_branch = DEFAULT_BRANCH
1835 ret.refs.set_symbolic_ref(b"HEAD", LOCAL_BRANCH_PREFIX + default_branch)
1836 ret._init_files(bare=bare, symlinks=symlinks, format=format)
1837 return ret
1839 @classmethod
1840 def init(
1841 cls,
1842 path: Union[str, bytes, os.PathLike],
1843 *,
1844 mkdir: bool = False,
1845 config=None,
1846 default_branch=None,
1847 symlinks: Optional[bool] = None,
1848 format: Optional[int] = None,
1849 ) -> "Repo":
1850 """Create a new repository.
1852 Args:
1853 path: Path in which to create the repository
1854 mkdir: Whether to create the directory
1855 config: Configuration object
1856 default_branch: Default branch name
1857 symlinks: Whether to support symlinks
1858 format: Repository format version (defaults to 0)
1859 Returns: `Repo` instance
1860 """
1861 path = os.fspath(path)
1862 if isinstance(path, bytes):
1863 path = os.fsdecode(path)
1864 if mkdir:
1865 os.mkdir(path)
1866 controldir = os.path.join(path, CONTROLDIR)
1867 os.mkdir(controldir)
1868 _set_filesystem_hidden(controldir)
1869 return cls._init_maybe_bare(
1870 path,
1871 controldir,
1872 False,
1873 config=config,
1874 default_branch=default_branch,
1875 symlinks=symlinks,
1876 format=format,
1877 )
1879 @classmethod
1880 def _init_new_working_directory(
1881 cls,
1882 path: Union[str, bytes, os.PathLike],
1883 main_repo,
1884 identifier=None,
1885 mkdir=False,
1886 ):
1887 """Create a new working directory linked to a repository.
1889 Args:
1890 path: Path in which to create the working tree.
1891 main_repo: Main repository to reference
1892 identifier: Worktree identifier
1893 mkdir: Whether to create the directory
1894 Returns: `Repo` instance
1895 """
1896 path = os.fspath(path)
1897 if isinstance(path, bytes):
1898 path = os.fsdecode(path)
1899 if mkdir:
1900 os.mkdir(path)
1901 if identifier is None:
1902 identifier = os.path.basename(path)
1903 # Ensure we use absolute path for the worktree control directory
1904 main_controldir = os.path.abspath(main_repo.controldir())
1905 main_worktreesdir = os.path.join(main_controldir, WORKTREES)
1906 worktree_controldir = os.path.join(main_worktreesdir, identifier)
1907 gitdirfile = os.path.join(path, CONTROLDIR)
1908 with open(gitdirfile, "wb") as f:
1909 f.write(b"gitdir: " + os.fsencode(worktree_controldir) + b"\n")
1910 try:
1911 os.mkdir(main_worktreesdir)
1912 except FileExistsError:
1913 pass
1914 try:
1915 os.mkdir(worktree_controldir)
1916 except FileExistsError:
1917 pass
1918 with open(os.path.join(worktree_controldir, GITDIR), "wb") as f:
1919 f.write(os.fsencode(gitdirfile) + b"\n")
1920 with open(os.path.join(worktree_controldir, COMMONDIR), "wb") as f:
1921 f.write(b"../..\n")
1922 with open(os.path.join(worktree_controldir, "HEAD"), "wb") as f:
1923 f.write(main_repo.head() + b"\n")
1924 r = cls(os.path.normpath(path))
1925 r.get_worktree().reset_index()
1926 return r
1928 @classmethod
1929 def init_bare(
1930 cls,
1931 path: Union[str, bytes, os.PathLike],
1932 *,
1933 mkdir=False,
1934 object_store=None,
1935 config=None,
1936 default_branch=None,
1937 format: Optional[int] = None,
1938 ):
1939 """Create a new bare repository.
1941 ``path`` should already exist and be an empty directory.
1943 Args:
1944 path: Path to create bare repository in
1945 mkdir: Whether to create the directory
1946 object_store: Object store to use
1947 config: Configuration object
1948 default_branch: Default branch name
1949 format: Repository format version (defaults to 0)
1950 Returns: a `Repo` instance
1951 """
1952 path = os.fspath(path)
1953 if isinstance(path, bytes):
1954 path = os.fsdecode(path)
1955 if mkdir:
1956 os.mkdir(path)
1957 return cls._init_maybe_bare(
1958 path,
1959 path,
1960 True,
1961 object_store=object_store,
1962 config=config,
1963 default_branch=default_branch,
1964 format=format,
1965 )
1967 create = init_bare
1969 def close(self) -> None:
1970 """Close any files opened by this repository."""
1971 self.object_store.close()
1973 def __enter__(self):
1974 """Enter context manager."""
1975 return self
1977 def __exit__(self, exc_type, exc_val, exc_tb):
1978 """Exit context manager and close repository."""
1979 self.close()
1981 def _read_gitattributes(self) -> dict[bytes, dict[bytes, bytes]]:
1982 """Read .gitattributes file from working tree.
1984 Returns:
1985 Dictionary mapping file patterns to attributes
1986 """
1987 gitattributes = {}
1988 gitattributes_path = os.path.join(self.path, ".gitattributes")
1990 if os.path.exists(gitattributes_path):
1991 with open(gitattributes_path, "rb") as f:
1992 for line in f:
1993 line = line.strip()
1994 if not line or line.startswith(b"#"):
1995 continue
1997 parts = line.split()
1998 if len(parts) < 2:
1999 continue
2001 pattern = parts[0]
2002 attrs = {}
2004 for attr in parts[1:]:
2005 if attr.startswith(b"-"):
2006 # Unset attribute
2007 attrs[attr[1:]] = b"false"
2008 elif b"=" in attr:
2009 # Set to value
2010 key, value = attr.split(b"=", 1)
2011 attrs[key] = value
2012 else:
2013 # Set attribute
2014 attrs[attr] = b"true"
2016 gitattributes[pattern] = attrs
2018 return gitattributes
2020 def get_blob_normalizer(self):
2021 """Return a BlobNormalizer object."""
2022 from .filters import FilterBlobNormalizer, FilterRegistry
2024 # Get proper GitAttributes object
2025 git_attributes = self.get_gitattributes()
2026 config_stack = self.get_config_stack()
2028 # Create FilterRegistry with repo reference
2029 filter_registry = FilterRegistry(config_stack, self)
2031 # Return FilterBlobNormalizer which handles all filters including line endings
2032 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
2034 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
2035 """Read gitattributes for the repository.
2037 Args:
2038 tree: Tree SHA to read .gitattributes from (defaults to HEAD)
2040 Returns:
2041 GitAttributes object that can be used to match paths
2042 """
2043 from .attrs import (
2044 GitAttributes,
2045 Pattern,
2046 parse_git_attributes,
2047 )
2049 patterns = []
2051 # Read system gitattributes (TODO: implement this)
2052 # Read global gitattributes (TODO: implement this)
2054 # Read repository .gitattributes from index/tree
2055 if tree is None:
2056 try:
2057 # Try to get from HEAD
2058 head = self[b"HEAD"]
2059 if isinstance(head, Tag):
2060 _cls, obj = head.object
2061 head = self.get_object(obj)
2062 assert isinstance(head, Commit)
2063 tree = head.tree
2064 except KeyError:
2065 # No HEAD, no attributes from tree
2066 pass
2068 if tree is not None:
2069 try:
2070 tree_obj = self[tree]
2071 assert isinstance(tree_obj, Tree)
2072 if b".gitattributes" in tree_obj:
2073 _, attrs_sha = tree_obj[b".gitattributes"]
2074 attrs_blob = self[attrs_sha]
2075 if isinstance(attrs_blob, Blob):
2076 attrs_data = BytesIO(attrs_blob.data)
2077 for pattern_bytes, attrs in parse_git_attributes(attrs_data):
2078 pattern = Pattern(pattern_bytes)
2079 patterns.append((pattern, attrs))
2080 except (KeyError, NotTreeError):
2081 pass
2083 # Read .git/info/attributes
2084 info_attrs_path = os.path.join(self.controldir(), "info", "attributes")
2085 if os.path.exists(info_attrs_path):
2086 with open(info_attrs_path, "rb") as f:
2087 for pattern_bytes, attrs in parse_git_attributes(f):
2088 pattern = Pattern(pattern_bytes)
2089 patterns.append((pattern, attrs))
2091 # Read .gitattributes from working directory (if it exists)
2092 working_attrs_path = os.path.join(self.path, ".gitattributes")
2093 if os.path.exists(working_attrs_path):
2094 with open(working_attrs_path, "rb") as f:
2095 for pattern_bytes, attrs in parse_git_attributes(f):
2096 pattern = Pattern(pattern_bytes)
2097 patterns.append((pattern, attrs))
2099 return GitAttributes(patterns)
2101 @replace_me(remove_in="0.26.0")
2102 def _sparse_checkout_file_path(self) -> str:
2103 """Return the path of the sparse-checkout file in this repo's control dir."""
2104 return self.get_worktree()._sparse_checkout_file_path()
2106 @replace_me(remove_in="0.26.0")
2107 def configure_for_cone_mode(self) -> None:
2108 """Ensure the repository is configured for cone-mode sparse-checkout."""
2109 return self.get_worktree().configure_for_cone_mode()
2111 @replace_me(remove_in="0.26.0")
2112 def infer_cone_mode(self) -> bool:
2113 """Return True if 'core.sparseCheckoutCone' is set to 'true' in config, else False."""
2114 return self.get_worktree().infer_cone_mode()
2116 @replace_me(remove_in="0.26.0")
2117 def get_sparse_checkout_patterns(self) -> list[str]:
2118 """Return a list of sparse-checkout patterns from info/sparse-checkout.
2120 Returns:
2121 A list of patterns. Returns an empty list if the file is missing.
2122 """
2123 return self.get_worktree().get_sparse_checkout_patterns()
2125 @replace_me(remove_in="0.26.0")
2126 def set_sparse_checkout_patterns(self, patterns: list[str]) -> None:
2127 """Write the given sparse-checkout patterns into info/sparse-checkout.
2129 Creates the info/ directory if it does not exist.
2131 Args:
2132 patterns: A list of gitignore-style patterns to store.
2133 """
2134 return self.get_worktree().set_sparse_checkout_patterns(patterns)
2136 @replace_me(remove_in="0.26.0")
2137 def set_cone_mode_patterns(self, dirs: Union[list[str], None] = None) -> None:
2138 """Write the given cone-mode directory patterns into info/sparse-checkout.
2140 For each directory to include, add an inclusion line that "undoes" the prior
2141 ``!/*/`` 'exclude' that re-includes that directory and everything under it.
2142 Never add the same line twice.
2143 """
2144 return self.get_worktree().set_cone_mode_patterns(dirs)
2147class MemoryRepo(BaseRepo):
2148 """Repo that stores refs, objects, and named files in memory.
2150 MemoryRepos are always bare: they have no working tree and no index, since
2151 those have a stronger dependency on the filesystem.
2152 """
2154 def __init__(self) -> None:
2155 """Create a new repository in memory."""
2156 from .config import ConfigFile
2158 self._reflog: list[Any] = []
2159 refs_container = DictRefsContainer({}, logger=self._append_reflog)
2160 BaseRepo.__init__(self, MemoryObjectStore(), refs_container) # type: ignore[arg-type]
2161 self._named_files: dict[str, bytes] = {}
2162 self.bare = True
2163 self._config = ConfigFile()
2164 self._description = None
2166 def _append_reflog(self, *args) -> None:
2167 self._reflog.append(args)
2169 def set_description(self, description) -> None:
2170 """Set the description for this repository.
2172 Args:
2173 description: Text to set as description
2174 """
2175 self._description = description
2177 def get_description(self):
2178 """Get the description of this repository.
2180 Returns:
2181 Repository description as bytes
2182 """
2183 return self._description
2185 def _determine_file_mode(self):
2186 """Probe the file-system to determine whether permissions can be trusted.
2188 Returns: True if permissions can be trusted, False otherwise.
2189 """
2190 return sys.platform != "win32"
2192 def _determine_symlinks(self):
2193 """Probe the file-system to determine whether permissions can be trusted.
2195 Returns: True if permissions can be trusted, False otherwise.
2196 """
2197 return sys.platform != "win32"
2199 def _put_named_file(self, path, contents) -> None:
2200 """Write a file to the control dir with the given name and contents.
2202 Args:
2203 path: The path to the file, relative to the control dir.
2204 contents: A string to write to the file.
2205 """
2206 self._named_files[path] = contents
2208 def _del_named_file(self, path) -> None:
2209 try:
2210 del self._named_files[path]
2211 except KeyError:
2212 pass
2214 def get_named_file(self, path, basedir=None):
2215 """Get a file from the control dir with a specific name.
2217 Although the filename should be interpreted as a filename relative to
2218 the control dir in a disk-baked Repo, the object returned need not be
2219 pointing to a file in that location.
2221 Args:
2222 path: The path to the file, relative to the control dir.
2223 basedir: Optional base directory for the path
2224 Returns: An open file object, or None if the file does not exist.
2225 """
2226 contents = self._named_files.get(path, None)
2227 if contents is None:
2228 return None
2229 return BytesIO(contents)
2231 def open_index(self) -> "Index":
2232 """Fail to open index for this repo, since it is bare.
2234 Raises:
2235 NoIndexPresent: Raised when no index is present
2236 """
2237 raise NoIndexPresent
2239 def get_config(self):
2240 """Retrieve the config object.
2242 Returns: `ConfigFile` object.
2243 """
2244 return self._config
2246 def get_rebase_state_manager(self):
2247 """Get the appropriate rebase state manager for this repository.
2249 Returns: MemoryRebaseStateManager instance
2250 """
2251 from .rebase import MemoryRebaseStateManager
2253 return MemoryRebaseStateManager(self)
2255 def get_blob_normalizer(self):
2256 """Return a BlobNormalizer object for checkin/checkout operations."""
2257 from .filters import FilterBlobNormalizer, FilterRegistry
2259 # Get GitAttributes object
2260 git_attributes = self.get_gitattributes()
2261 config_stack = self.get_config_stack()
2263 # Create FilterRegistry with repo reference
2264 filter_registry = FilterRegistry(config_stack, self)
2266 # Return FilterBlobNormalizer which handles all filters
2267 return FilterBlobNormalizer(config_stack, git_attributes, filter_registry, self)
2269 def get_gitattributes(self, tree: Optional[bytes] = None) -> "GitAttributes":
2270 """Read gitattributes for the repository."""
2271 from .attrs import GitAttributes
2273 # Memory repos don't have working trees or gitattributes files
2274 # Return empty GitAttributes
2275 return GitAttributes([])
2277 def do_commit(
2278 self,
2279 message: Optional[bytes] = None,
2280 committer: Optional[bytes] = None,
2281 author: Optional[bytes] = None,
2282 commit_timestamp=None,
2283 commit_timezone=None,
2284 author_timestamp=None,
2285 author_timezone=None,
2286 tree: Optional[ObjectID] = None,
2287 encoding: Optional[bytes] = None,
2288 ref: Optional[Ref] = b"HEAD",
2289 merge_heads: Optional[list[ObjectID]] = None,
2290 no_verify: bool = False,
2291 sign: bool = False,
2292 ):
2293 """Create a new commit.
2295 This is a simplified implementation for in-memory repositories that
2296 doesn't support worktree operations or hooks.
2298 Args:
2299 message: Commit message
2300 committer: Committer fullname
2301 author: Author fullname
2302 commit_timestamp: Commit timestamp (defaults to now)
2303 commit_timezone: Commit timestamp timezone (defaults to GMT)
2304 author_timestamp: Author timestamp (defaults to commit timestamp)
2305 author_timezone: Author timestamp timezone (defaults to commit timezone)
2306 tree: SHA1 of the tree root to use
2307 encoding: Encoding
2308 ref: Optional ref to commit to (defaults to current branch).
2309 If None, creates a dangling commit without updating any ref.
2310 merge_heads: Merge heads
2311 no_verify: Skip pre-commit and commit-msg hooks (ignored for MemoryRepo)
2312 sign: GPG Sign the commit (ignored for MemoryRepo)
2314 Returns:
2315 New commit SHA1
2316 """
2317 import time
2319 from .objects import Commit
2321 if tree is None:
2322 raise ValueError("tree must be specified for MemoryRepo")
2324 c = Commit()
2325 if len(tree) != 40:
2326 raise ValueError("tree must be a 40-byte hex sha string")
2327 c.tree = tree
2329 config = self.get_config_stack()
2330 if merge_heads is None:
2331 merge_heads = []
2332 if committer is None:
2333 committer = get_user_identity(config, kind="COMMITTER")
2334 check_user_identity(committer)
2335 c.committer = committer
2336 if commit_timestamp is None:
2337 commit_timestamp = time.time()
2338 c.commit_time = int(commit_timestamp)
2339 if commit_timezone is None:
2340 commit_timezone = 0
2341 c.commit_timezone = commit_timezone
2342 if author is None:
2343 author = get_user_identity(config, kind="AUTHOR")
2344 c.author = author
2345 check_user_identity(author)
2346 if author_timestamp is None:
2347 author_timestamp = commit_timestamp
2348 c.author_time = int(author_timestamp)
2349 if author_timezone is None:
2350 author_timezone = commit_timezone
2351 c.author_timezone = author_timezone
2352 if encoding is None:
2353 try:
2354 encoding = config.get(("i18n",), "commitEncoding")
2355 except KeyError:
2356 pass
2357 if encoding is not None:
2358 c.encoding = encoding
2360 # Handle message (for MemoryRepo, we don't support callable messages)
2361 if callable(message):
2362 message = message(self, c)
2363 if message is None:
2364 raise ValueError("Message callback returned None")
2366 if message is None:
2367 raise ValueError("No commit message specified")
2369 c.message = message
2371 if ref is None:
2372 # Create a dangling commit
2373 c.parents = merge_heads
2374 self.object_store.add_object(c)
2375 else:
2376 try:
2377 old_head = self.refs[ref]
2378 c.parents = [old_head, *merge_heads]
2379 self.object_store.add_object(c)
2380 ok = self.refs.set_if_equals(
2381 ref,
2382 old_head,
2383 c.id,
2384 message=b"commit: " + message,
2385 committer=committer,
2386 timestamp=commit_timestamp,
2387 timezone=commit_timezone,
2388 )
2389 except KeyError:
2390 c.parents = merge_heads
2391 self.object_store.add_object(c)
2392 ok = self.refs.add_if_new(
2393 ref,
2394 c.id,
2395 message=b"commit: " + message,
2396 committer=committer,
2397 timestamp=commit_timestamp,
2398 timezone=commit_timezone,
2399 )
2400 if not ok:
2401 from .errors import CommitError
2403 raise CommitError(f"{ref!r} changed during commit")
2405 return c.id
2407 @classmethod
2408 def init_bare(cls, objects, refs, format: Optional[int] = None):
2409 """Create a new bare repository in memory.
2411 Args:
2412 objects: Objects for the new repository,
2413 as iterable
2414 refs: Refs as dictionary, mapping names
2415 to object SHA1s
2416 format: Repository format version (defaults to 0)
2417 """
2418 ret = cls()
2419 for obj in objects:
2420 ret.object_store.add_object(obj)
2421 for refname, sha in refs.items():
2422 ret.refs.add_if_new(refname, sha)
2423 ret._init_files(bare=True, format=format)
2424 return ret